Hi,
just tried your approach. First of all, it's working much better. However, the results seem inconsistent: sometimes it really processes batches of 3, sometimes it's 1-5-3-1 (I reduced the sleep time to 500ms):
debug: Completed part 500ms after start
debug: Completed part 1000ms after start
debug: Completed part 1000ms after start
debug: Completed part 1000ms after start
debug: Completed part 1001ms after start
debug: Completed part 1001ms after start
debug: Completed part 1501ms after start
debug: Completed part 1501ms after start
debug: Completed part 1501ms after start
debug: Completed part 2001ms after start
In general, that's no big deal since it works well enough. But I would like to understand why it sometimes processes more than the maximum permit count of 3 at the same time.
Furthermore, many thanks for the elaborate explanation. But there is one thing I don't understand regarding Group (and that's what has kept me from using it so far; maybe due to my not-so-good English skills): if I were to use a Group with a Semaphore, should I spawn all tasks into one group, or create a separate group for every batch of 3 (or whatever the maximum permit count is)?
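To illustrate the second variant I mean, a hypothetical sketch of "one group per batch" (batch_size and the argument list are placeholders; it only reuses the Group calls already shown below):

```zig
// Hypothetical sketch: a fresh Group per batch, awaited before the
// next batch starts. No Semaphore needed in this variant, since the
// await itself limits concurrency to batch_size.
var i: usize = 0;
while (i < ranges.items.len) : (i += batch_size) {
    var batch: std.Io.Group = .init;
    errdefer batch.cancel(io);
    const end = @min(i + batch_size, ranges.items.len);
    for (ranges.items[i..end]) |r| {
        batch.async(io, downloadAndWrite, .{ /* ... same args ... */ });
    }
    try batch.await(io); // block until this batch of tasks finishes
}
```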
Update:
After trying some things, I was able to solve the irregular async execution using a Group and a Semaphore:
// ** omitted code see above ** //
const max_concurrency_count = 3;
var semaphore: Io.Semaphore = .{ .permits = max_concurrency_count };
var futures: std.Io.Group = .init;
errdefer futures.cancel(io);
for (ranges.items, 0..) |r, i| {
    std.log.debug("Putting range {d} to futures", .{i});
    const chunk = content[r.start .. r.end + 1];
    futures.async(io, downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start, start_time });
}
try futures.await(io);
const now = Io.Timestamp.untilNow(start_time, io, .awake);
std.log.debug("Time elapsed: {d}ms", .{now.toMilliseconds()});
// ** //
It works like a charm. Perfect execution in blocks of 3 every time:
debug: Part downloaded after 500ms
debug: Part downloaded after 500ms
debug: Part downloaded after 500ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1001ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 1501ms
debug: Part downloaded after 2002ms
debug: Time elapsed: 2002ms
The only downside is that I need to catch all errors inside my downloadAndWrite function and map them to Io.Cancelable.Canceled:
fn downloadAndWrite(io: Io, sema: *Io.Semaphore, file: *std.Io.File, chunk: []const u8, start: u64, start_time: Io.Timestamp) !void {
    sema.wait(io) catch return Io.Cancelable.Canceled;
    defer sema.post(io);
    io.sleep(std.Io.Duration.fromMilliseconds(500), .awake) catch return Io.Cancelable.Canceled;
    file.writePositionalAll(io, chunk, start) catch return Io.Cancelable.Canceled;
    const now = Io.Timestamp.untilNow(start_time, io, .awake);
    std.log.debug("Part downloaded after {d}ms", .{now.toMilliseconds()});
}
Otherwise, compilation always fails with:
/home/lukeflo/.cache/zig/p/N-V-__8AAC_uTRUrhIpzwcTOMDh5tBuMQQ3cDzGRmhAegCJd/lib/std/Io.zig:1042:24: error: expected type 'error{Canceled}!void', found '@typeInfo(@typeInfo(@TypeOf(main.downloadAndWrite)).@"fn".return_type.?).error_union.error_set!void'
return @call(.auto, function, args_casted.*);
As I understand from this thread, it's necessary to catch all errors to use a Group like this. That, of course, is not very helpful for error handling, since I would need to somehow collect the "real" errors if a download fails and store them in a different way. Or is there another way to coerce an error returned from the async function to Io.Cancelable.Canceled while also keeping the original error?
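The only workaround I can think of so far is to give each task its own error slot, write the real error there, and then return Canceled. Since every task writes only to its own slot, no extra synchronization should be needed. A rough sketch (err_slot is a parameter I made up; the rest is condensed pseudocode, not compilable as-is):

```zig
// Hypothetical sketch: one ?anyerror slot per task, inspected after await.
var task_errors = try allocator.alloc(?anyerror, ranges.items.len);
defer allocator.free(task_errors);
@memset(task_errors, null);

// In the spawn loop, additionally pass &task_errors[i] to each task.

// Inside downloadAndWrite, record the real error before coercing it:
//     file.writePositionalAll(io, chunk, start) catch |err| {
//         err_slot.* = err; // keep the original error
//         return Io.Cancelable.Canceled;
//     };

// After futures.await(io), report any stored errors:
for (task_errors, 0..) |maybe_err, i| {
    if (maybe_err) |err| std.log.err("range {d} failed: {s}", .{ i, @errorName(err) });
}
```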