How to execute subsets of tasks asynchronously/concurrently, one batch after another

Hi,

I just tried your approach. First of all, it's working much better. However, the results seem inconsistent: sometimes it really processes batches of 3, sometimes it's 1-5-3-1 (I reduced the sleep time to 500ms):

```
debug: Completed part 500ms after start
debug: Completed part 1000ms after start
debug: Completed part 1000ms after start
debug: Completed part 1000ms after start
debug: Completed part 1001ms after start
debug: Completed part 1001ms after start
debug: Completed part 1501ms after start
debug: Completed part 1501ms after start
debug: Completed part 1501ms after start
debug: Completed part 2001ms after start
```

In general, that's no big deal since it works well enough. But I would like to understand why it sometimes processes more than the maximum of 3 permits at the same time.

Furthermore, many thanks for the elaborate explanation. But there is one thing I don't understand regarding Group (and that's what has kept me from using it so far; maybe due to my not-so-good English skills): if I use a Group with a Semaphore, should I spawn all tasks into one group, or create a separate group for every batch of 3 (or whatever the max permit count turns out to be)?

Update:

After trying a few things, I was able to fix the irregular async execution by using a Group and a Semaphore:

```zig
// ** omitted code see above ** //

    // At most 3 tasks can hold a permit (i.e. run their body) at the same time.
    const max_concurrency_count = 3;
    var semaphore: Io.Semaphore = .{ .permits = max_concurrency_count };

    // A single Group for all tasks; the semaphore, not the Group, limits concurrency.
    var futures: std.Io.Group = .init;
    errdefer futures.cancel(io);

    for (ranges.items, 0..) |r, i| {
        std.log.debug("Putting range {d} to futures", .{i});
        const chunk = content[r.start .. r.end + 1];
        futures.async(io, downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start, start_time });
    }

    // Wait until every task in the group has finished.
    try futures.await(io);

    const now = Io.Timestamp.untilNow(start_time, io, .awake);
    std.log.debug("Time elapsed: {d}ms", .{now.toMilliseconds()});

// ** //
```

It works like a charm. Perfect execution in blocks of 3 every time:

```
debug: Part downloaded after 500ms
debug: Part downloaded after 500ms
debug: Part downloaded after 500ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 2002ms
debug: Time elapsed: 2002ms
```

The only downside is that I need to catch every error in my downloadAndWrite function and convert it to Io.Cancelable.Canceled:

```zig
fn downloadAndWrite(io: Io, sema: *Io.Semaphore, file: *std.Io.File, chunk: []const u8, start: u64, start_time: Io.Timestamp) !void {
    // Block until one of the permits is free; release it when this task returns.
    sema.wait(io) catch return Io.Cancelable.Canceled;
    defer sema.post(io);
    // Every failure is mapped to Canceled so the error set stays error{Canceled}.
    io.sleep(std.Io.Duration.fromMilliseconds(500), .awake) catch return Io.Cancelable.Canceled;
    file.writePositionalAll(io, chunk, start) catch return Io.Cancelable.Canceled;
    const now = Io.Timestamp.untilNow(start_time, io, .awake);
    std.log.debug("Part downloaded after {d}ms", .{now.toMilliseconds()});
}
```

Otherwise the compilation will always fail with:

```
/home/lukeflo/.cache/zig/p/N-V-__8AAC_uTRUrhIpzwcTOMDh5tBuMQQ3cDzGRmhAegCJd/lib/std/Io.zig:1042:24: error: expected type 'error{Canceled}!void', found '@typeInfo(@typeInfo(@TypeOf(main.downloadAndWrite)).@"fn".return_type.?).error_union.error_set!void'
                return @call(.auto, function, args_casted.*);
```

As I understand from this thread, it's necessary to catch all errors to use a Group like this. That, of course, is not very helpful for error handling, since I would need to collect the “real” errors somehow if a download fails and store them elsewhere. Or is there another way to coerce an error returned from the asynchronously executed function to Io.Cancelable.Canceled while still keeping the original error?
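One possible workaround, sketched here as an assumption rather than something confirmed in this thread: keep the task's return type at `Io.Cancelable!void`, pass `error.Canceled` through, and write every other error into a per-task slot that the caller inspects after the group has finished. To keep the sketch runnable without the new `std.Io` API, it defines a local `Cancelable` standing in for `Io.Cancelable` and calls the wrapper directly instead of through `futures.async`; `download`, `runPart` and `error.DownloadFailed` are hypothetical names.

```zig
const std = @import("std");

// Assumption: stands in for `Io.Cancelable`, which the compile error shows is `error{Canceled}`.
const Cancelable = error{Canceled};

// Hypothetical stand-in for the real download: part 2 fails with a "real" error,
// part 3 behaves as if the task had been canceled.
fn download(part: usize) (Cancelable || error{DownloadFailed})!void {
    if (part == 2) return error.DownloadFailed;
    if (part == 3) return error.Canceled;
}

// Hypothetical task wrapper with the narrow signature a Group task needs:
// cancellation is passed through, every other error is stored in `slot`.
fn runPart(part: usize, slot: *?anyerror) Cancelable!void {
    download(part) catch |err| switch (err) {
        error.Canceled => return error.Canceled,
        else => {
            slot.* = err;
        },
    };
}

pub fn main() void {
    // One slot per task, so no two tasks ever write to the same memory.
    var errors = [_]?anyerror{null} ** 4;

    for (&errors, 0..) |*slot, part| {
        // In the real program this call would go through `futures.async(...)`.
        runPart(part, slot) catch |err| {
            std.debug.print("part {d}: {s}\n", .{ part, @errorName(err) });
        };
    }

    // Once all tasks are done, the original errors are still available.
    for (errors, 0..) |maybe_err, part| {
        if (maybe_err) |err| {
            std.debug.print("part {d} failed: {s}\n", .{ part, @errorName(err) });
        }
    }
}
```

With a real Group, the slot would presumably be passed as one more task argument (e.g. `&errors[i]` alongside `chunk` and `r.start`) and read only after `try futures.await(io)` has returned, so each task writes only its own slot and the caller reads them afterwards.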
