How can I use an Io.Group to start up a bunch of tasks but cancel them if one of them fails?
I’ve written the following code snippet which is what I intuitively came up with, but it doesn’t work…
const std = @import("std");
const Random = std.Random;
const Pcg = Random.Pcg;
const Io = std.Io;
const Dir = Io.Dir;
const Group = Io.Group;
const Mutex = Io.Mutex;
const Writer = Io.Writer;
pub fn main(init: std.process.Init) !void {
const io = init.io;
const arena = init.arena.allocator();
var args = try init.minimal.args.iterateAllocator(arena);
_ = args.next().?;
const seed = if (args.next()) |seed| try std.fmt.parseInt(u64, seed, 0) else blk: {
var seed: u64 = undefined;
io.random(std.mem.asBytes(&seed));
break :blk seed;
};
std.log.debug("Seed: {d}", .{seed});
var pcg: Pcg = .init(seed);
var r = pcg.random();
// try Dir.cwd().createDir(io, "jobs", .default_dir);
const task_count = 10;
var group: Group = .init;
defer group.cancel(io);
var mutex: Mutex = .init;
for (0..task_count) |i| {
group.async(io, do, .{ &group, &mutex, io, r.int(u64), i });
}
try group.await(io);
}
fn do(group: *Group, mutex: *Mutex, io: Io, seed: u64, i: usize) Io.Cancelable!void {
doFallible(io, seed, i) catch |err| switch (err) {
error.Canceled => |e| return e,
else => |e| {
if (mutex.tryLock()) {
std.log.err("Failed task {d}: {t}", .{ i + 1, e });
group.cancel(io);
}
},
};
}
fn doFallible(io: Io, seed: u64, i: usize) !void {
std.log.debug("Starting task {d}", .{i + 1});
var pcg: Pcg = .init(seed);
var r = pcg.random();
try io.sleep(.fromMilliseconds(r.uintAtMost(u32, 1000)), .real);
var job_filename_buf: [10]u8 = undefined;
const job_filename = try std.fmt.bufPrint(&job_filename_buf, "jobs/{d}", .{i + 1});
const file = try Dir.cwd().createFile(io, job_filename, .{});
defer file.close(io);
var writer_buf: [1024]u8 = undefined;
var file_writer = file.writer(io, &writer_buf);
const w = &file_writer.interface;
writeTaskInfo(w, &r) catch return file_writer.err.?;
}
fn writeTaskInfo(w: *Writer, r: *Random) Writer.Error!void {
try w.print("Information from task: {d}\n", .{r.int(u32)});
try w.flush();
}
When running, for example, zig run main.zig -- 13396650619255211433, I get the following output:
debug: Seed: 13396650619255211433
debug: Starting task 3
debug: Starting task 2
debug: Starting task 1
debug: Starting task 4
debug: Starting task 6
debug: Starting task 7
debug: Starting task 5
debug: Starting task 8
debug: Starting task 9
debug: Starting task 10
error: Failed task 6: FileNotFound
thread 8611509 panic: reached unreachable code
/Users/ejshafran/.local/lib/zig/std/debug.zig:423:14: 0x10095805f in assert (main)
if (!ok) unreachable; // assertion failure
^
/Users/ejshafran/.local/lib/zig/std/Io/Threaded.zig:2302:11: 0x100a33507 in groupAwait (main)
assert(!pre_await_status.have_awaiter);
^
/Users/ejshafran/.local/lib/zig/std/Io.zig:1299:33: 0x100a555cf in await (main)
try io.vtable.groupAwait(io.userdata, g, token);
^
/Users/ejshafran/scratch/zig-group-async/main.zig:39:20: 0x100a5178b in main (main)
try group.await(io);
^
/Users/ejshafran/.local/lib/zig/std/start.zig:750:30: 0x100a51d6b in callMain (main)
return wrapMain(root.main(.{
^
???:?:?: 0x18344ab97 in start (/usr/lib/dyld)
Abort trap: 6 zig run main.zig -- 13396650619255211433
Am I doing something wrong…? Is this not how groups are supposed to be used? If so - is there something in the Io interface that can acheive what I want?