Take the following example to illustrate my problem:
const std = @import("std");
const Io = std.Io;
pub fn main(i: std.process.Init) !void {
var r_buf: [4096]u8 = undefined;
var w_buf: [4096]u8 = undefined;
var r = Io.File.stdin().readerStreaming(i.io, &r_buf);
var w = Io.File.stdout().writerStreaming(i.io, &w_buf);
var pump = i.io.async(Io.Reader.streamRemaining, .{ &r.interface, &w.interface });
// ...
const n = pump.cancel(i.io) catch |e| switch (e) {
error.ReadFailed => return r.err.?,
error.WriteFailed => return w.err.?,
};
w.interface.flush() catch return w.err.?;
std.log.info("{}", .{n});
}
The output is “error: Canceled” from returning either reader.err or writer.err after ReadFailed/WriteFailed is returned due to the cancellation request.
I instead need this to be propogated [a] from the function [b]. I don’t want to make the function accept File.Reader/Writer to increase reusability [c].
[a] necessary in my case as I am using a Group (the function indicates ReadFailed by other means).
[b] streamRemaining is placeholder function; my actual function has error.Canceled in the error set
[c] for unit testing and making the core code a standalone library
1 Like
How come it needs to be propagated from the task function, rather than in the function that constructs the reader and writer?
error.Canceled in specific needs to be propagated from the task function in order to not swallow it (i.e. as is asserted with Group.cancel). Other underlying errors should still propagate error.ReadFailed / error.WriteFailed to the function that constructs them.
Ah yes just as you replied I came up with this example that demonstrates it:
const std = @import("std");
const Io = std.Io;
const assert = std.debug.assert;
pub fn main(init: std.process.Init) !void {
const io = init.io;
var stdout_writer = Io.File.stdout().writerStreaming(io, &.{});
var group: Io.Group = .init;
var result: Io.Writer.Error!void = undefined;
try group.concurrent(io, task, .{ &stdout_writer.interface, &result });
group.cancel(io);
}
fn task(w: *Io.Writer, result: *Io.Writer.Error!void) Io.Cancelable!void {
result.* = streamer(w);
}
fn streamer(w: *Io.Writer) Io.Writer.Error!void {
try w.writeAll("hello world");
}
$ zig run test.zig
thread 2974857 panic: reached unreachable code
/home/andy/src/zig/lib/std/debug.zig:421:14: 0x1032669 in assert (std.zig)
if (!ok) unreachable; // assertion failure
^
/home/andy/src/zig/lib/std/Io/Threaded.zig:491:23: 0x1092226 in start (std.zig)
assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled`
^
/home/andy/src/zig/lib/std/Io/Threaded.zig:1756:29: 0x10eb08a in worker (std.zig)
runnable.startFn(runnable, &thread, t);
^
/home/andy/src/zig/lib/std/Thread.zig:423:13: 0x10c2ec5 in callFn__anon_15635 (std.zig)
@call(.auto, f, args);
^
/home/andy/src/zig/lib/std/Thread.zig:1375:30: 0x1091020 in entryFn (std.zig)
return callFn(f, self.fn_args);
^
/home/andy/src/zig/lib/std/os/linux/x86_64.zig:105:5: 0x10c2fa5 in clone (std.zig)
asm volatile (
^
fish: Job 1, 'stage3/bin/zig run test.zig' terminated by signal SIGABRT (Abort)
It’s impossible to square the Io.Group assertion with the Io.Writer API.