How to keep a reference to reader/writer interfaces in a structure?

I’m playing around a toy TCP server. From the very simple example which only handle one connection at a time, I’ve added a Client structure and a handler function which is launched with async, so multiple clients can connect.

Fields in Client are very simple:

const Client = struct {
    id: usize,
    stream: std.Io.net.Stream,
};

The problem is, every time I need to read or write from the net.Stream I need to reinit the Reader/Writer with a buffer, and take a reference to the interface. For instance, warning the client that the connection will close is done through a deinit() function like this:

  fn deinit(self: Client, io: std.Io) void {
      var writer = self.stream.writer(io, &.{});
      const w = &writer.interface;
      w.print("Server stopped\n", .{}) catch unreachable;
      self.stream.close(io);
  }

I’ve tried to add fields like this (with the respective init function), but I get Sigfault on read or write operations.

const Client = struct {
    id: usize,
    stream: std.Io.net.Stream,
    stream_reader: std.Io.net.Stream.Reader,
    stream_writer: std.Io.net.Stream.Writer,
    reader: *std.Io.Reader,
    writer: *std.Io.Reader,
};

Here is my current code that works well, but I’m not happy about how it is designed. Reader/Writer are not part of the struct, so I can’t really pass it around (to a game loop/app for instance)

const std = @import("std");
const log = std.log;

const Client = struct {
    id: usize,
    stream: std.Io.net.Stream,

    fn init(id: usize, stream: std.Io.net.Stream) Client {
        return .{
            .id = id,
            .stream = stream,
        };
    }

    fn deinit(self: Client, io: std.Io) void {
        std.log.debug("Deinit client {}", .{self.id});
        var writer = self.stream.writer(io, &.{});
        const w = &writer.interface;
        w.print("Server stopped\n", .{}) catch unreachable;
        self.stream.close(io);
    }
};

fn handler(io: std.Io, client: Client) void {
    var buf: [1024]u8 = undefined;
    var reader = client.stream.reader(io, &buf);
    const r = &reader.interface;

    var writer = client.stream.writer(io, &.{});
    const w = &writer.interface;

    while (true) {
        const text = r.takeDelimiterInclusive('\n') catch {
            std.log.debug("Client {} disconnected", .{client.id});
            break;
        };
        std.debug.print("Received from {}: {s}", .{ client.id, text });
        w.printAscii(text, .{}) catch unreachable;
    }
    client.deinit(io);
}

pub fn main(init: std.process.Init) !void {
    const io = init.io;
    var tasks: std.Io.Group = .init;

    const address: std.Io.net.IpAddress = try .parse("127.0.0.1", 7777);
    var server = try address.listen(io, .{});
    std.log.debug("Server listening:", .{});

    var i: usize = 1;
    while (true) : (i += 1) {
        const stream = try server.accept(io);

        std.log.debug("Client connected: {}", .{stream.socket.address});
        std.log.debug("Id: {}", .{i});

        const client: Client = .init(i, stream);

        tasks.async(io, handler, .{ io, client });
    }
    tasks.cancel(io);
}

How should I proceed to handle many concurrent connections and have a complete easy-to-use Client structure ?

On a side note, working with std.Io is amazing ! Breaking the main server loop calls cancel, which gracefully breaks the client loops, and eventually calls client.deinit(). It’s much simpler than synchronizing raw threads. Impressive work on Io !

Store either the pointers to the interface (recommended) or only the implementations. Storing the stream itself is also probably not necessary.

The former is recommended simply because it makes your code a lot more flexible, and easier to test.

You probably got segfaults because Client.reader/writer points to a field in Client.stream_reader/writer. This means you can’t copy the Client struct, because the new reader and writer fields will still point to the old struct.

This can be solved in multiple ways. You can create the reader/writer implementations in handler and only store the pointers to the interface in Client like vulpesx recommended. This allows you copy the Client struct.

You can also avoid copying the Client struct by passing it as pointer. Client.init is the most problematic because you have to make it fill in a struct instead of returning a copy. I modified your code to this:

const std = @import("std");
const log = std.log;

const Client = struct {
    id: usize,
    stream: std.Io.net.Stream,
    stream_reader: std.Io.net.Stream.Reader,
    stream_writer: std.Io.net.Stream.Writer,
    reader: *std.Io.Reader,
    writer: *std.Io.Writer,
    read_buf: [1024]u8,

    fn init(self: *Client, io: std.Io, id: usize, stream: std.Io.net.Stream) void {
        self.id = id;
        self.stream = stream;
        self.stream_reader = self.stream.reader(io, &self.read_buf);
        self.reader = &self.stream_reader.interface;
        self.stream_writer = self.stream.writer(io, &.{});
        self.writer = &self.stream_writer.interface;
    }

    fn deinit(self: *const Client, io: std.Io) void {
        std.log.debug("Deinit client {}", .{self.id});
        self.writer.print("Server stopped\n", .{}) catch {};
        self.stream.close(io);
    }
};

