Help with `Io.Group`

How can I use an Io.Group to start up a bunch of tasks but cancel them if one of them fails?
I’ve written the following code snippet which is what I intuitively came up with, but it doesn’t work…

const std = @import("std");
const Random = std.Random;
const Pcg = Random.Pcg;
const Io = std.Io;
const Dir = Io.Dir;
const Group = Io.Group;
const Mutex = Io.Mutex;
const Writer = Io.Writer;

pub fn main(init: std.process.Init) !void {
    const io = init.io;
    const arena = init.arena.allocator();

    var args = try init.minimal.args.iterateAllocator(arena);
    // Discard the first argument (presumably the program path); `.?` panics if there are no arguments at all.
    _ = args.next().?;
    // Optional seed argument (parseInt base 0 picks the radix from a 0x/0o/0b prefix); otherwise seed from io.random.
    const seed = if (args.next()) |seed| try std.fmt.parseInt(u64, seed, 0) else blk: {
        var seed: u64 = undefined;
        io.random(std.mem.asBytes(&seed));
        break :blk seed;
    };

    std.log.debug("Seed: {d}", .{seed});
    // Each task's seed is drawn from this PRNG in the spawn loop below, so all task randomness follows from `seed`.
    var pcg: Pcg = .init(seed);
    var r = pcg.random();
    // NOTE(review): "jobs/" is not created here (call below is commented out), so createFile in doFallible can fail with FileNotFound.
    // try Dir.cwd().createDir(io, "jobs", .default_dir);

    const task_count = 10;

    var group: Group = .init;
    defer group.cancel(io);
    var mutex: Mutex = .init;
    for (0..task_count) |i| {
        group.async(io, do, .{ &group, &mutex, io, r.int(u64), i });
    }
    try group.await(io);
}

fn do(group: *Group, mutex: *Mutex, io: Io, seed: u64, i: usize) Io.Cancelable!void {
    doFallible(io, seed, i) catch |err| switch (err) {
        error.Canceled => |e| return e,
        else => |e| { // Non-cancellation errors are not propagated: this task still returns normally.
            if (mutex.tryLock()) { // Only the first failing task passes: the mutex is never unlocked, so later failures skip this branch and are dropped without logging.
                std.log.err("Failed task {d}: {t}", .{ i + 1, e });
                group.cancel(io); // NOTE(review): main() is blocked in group.await() on this same group; Io.Group permits a single awaiter, and the conflict trips assert(!pre_await_status.have_awaiter) in Threaded.groupAwait.
            }
        },
    };
}

/// Runs one task: sleeps for a random delay, then creates "jobs/<i + 1>"
/// relative to the current working directory and writes one line into it.
/// `seed` initializes a task-local PRNG; `i` is the zero-based task index,
/// logged and used in the file name as `i + 1`.
/// Errors from sleeping, formatting the path, creating the file, or writing
/// are returned to the caller (`do` treats error.Canceled specially).
fn doFallible(io: Io, seed: u64, i: usize) !void {
    std.log.debug("Starting task {d}", .{i + 1});

    var pcg: Pcg = .init(seed);
    var r = pcg.random();

    // Random delay of at most 1000 ms, measured on the `.real` clock.
    try io.sleep(.fromMilliseconds(r.uintAtMost(u32, 1000)), .real);

    // "jobs/" takes 5 of the 10 bytes, leaving room for up to 5 digits;
    // a longer name makes bufPrint return an error.
    var job_filename_buf: [10]u8 = undefined;
    const job_filename = try std.fmt.bufPrint(&job_filename_buf, "jobs/{d}", .{i + 1});

    // Presumably the source of the FileNotFound in the output: the "jobs"
    // directory is never created by this program.
    const file = try Dir.cwd().createFile(io, job_filename, .{});
    defer file.close(io);

    var writer_buf: [1024]u8 = undefined;
    var file_writer = file.writer(io, &writer_buf);
    const w = &file_writer.interface;

    // The generic Writer error carries no detail; the concrete cause is kept in
    // file_writer.err. NOTE(review): `.?` assumes it is always set on failure.
    writeTaskInfo(w, &r) catch return file_writer.err.?;
}

