Writing to stdout from async function

Hi,

I’m trying to understand how to write to stdout correctly from async functions when using std.Io.Threaded. My use case is writing logs from the connection handler function in a TCP server.

The minimal code below reproduces the issue: an error.WriteFailed that only happens in the connection handler, and only when the handler is called with io.async.

I bet something is wrong with the way I use std.Io.Mutex, the way I store my handle to stdout and my writer, or the way I init the threaded io. I’ve been struggling to find examples that are up to date with the new I/O interface, and I don’t know much about multithreading, so I can’t figure out the fix.

Can someone let me know what’s wrong with the code below?

const std = @import("std");
const Io = std.Io;
const net = Io.net;
const fmt = std.fmt;

const Logger = struct {
    stdout: std.Io.File,
    stdout_buffer: [4096]u8,
    stdout_mutex: std.Io.Mutex,
    writer: std.Io.File.Writer,

    pub fn init(l: *Logger, io: std.Io) void {
        l.stdout = std.Io.File.stdout();
        l.stdout_buffer = undefined;
        l.stdout_mutex = std.Io.Mutex.init;
        l.writer = std.Io.File.Writer.init(l.stdout, io, &l.stdout_buffer);
    }

    pub fn log(l: *Logger, io: std.Io, comptime format: []const u8, args: anytype) !void {
        try l.stdout_mutex.lock(io);
        defer l.stdout_mutex.unlock(io);

        var wr = &l.writer.interface;
        try wr.print(format, args);
        try wr.flush();
    }
};

fn handleConn(io: Io, logger: *Logger, conn: net.Stream) void {
    logger.log(io, "handling connection in handleConn\n", .{}) catch |err| {
        std.debug.print("ERROR: {}\n", .{err});
    };
    conn.close(io);
}

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{
        .thread_safe = true,
    }) = .init;
    const process_allocator = debug_allocator.allocator();

    var threaded: Io.Threaded = .init(process_allocator, .{});
    var io = threaded.io();

    var logger: Logger = undefined;
    logger.init(io);

    try logger.log(io, "START SERVICE\n", .{});

    const loopback: net.IpAddress = .{ .ip4 = net.Ip4Address.loopback(8888) };
    var server = loopback.listen(io, .{ .reuse_address = true }) catch |err| {
        std.debug.print("error : {}\n", .{err});
        return err;
    };
    defer server.deinit(io);
    while (true) {
        const conn = server.accept(io) catch {
            continue;
        };
        // The following works as expected:
        //
        //  handleConn(io, &logger, conn);

        // The following does not work. It shows "ERROR: error.WriteFailed" in
        // the terminal when i send a request to the listening server
        var conn_future = io.async(handleConn, .{ io, &logger, conn });
        defer conn_future.cancel(io);
    }
}

I think you have the wrong shape here, as you’re immediately canceling in the accept-loop.

You probably want an Io.Group before the loop which you defer cancel. Then inside the loop, you call async on the group instead of on io directly.
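
A minimal sketch of that shape, assuming Io.Group is initialized with .init and exposes async(io, function, args) and cancel(io) with these signatures on the Zig version in use (they may differ between versions). acceptLoop is a hypothetical helper that is not part of the original program, and handleConn is reduced to a stub without the logger:

```zig
const std = @import("std");
const Io = std.Io;
const net = Io.net;

// Stub handler; the real one would also take the logger.
fn handleConn(io: Io, conn: net.Stream) void {
    defer conn.close(io);
}

// Hypothetical helper. The group is declared before the loop, so its deferred
// cancel only runs when the loop exits, not at the end of every iteration.
fn acceptLoop(io: Io, server: *net.Server) !void {
    var group: Io.Group = .init; // assumption: Group has an `init` declaration
    defer group.cancel(io);

    while (true) {
        const conn = server.accept(io) catch |err| switch (err) {
            error.Canceled => return error.Canceled,
            else => continue,
        };
        // The group owns the spawned task, so there is no per-connection
        // future to cancel here.
        group.async(io, handleConn, .{ io, conn });
    }
}
```

From main this would be called as try acceptLoop(io, &server) after the listen call.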


I ran this in a debugger and stepped through the handle connection function. One thing that helped me is that the Writer has an err field that is set to the actual error. Changing the handler like so:

  fn handleConn(io: Io, logger: *Logger, conn: net.Stream) void {
      logger.log(io, "handling connection in handleConn\n", .{}) catch |err| {
-          std.debug.print("ERROR: {}\n", .{err});
+          std.debug.print("ERROR: {} -> w.err == {?}\n", .{ err, logger.writer.err });
      };
      conn.close(io);
  }

Results in:

ERROR: error.WriteFailed -> w.err == error.Canceled

So you are getting error.Canceled when you flush.

Like @cryptocode alluded to, your problem is that you are cancelling inside the loop. The defer runs at the end of the current iteration, so the handler is cancelled right after it is spawned, which gives you the write failure.

It is better to let it run, and have the cancel occur outside the loop.


You need group.concurrent, because group.async is not guaranteed to run until you call group.await, which means never for this kind of loop.
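
A rough sketch of the spawn step with group.concurrent, assuming it takes the same arguments as group.async and returns an error union when the task cannot be started concurrently; check the doc-comments of the Zig version in use. spawnConn is a hypothetical helper and handleConn is a stub:

```zig
const std = @import("std");
const Io = std.Io;
const net = Io.net;

// Stub handler; the real one would also take the logger.
fn handleConn(io: Io, conn: net.Stream) void {
    defer conn.close(io);
}

// Hypothetical helper, meant to be called from the accept loop with a group
// that was declared before the loop.
fn spawnConn(io: Io, group: *Io.Group, conn: net.Stream) void {
    group.concurrent(io, handleConn, .{ io, conn }) catch |err| {
        // Assumption: the call reports failure (for example
        // error.ConcurrencyUnavailable) instead of deferring the task.
        // The handler never ran, so the connection is closed here.
        std.debug.print("could not spawn handler: {}\n", .{err});
        conn.close(io);
    };
}
```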


Good point, that’s indeed what Group says in the doc-comments, but when you assume Threaded, do you get a stronger guarantee? If not, it seems like stuff like std doc would be technically wrong too (init’s io is Threaded, at least for now): https://codeberg.org/ziglang/zig/src/branch/master/lib/compiler/std-docs.zig#L82-L88


Thanks to you both, I was completely misunderstanding what I was doing with the call to defer conn_future.cancel(io), and the error does disappear when I remove that line.
However, I’m kind of confused about io.Group: based on the 0.16 release notes I thought io.Group should be used when the tasks I want to spawn share the same lifetime, and I don’t understand how that’s the case for a TCP server, where new connections can be received long after previous ones have been completely handled. Am I misunderstanding the release notes?

Based on the 0.16 release notes and the blog post from Loris Cro, I thought io.concurrent should only be used for tasks where the result is incorrect if they are not run concurrently.

A TCP server should be able to run and handle requests sequentially. It’s totally inefficient, but the result is still correct. I’m confused about why I need to “force” concurrency here.


Technically, in all current implementations, using io.async is fine. However, Andrew specifically updated the docs after another thread here to make clear that the Io spec for Group does not guarantee that async tasks run until you call await, so you should only use async for short-lived groups where you wait for all the tasks.


Good to know, thanks!

Btw, catching every error from server.accept and continuing is not a good approach; you should always propagate error.Canceled. You can choose to continue on other errors, but not on error.Canceled, otherwise you are making your task uncancelable.


I think it’s easier to see how tasks in an accept loop share lifetime, if instead of an infinite loop you have a stop flag (maybe set by a signal handler or other means), allowing for graceful shutdown.

Yes, I don’t understand how cancelations work at all. If I receive an error.Canceled at the server.accept call, doesn’t that mean the client cancelled the request before the task is started anyway, so the correct behavior is to continue without starting the task, so that server.accept gets called again right away to wait for another connection?

No, if you receive error.Canceled from accept(), it means that your function (the one calling accept()) has been canceled. If you are doing this inside main, it can never happen, but it’s good practice to always propagate error.Canceled, because you never know how you will end up using the code.

If the client closed the connection somehow, you will receive some I/O related error from the subsequent read/write.

error.Canceled is kind of special: it only happens after you call something.cancel(), and you should treat it as a signal to clean up and return.


Thanks for the explanation,

Since in this case I am doing it from main, what does propagating it mean? If the error can never happen in main, shouldn’t I do something like:

if (err == error.Canceled) unreachable

In a hypothetical context where I’m not doing this from main, should I propagate the error up by returning it up the call stack, or down by calling .cancel on the async tasks created from the current function?

I’d personally still do:

const conn = server.accept(io) catch |err| switch (err) {
    error.Canceled => return error.Canceled,
    else => continue,
};

I mostly mentioned it, because people find ziggit posts and might copy the code, even though their accept loop is a task on its own and should be cancelable.
