Let’s say I want to take a function that performs a few synchronous IO actions and add concurrency.
fn multicastSync(io: Io, data: []const u8) !void {
try writeToSocket(io, data);
try writeToFileA(io, data);
try writeToFileB(io, data);
}
// What I want to convert to
fn multicastAsync(io: Io, data: []const u8) !void {
var group: Io.Group = .init;
defer group.cancel();
group.async(io, writeToSocket, .{ io, data });
group.async(io, writeToFileA, .{ io, data });
group.async(io, writeToFileB, .{ io, data });
try group.await();
}
This doesn’t work because the function you pass to Io.Group.async must coerce to Cancelable!void, and my functions can return errors other than error.Canceled. I’ve thought of two ways to work around this restriction, but neither feels particularly satisfying:
Create a wrapper function that catches all errors from the “real” function and returns only error.Canceled.
Use Io.Select instead.
Option 1 adds a lot of noise with all the wrapper functions, and anyway I’m not sure it’s correct to convert other errors to error.Canceled arbitrarily. Option 2 doesn’t work because Io.Select seems designed to help you handle the return values/completion order of each call, but I’m reaching for Io.Group precisely because I don’t care about that here. So it still seems a bit hacky.
Is there a better option here? I feel like something is missing from my mental model of Group (or maybe async in general) because I would expect this to be a somewhat common situation.
Use catch to handle the other errors of writeToSocket, writeToFileA, and writeToFileB first. You can also use it to just convert those errors to error.Canceled.
Those are the ideas off the top of my head.
If you do this, you’re going to hit an assertion that ensures proper error.Canceled propagation. The rule is:
error.Canceled must be returned from the group task return value if, and only if, the group task gets canceled.
Tips for proper cancelation propagation:
when using catch, be sure to switch on the error and check for and propagate error.Canceled
don’t propagate error.ReadFailed or error.WriteFailed past the point where the reader/writer is constructed. Instead, extract the underlying error out of the reader/writer implementation, which may include error.Canceled.
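Concretely, a wrapper following those rules might look something like this. This is only a sketch against the in-development std.Io API from this thread, and it assumes the cancel-only error set is spelled Io.Cancelable (matching the Cancelable!void mentioned above):

```zig
// Sketch: adapt writeToFileA for use with Io.Group.async.
// error.Canceled is propagated if and only if the task was canceled;
// every other error is suppressed (logged here so it isn't silently lost).
fn writeToFileACancelable(io: Io, data: []const u8) Io.Cancelable!void {
    writeToFileA(io, data) catch |err| switch (err) {
        error.Canceled => return error.Canceled,
        else => std.log.err("writeToFileA failed: {s}", .{@errorName(err)}),
    };
}
```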
Makes sense, thanks for the explanation. My takeaway is that a function passed to Io.Group.async should propagate, but not create, error.Canceled, and suppress all other errors.
Is there a reason you’re reaching for io.Group here? io.Group is good for running a lot of functions, but its design doesn’t lend itself to getting results from those functions. There are times where that’s fine, but this doesn’t seem like one of them?
Here’s the same function without groups:
fn multicastAsync(io: Io, data: []const u8) !void {
var socket_future = io.async(writeToSocket, .{ io, data });
// Assuming we don't care about errors (including, but not limited to,
// error.Canceled) beyond the first one.
errdefer socket_future.cancel(io) catch {};
var a_future = io.async(writeToFileA, .{ io, data });
errdefer a_future.cancel(io) catch {};
var b_future = io.async(writeToFileB, .{ io, data });
errdefer b_future.cancel(io) catch {};
try socket_future.await(io);
try a_future.await(io);
try b_future.await(io);
}
If you have an unknown number of items you’re multiplexing to: you’ll either need to wrap the functions you’re adding to the group so they pass their error elsewhere (e.g., a Queue), or create a place for the futures to go (e.g., an ArrayList) so that each has a place to put its result, which you can similarly check for errors and call cancel on.
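For instance, a sketch of the futures-in-a-list variant. Here `writeToFile(io, path, data)` is a hypothetical path-taking variant of the functions above, and since I’m not sure what the future type is called, it’s recovered with @TypeOf:

```zig
// Sketch only: multiplex a write to a runtime-known set of paths,
// keeping each future in an allocated slice so it can be awaited
// and (via the unconditional defer) canceled.
fn multicastMany(io: Io, gpa: std.mem.Allocator, paths: []const []const u8, data: []const u8) !void {
    const Future = @TypeOf(io.async(writeToFile, .{ io, paths[0], data }));
    const futures = try gpa.alloc(Future, paths.len);
    defer gpa.free(futures);
    for (paths, futures) |path, *future| future.* = io.async(writeToFile, .{ io, path, data });
    // Unconditional cancel: intended to be idempotent with respect to await.
    defer for (futures) |*future| future.cancel(io) catch {};
    for (futures) |*future| try future.await(io);
}
```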
Sorry to nitpick, but I would generally recommend using unconditional defer on those cancels:
defer socket_future.cancel(io) catch {};
var a_future = io.async(writeToFileA, .{ io, data });
defer a_future.cancel(io) catch {};
var b_future = io.async(writeToFileB, .{ io, data });
defer b_future.cancel(io) catch {};
It’s easier to reason about, because this way you’re also allowed to return early successfully from the function before calling await. All the cancel-y functions are intentionally designed to be idempotent with respect to themselves and await, specifically to support this pattern.
Your main point stands.
This is basically Io.Select. With the awaitMany function added a couple days ago:
fn multicastAsync(io: Io, data: []const u8) !void {
const U = union(enum) {
write_socket: @typeInfo(@TypeOf(writeToSocket)).@"fn".return_type.?,
write_file: @typeInfo(@TypeOf(writeToFile)).@"fn".return_type.?,
};
var queue_buffer: [3]U = undefined; // Allows using async below instead of concurrent
var select: Io.Select(U) = .init(io, &queue_buffer);
defer select.cancel();
select.async(.write_socket, writeToSocket, .{ io, data });
select.async(.write_file, writeToFile, .{ io, data, a });
select.async(.write_file, writeToFile, .{ io, data, b });
var results_buffer: [3]U = undefined;
const results = results_buffer[0..try select.awaitMany(&results_buffer, 3)];
assert(results.len == 3);
for (results) |result_union| switch (result_union) {
.write_socket => |result| try result,
.write_file => |result| try result,
};
}
You’re probably right that Group is not really what I want here, but imo it’s less about getting the results from the functions and more that I’m converting a small (known) number of sync writes to async writes.
In the real code my example is based on, I’m reading two files, parsing their contents, then using the parsed data to write some new files. Each read->parse->write pass is independent of each other, so they seemed like easy candidates for async. I reached for Group because I wanted the caller to wait for both passes to complete, but I don’t care about the order or the results (this is a fire-and-forget script).
For my use case, I think using plain io.async and future.await makes way more sense. I still think it’s interesting to consider how one can adapt a function that was written “naively”, i.e. returning arbitrary errors, to work with Group.
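For the record, here’s roughly what that adaptation could look like: a comptime wrapper that turns a "naive" function into one Group will accept. This is only a sketch against the in-development API; the wrapped helper and the Io.Cancelable spelling are my assumptions, not confirmed names:

```zig
// Sketch: adapt functions with arbitrary error sets to Io.Group by
// wrapping them at comptime. Per the rule above, error.Canceled is
// propagated if and only if the task was canceled; everything else
// is logged and suppressed.
fn wrapped(comptime f: anytype) fn (Io, []const u8) Io.Cancelable!void {
    return struct {
        fn call(io: Io, data: []const u8) Io.Cancelable!void {
            f(io, data) catch |err| switch (err) {
                error.Canceled => return error.Canceled,
                else => std.log.err("multicast task failed: {s}", .{@errorName(err)}),
            };
        }
    }.call;
}

fn multicastGroup(io: Io, data: []const u8) !void {
    var group: Io.Group = .init;
    defer group.cancel();
    group.async(io, wrapped(writeToSocket), .{ io, data });
    group.async(io, wrapped(writeToFileA), .{ io, data });
    group.async(io, wrapped(writeToFileB), .{ io, data });
    try group.await();
}
```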