/// Writes "Information from task: <random u32>\n" to `w` and flushes it,
/// so the data reaches the underlying sink before returning.
/// Failures surface as `Writer.Error`.
fn writeTaskInfo(w: *Writer, r: *Random) Writer.Error!void {
    try w.print("Information from task: {d}\n", .{r.int(u32)});
    try w.flush();
}

When running, for example, zig run main.zig -- 13396650619255211433, I get the following output:

debug: Seed: 13396650619255211433
debug: Starting task 3
debug: Starting task 2
debug: Starting task 1
debug: Starting task 4
debug: Starting task 6
debug: Starting task 7
debug: Starting task 5
debug: Starting task 8
debug: Starting task 9
debug: Starting task 10
error: Failed task 6: FileNotFound
thread 8611509 panic: reached unreachable code
/Users/ejshafran/.local/lib/zig/std/debug.zig:423:14: 0x10095805f in assert (main)
    if (!ok) unreachable; // assertion failure
             ^
/Users/ejshafran/.local/lib/zig/std/Io/Threaded.zig:2302:11: 0x100a33507 in groupAwait (main)
    assert(!pre_await_status.have_awaiter);
          ^
/Users/ejshafran/.local/lib/zig/std/Io.zig:1299:33: 0x100a555cf in await (main)
        try io.vtable.groupAwait(io.userdata, g, token);
                                ^
/Users/ejshafran/scratch/zig-group-async/main.zig:39:20: 0x100a5178b in main (main)
    try group.await(io);
                   ^