fn handler(io: std.Io, id: usize, stream: std.Io.net.Stream) void {
    var client: Client = undefined;
    client.init(io, id, stream);
    
    while (true) {
        const text = client.reader.takeDelimiterInclusive('\n') catch {
            std.log.debug("Client {} disconnected", .{client.id});
            break;
        };
        std.debug.print("Received from {}: {s}", .{ client.id, text });
        client.writer.printAscii(text, .{}) catch {};
    }
    client.deinit(io);
}

pub fn main(init: std.process.Init) !void {
    const io = init.io;
    var tasks: std.Io.Group = .init;
    defer tasks.cancel(io);

    const address: std.Io.net.IpAddress = try .parse("127.0.0.1", 7777);
    var server = try address.listen(io, .{});
    std.log.debug("Server listening:", .{});

    var i: usize = 1;
    while (true) : (i += 1) {
        const stream = try server.accept(io);

        std.log.debug("Client connected: {}", .{stream.socket.address});
        std.log.debug("Id: {}", .{i});

        tasks.async(io, handler, .{ io, i, stream });
    }
}
2 Likes

Thank you very much both of you !

@TerenceTux Your solution makes so much sense, and really helped me to understand where I was mistaking ! Thank you again. What I don’t like is how cumbersome the Client struct is. I didn’t like my (failed) attempt and still don’t like this (successful) attempt.

@vulpesx Ok, I’ll try keep only the interface pointers. And indeed, handler can close the stream itself, it doesn’t have to be done inside Client.

:heart:

1 Like

Here is the working code with a clean Client struct. Thanks again !

I’m breaking the server loop after 3 connections just to be sure that warnClient is called. And it is, meaning cancellation of tasks doesn’t destroy them, it gracefully waits for them to exit. It is really cool, but it seems like magic to me. How does it work ? async adds a Cancelable error to the handler function ?
takeDelimiterInclusive outputs ReadFailed when a cancel is requested, not Canceled.

const std = @import("std");
const log = std.log;

const Client = struct {
    id: usize,
    reader: *std.Io.Reader,
    writer: *std.Io.Writer,

    fn warnClient(self: Client) void {
        self.writer.print("Server will shutdown\n", .{}) catch {};
    }
};

fn handler(io: std.Io, id: usize, stream: std.Io.net.Stream) void {
    var readbuf: [1024]u8 = undefined;
    var stream_reader = stream.reader(io, &readbuf);
    var stream_writer = stream.writer(io, &.{});
    const reader = &stream_reader.interface;
    const writer = &stream_writer.interface;

    var client = Client{ .id = id, .reader = reader, .writer = writer };

    while (true) {
        const text = client.reader.takeDelimiterInclusive('\n') catch {
            std.log.debug("Client {} disconnected", .{client.id});
            break;
        };
        std.debug.print("Received from {}: {s}", .{ client.id, text });
        client.writer.printAscii(text, .{}) catch {};
    }
    client.warnClient();
    stream.close(io);
}

pub fn main(init: std.process.Init) !void {
    const io = init.io;
    var tasks: std.Io.Group = .init;
    defer tasks.cancel(io);

    const address: std.Io.net.IpAddress = try .parse("127.0.0.1", 7777);
    var server = try address.listen(io, .{});
    std.log.debug("Server listening:", .{});

    var i: usize = 1;
    while (true) : (i += 1) {
        if (i == 4) break;
        const stream = try server.accept(io);

        std.log.debug("Client connected: {}", .{stream.socket.address});
        std.log.debug("Id: {}", .{i});

        tasks.async(io, handler, .{ io, i, stream });
    }
}

Nope! Cancelling a task just informs the Io implementation that the task is cancelled (basically it sets a flag), and the implementation simply checks on most operations (like reading from a stream) if the task was cancelled, returning the error if so. You can also manually check for cancellation with io.checkCancel() but you should avoid it. There is also io.recancel() if you want to continue until the next cancellation check.

It is up to the task what to do when encountering the cancellation error, usually you should propagate it, so the caller is aware, but you don’t have to!

If you cancel a group, its await will return error.canceled regardless of if it is propagated. This is only because it is unaware of individual task returns, normal futures will not do this.

the reader/writer interfaces have no knowledge of Io or async! Those are details only relevent to the implementation and the code that created it. ReadFailed indicates an implementation specific error, which could be canceled, the implementation should provide a way to get a more specific error; for your specific case that is just an err field both reader and writer implementations have.

3 Likes

Thank you very much @vulpesx ! As always your replies are really helpful :heart:

If you cancel a group, its await will return error.canceled regardless of if it is propagated. This is only because it is unaware of individual task returns, normal futures will not do this.

Yes, this I knew. It also explains why functions called in a group can’t error (but can if they are just future)

for your specific case that is just an err field both reader and writer implementations have.

Oh great ! I though it was impossible to discriminate between a simple error or a cancellation request. Thanks !

They are allowed to error, but only with error.Canceled.

1 Like