How to handle timeouts in the new Io?

I have been toying around with the new Io.
Here is my test code:

const std = @import("std");

fn testingConnection(io: std.Io) !void {
    try io.sleep(.fromSeconds(1), .awake); // only simulates work; not needed for correctness since the address is already listening

    const address: std.Io.net.IpAddress = .{ .ip4 = .loopback(8008) };
    const stream = try address.connect(io, .{ .mode = .stream, .protocol = .tcp });
    defer stream.close(io);

    // no buffering setup, no need to call flush
    var stream_writer = stream.writer(io, &.{});
    const writer = &stream_writer.interface;

    try writer.print("Apocalypse message from TCP client!\n", .{});

    try io.sleep(.fromSeconds(1), .awake); // again only to simulate work
}

pub fn main() !void {
    var gpa: std.heap.DebugAllocator(.{}) = .init;
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var threaded: std.Io.Threaded = .init(allocator, .{ .concurrent_limit = .unlimited });
    defer threaded.deinit();
    const io = threaded.io();

    // no buffering setup, no need to call flush
    var stdout_writer: std.Io.File.Writer = .initStreaming(.stdout(), io, &.{});
    const stdout = &stdout_writer.interface;

    try stdout.print("Hello world!\n", .{});

    const address: std.Io.net.IpAddress = .{ .ip4 = .loopback(8008) };
    var server = try address.listen(io, .{});
    defer server.deinit(io);

    var connection_fut = try io.concurrent(testingConnection, .{io});
    defer connection_fut.cancel(io) catch {};

    // if testingConnection() fails this will wait forever; probably should set up a timeout loop?
    const stream = try server.accept(io);
    defer stream.close(io);

    var stream_buffer: [1024]u8 = undefined;
    var stream_reader = stream.reader(io, &stream_buffer);
    const reader = &stream_reader.interface;

    _ = try reader.streamDelimiter(stdout, '\n'); // unfortunately there is no inclusive version
    _ = try reader.stream(stdout, .limited(1)); // also cleans '\n' from buffer for future use

    try connection_fut.await(io); // waits for the second sleep()
    try stdout.print("Goodbye world!\n", .{});
}

It works well, but I have a few questions:

  • Is there a better way to add a timeout to accept() than selecting it as an async task together with a concurrent task for the timeout?
  • Is this the recommended way to do this? I am looking to understand the usage patterns of the interface.
  • Is there a better way to do the inclusive stream? I see there are Inclusive/Exclusive variants for take and peek, so why not for stream? (I know I can just implement it myself, but since take and peek have them, what is the reasoning behind leaving it out of stream?)
  • I see there is a testing io. How far along is it, what can it catch, and how do I get its behavior when declaring it myself?
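
For reference, the inclusive variant asked about in the third question can be sketched as a thin wrapper around the two calls used in main(). This is only a sketch: streamDelimiterInclusive is a hypothetical name, not something in std, and it assumes the std.Io.Reader / std.Io.Writer methods streamDelimiter, takeByte and writeByte behave as they do in the code above (streamDelimiter stops just before the delimiter and leaves it unread).

```zig
const std = @import("std");

/// Hypothetical helper (not part of std): like `streamDelimiter`, but also
/// forwards the delimiter to the writer and consumes it from the reader.
fn streamDelimiterInclusive(
    reader: *std.Io.Reader,
    writer: *std.Io.Writer,
    delimiter: u8,
) std.Io.Reader.StreamError!usize {
    // Streams everything before the delimiter; returns error.EndOfStream
    // if the delimiter is never found.
    const n = try reader.streamDelimiter(writer, delimiter);
    // The next unread byte is the delimiter itself: take it and pass it on.
    try writer.writeByte(try reader.takeByte());
    return n + 1;
}

test "delimiter is forwarded and consumed" {
    var reader: std.Io.Reader = .fixed("first\nsecond");
    var out: [32]u8 = undefined;
    var writer: std.Io.Writer = .fixed(&out);

    const n = try streamDelimiterInclusive(&reader, &writer, '\n');
    try std.testing.expectEqual(@as(usize, 6), n);
    try std.testing.expectEqualStrings("first\n", writer.buffered());
    try std.testing.expectEqualStrings("second", reader.buffered());
}
```

With this, the two reader lines in main() would collapse into a single `_ = try streamDelimiterInclusive(reader, stdout, '\n');`. It does not answer why std has no such variant, only how little code it takes to add one locally.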

Overall I really like the interface so far. I hope signal processing will make it inside as well. It just makes sense to be inside imo.

Robert :blush:

When dealing with futures, the select pattern (or something similar) is all you get.

However, the lower-level Operation and Batch APIs do offer built-in support for timeouts.

Those APIs are limited right now, but they will be expanded to support every operation, which will in turn give every operation timeout support.
But that is still a lower-level API used by abstractions like a Stream, which should (when it moves to this API) provide a way to set timeouts.

The testing io is just a Threaded currently, but an implementation for testing and leak checking is planned.

I don’t see why signal processing would go into Io. You could get the data through Io, but processing it should be separate IMO.