/Users/ejshafran/.local/lib/zig/std/start.zig:750:30: 0x100a51d6b in callMain (main)
    return wrapMain(root.main(.{
                             ^
???:?:?: 0x18344ab97 in start (/usr/lib/dyld)
Abort trap: 6              zig run main.zig -- 13396650619255211433

Am I doing something wrong? Is this not how groups are supposed to be used? If so, is there something in the Io interface that can achieve what I want?

You can’t await or cancel an Io.Group from multiple tasks at the same time. Each call to cancel blocks until the group finishes, and there can be only one active waiter — you already have one: the main task, which calls await.

What you probably want is Io.Select: iterate over the results, and cancel the whole select once your condition is met.

3 Likes

Hmmm, I assumed something like that was the case. But shouldn’t the Mutex in the example take care of that? It should make it so only one task cancels the group, no?

The problem is that you are actively waiting on the group using await(), so you can’t call cancel() until the await finishes (which would be pointless).

By using Io.Select, you delegate the cancelation to the parent task, where it belongs.

I tried using Io.Select but I guess I misunderstand something there as well.

Doing the following:

diff --git a/original.zig b/main.zig
index 4bc6b1a..6add94d 100644
--- a/original.zig
+++ b/main.zig
@@ -3,8 +3,7 @@ const Random = std.Random;
 const Pcg = Random.Pcg;
 const Io = std.Io;
 const Dir = Io.Dir;
-const Group = Io.Group;
-const Mutex = Io.Mutex;
+const Select = Io.Select;
 const Writer = Io.Writer;
 
 pub fn main(init: std.process.Init) !void {
@@ -30,23 +29,31 @@ pub fn main(init: std.process.Init) !void {
 
     const task_count = 10;
 
-    var group: Group = .init;
-    defer group.cancel(io);
-    var mutex: Mutex = .init;
+    const U = union(enum) {
+        job: anyerror!void,
+    };
+    var select_buf: [task_count]U = undefined;
+    var select: Select(U) = .init(io, &select_buf);
+    defer _ = select.cancel();
+
     for (0..task_count) |i| {
-        group.async(io, do, .{ &group, &mutex, io, r.int(u64), i });
+        select.async(.job, do, .{ io, r.int(u64), i });
+    }
+
+    while (true) {
+        const result = try select.await();
+        std.log.info("Some task finished", .{});
+
+        result.job catch break; // cancel via defer above
     }
-    try group.await(io);
 }
 
-fn do(group: *Group, mutex: *Mutex, io: Io, seed: u64, i: usize) Io.Cancelable!void {
+fn do(io: Io, seed: u64, i: usize) !void {
     doFallible(io, seed, i) catch |err| switch (err) {
         error.Canceled => |e| return e,
         else => |e| {
-            if (mutex.tryLock()) {
-                std.log.err("Failed task {d}: {t}", .{ i + 1, e });
-                group.cancel(io);
-            }
+            std.log.err("Failed task {d}: {t}", .{ i + 1, e });
+            return e;
         },
     };
 }

Yields this result:

debug: Seed: 13396650619255211433
debug: Starting task 1
debug: Starting task 3
debug: Starting task 2
debug: Starting task 5
debug: Starting task 4
debug: Starting task 7
debug: Starting task 8
debug: Starting task 6
debug: Starting task 9
debug: Starting task 10
error: Failed task 6: FileNotFound
error: Failed task 5: FileNotFound
error: Failed task 1: FileNotFound
error: Failed task 3: FileNotFound
error: Failed task 2: FileNotFound
error: Failed task 7: FileNotFound
error: Failed task 8: FileNotFound
error: Failed task 4: FileNotFound
error: Failed task 10: FileNotFound
info: Some task finished

Which is very confusing to me…

If select.await returns an error, it only means that reading from its internal job queue was cancelled. It does not reflect the errors from your tasks, which means you are not cancelling when a task fails!

To get the error union from the job you have to (try select.await()).job

Also, you really should not use anyerror, avoid it at all costs!

2 Likes

You might have already consulted @vulpesx’s doc here, and the select section in particular, but just in case (or for anyone else who finds this thread), I thought I’d point to it.

2 Likes

To get the error union from the job you have to (try select.await()).job

Yeah, but that’s what I’m doing… In the (diffed) code snippet above:

const result = try select.await();
// ...

result.job catch break;

As for anyerror - yeah, I avoid it in my actual code. Just used it here for a simple code snippet.

What’s still weird for me is that select.await sometimes blocks until several tasks are completed - I’m not entirely sure why. The behavior is actually fairly OK and consistent in my actual code which doesn’t io.sleep, so maybe it has to do with weirdness related to that?

Reduce the buffer that you use for select to zero, and then it will have blocking behavior, waiting for just one task. With non-zero buffer, tasks can end nearly at the same time and post their results before the await returns.

If the buffer is not large enough for the remaining jobs then select.cancel will deadlock!

Are you sure about that? I’ll have to check the code, but cancel should drain the queue as it goes, no?

It is mentioned in the docs

[..] If the select was initialized with insufficient buffer space for all remaining tasks to finish, a deadlock occurs. [..]

It happens because cancel first cancels its group, which waits for all pending tasks to finish. Any task that finds no free buffer space blocks until space (or a consumer) becomes available. The only consumer, though, is the task that is busy waiting for the group, so space never frees up and those tasks block forever.

1 Like

Oh, there is cancelDiscard, which seems to do what I expected cancel to do.

Right, I forgot to mention that. However, it is not appropriate if there are returned resources to clean up, which may be the case for OP, since the code shared was a simplified, reproducible example.

They are returning void, and just need fail fast semantics, so I think it’s the right thing to use. The original code was using group, which has no way of returning non-void.

Hmm. Using a zero-length buffer and cancelDiscard doesn’t seem to work either:

diff --git a/select.zig b/select2.zig
index 88654bc..32a8a3a 100644
--- a/select.zig
+++ b/select2.zig
@@ -32,9 +32,8 @@ pub fn main(init: std.process.Init) !void {
     const U = union(enum) {
         job: anyerror!void,
     };
-    var select_buf: [task_count]U = undefined;
-    var select: Select(U) = .init(io, &select_buf);
-    defer _ = select.cancel();
+    var select: Select(U) = .init(io, &.{});
+    defer select.cancelDiscard();
 
     for (0..task_count) |i| {
         select.async(.job, do, .{ io, r.int(u64), i });

Does:

debug: Seed: 13396650619255211433
debug: Starting task 1
debug: Starting task 2
debug: Starting task 3
debug: Starting task 4
debug: Starting task 5
debug: Starting task 7
debug: Starting task 8
debug: Starting task 6
debug: Starting task 9
debug: Starting task 10
error: Failed task 6: FileNotFound
error: Failed task 5: FileNotFound
error: Failed task 1: FileNotFound
error: Failed task 3: FileNotFound
error: Failed task 2: FileNotFound
error: Failed task 7: FileNotFound
error: Failed task 8: FileNotFound
error: Failed task 4: FileNotFound
error: Failed task 10: FileNotFound
error: Failed task 9: FileNotFound

And then just hangs (which I’m guessing is a deadlock…).

But also, generally, I’m interested - is Io.Select what should be used for a “fail fast” async semantic?


EDIT: adding more detailed pondering about the way I’m thinking of using the Io interface. I can move this to a different topic or something if needed.

For example, say I’m writing some library (though this applies for application code as well) that wants to use the Io interface so it’s reusable across different IO models.

I want something that looks roughly like:

const std = @import("std");
const Io = std.Io;
const Dir = Io.Dir;

/// Per-entry output of `handleDirectoryEntry`; fields are elided in this sketch.
const Result = struct {
    // ...
};

/// Placeholder for the per-entry work on `entry` inside `dir`; meant to
/// produce one `Result` per entry.
/// Not implemented yet: it discards its arguments and panics with "TODO".
fn handleDirectoryEntry(io: Io, dir: Dir, entry: Dir.Entry) !Result {
    // ...

    _ = io;
    _ = dir;
    _ = entry;

    @panic("TODO");
}

/// Opens `dir_path` (relative to the current working directory) for iteration
/// and walks its entries, with the goal of collecting one `Result` per entry.
/// Not implemented yet: entries are discarded and the function panics with "TODO".
/// NOTE(review): no allocator is passed in, so building the returned list will
/// presumably need one; confirm against the targeted Zig version's ArrayList API.
pub fn iterateDirectory(io: Io, dir_path: []const u8) !std.ArrayList(Result) {
    const dir = try Dir.cwd().openDir(io, dir_path, .{ .iterate = true });
    defer dir.close(io);

    var it = dir.iterate();
    while (try it.next(io)) |entry| {
        _ = entry;
    }

    @panic("TODO");
}

What should be done here, to properly gather these results?

If handling one of the entries fails, I want to fail the main iterateDirectory function itself, and I would like to not do duplicate work in those cases.

Writing it using the async code from the Io interface would look like:

var group: Io.Group = .init;
// Or `Io.Select`, or something like that...
// NOTE(review): an Io.Group task cannot hand a value back, so the `!Result` from
// handleDirectoryEntry would be lost here; Io.Select delivers each task's result.
while (try it.next(io)) |entry| {
  group.async(io, handleDirectoryEntry, .{ io, dir, entry });
}

// "Fail fast" and cancel on the first failure
// (Which was what this topic was essentially asking how to do,
// to begin with...)

But suppose the IO model starts these tasks eagerly when calling group.async (or whatever) for some reason — e.g. if there are no threads available in Io.Threaded, or if the IO implementation is blocking on purpose. Then this code would force me to wait until all the entries are fully handled before failing. In those cases, what I would actually want is something like:

// Sequential form: `try` returns on the first failing entry, so later entries are never handled.
// NOTE(review): the returned Result presumably must be used or discarded with `_ =`, since Zig rejects ignored non-void values.
while (try it.next(io)) |entry| {
  // Fail the function and stop handling other entries!
  try handleDirectoryEntry(io, dir, entry);
}

Why don’t you use this then? Directly calling the function is just fine.

Using io.async and io.concurrent is for when you want to run things in the background. If you do that, you need to wait on the results.

I’ll need to play with Io.Select a bit to see how to get fail-fast semantics. It seems to me, though, that your problem really comes down to this: you don’t need io.async or anything like it in order to use the Io interface.

1 Like

Ah, but I want the code to be IO-agnostic! These different function calls are asynchronous, in that they can run concurrently, but they don’t have to. However, I want the code to behave optimally in both such cases…

Directly calling the function already is IO-agnostic, in fact it’s the most IO-agnostic option. You should only use io.async if you want to possibly run them concurrently, for example processing multiple dirs at the same time would benefit you. But if you just go one at a time and you are fine with that, it will work with any Io backend.

1 Like