Help with `Io.Group`

I do want to possibly run them concurrently, that’s the point… I’m aware of what io.async and its like do.

Does anyone have some advice on this? I still haven’t resolved this issue of declaring a group of tasks as asynchronous and being able to abort them when a single one fails.

I guess what I want is the Promise.all to the existing Promise.allSettled, in JavaScript terms.

This works:

    const U = union(enum) { ret: error{Oops}!void };
    var sel: Io.Select(U) = .init(io, &.{});
    for (0..5) |i| {
        try sel.concurrent(.ret, foo, .{io, i});
    }
    while (sel.await()) |r| {
        r.ret catch {
            sel.cancelDiscard();
            break;
        };
    } else |_| {}

It won’t work in cases where concurrency is not available…
Also, it will hang once all the tasks are finished if no cancellation happens.

Basically, I want something that can be slotted in here and work with either Io implementation:

const std = @import("std");
const Io = std.Io;
const Threaded = Io.Threaded;

pub fn main(init: std.process.Init) !void {
    // var threaded: Threaded = .init_single_threaded;
    // const io = threaded.io();
    // _ = init;

    const io = init.io;

    const task_count = 10;

    //////////////////////////////
    // Code goes here

    const U = union(enum) { ret: error{Oops}!void };
    var sel: Io.Select(U) = .init(io, &.{});
    for (0..task_count) |i| {
        try sel.concurrent(.ret, do, .{ io, i });
    }
    while (sel.await()) |r| {
        r.ret catch {
            sel.cancelDiscard();
            break;
        };
    } else |_| {}

    //////////////////////////////

    std.log.info("Here", .{});
}

fn do(io: Io, i: usize) error{Oops}!void {
    doFallible(io, i) catch |err| switch (err) {
        error.Canceled => {
            std.log.debug("Canceled task {d}", .{i + 1});
            return;
        },
        else => |e| {
            std.log.err("Failed task {d}: {t}", .{ i + 1, e });
            return e;
        },
    };
    std.log.info("Finished task {d}", .{i + 1});
}

fn doFallible(io: Io, i: usize) (Io.Cancelable || error{Oops})!void {
    // should also work when commenting out the following line
    if (i == 3) return error.Oops;

    try io.sleep(.fromMilliseconds(@intCast(i * 100)), .real);
}

Concurrency is required if using the inbuilt cancellation mechanism.

You can invert the concurrency requirement by having a concurrent task that consumes and cancels the others, but my naive attempt had a deadlock.

I suspect to get around that you would need to do some of your own bookkeeping at which point you may as well implement a custom cancellation mechanism.

Not sure whether this is helpful for your particular situation, but I had a similar problem and came up with something like this:

fn doStuffInLoop(doing_stuff_failed: *std.atomic.Value(bool)) void {
    const iterations = 1000;
    for (0..iterations) |i| {
        if (doing_stuff_failed.load(.monotonic)) return;

        ... // do the stuff

        if (failure_condition) {
            std.log.err("error at loop iteration {}\n", .{ i });
            // I think .monotonic is OK here if this is the only call to `store`
            doing_stuff_failed.store(true, .monotonic);
            return;
        }
    }
}

pub fn main(init: std.process.Init) !void {
    const io = init.io;

    var group: std.Io.Group = .init;
    defer group.cancel(io);

    var doing_stuff_failed: std.atomic.Value(bool) = .init(false);
    const task_count = 10;
    for (0..task_count) |i| group.async(io, doStuffInLoop, .{ &doing_stuff_failed });
    try group.await(io);

    if (doing_stuff_failed.raw) return error.DoStuffInLoopFailure;
}
2 Likes

That is basically what I had in mind as a custom cancellation mechanism.

You probably don’t want to check the bool every iteration for best performance, but depends, benchmark if it matters.

In my case the difference between the loop time with and without it is negligible.

I thought there would have been something in the standard library for this use case, but this was the best I could come up with

There is, its std.atomic.Value(bool) :3

there is no perfect solution that fits all use cases, if there were future/group.cancel would use it

I considered calling Group.cancel from within a group member, but it’s not thread-safe so doing that is probably bad. I wonder if there could be a way to make calling it from within a member safe

Tasks/a group task, can’t cancel themselves because cancelling also blocks until completion which includes the completion of the task that is blocking until completion, it’s a deadlock.