Ourio: (yet another) asynchronous task library

I got sidetracked from building an IRC server into building an HTTP server, and building the HTTP server turned into learning all about io_uring. Ourio is the result of that learning. It’s an asynchronous task runner, very similar to libxev but with a few key differences:

  1. Unified callback signature (I know, technically you can do this with libxev too)
  2. Pass one additional piece of metadata with a task: a single u16 msg
  3. The runtime handles memory management of the Tasks
  4. An included Mock implementation for easy unit testing of application code
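
As a rough illustration of item 2, here is how a single u16 can round-trip an application-defined enum. This is a minimal sketch, not the library's code: `MiniTask` is a hypothetical stand-in reduced to the msg field, `Msg` is a made-up enum, and the `msgToEnum` helper is an assumed implementation that only borrows its name from the method used in the test further down.

```zig
const std = @import("std");

// Hypothetical application-defined message kinds, backed by u16 so every
// value fits in the task's msg field.
const Msg = enum(u16) { connected, readable, closed };

// Hypothetical stand-in for a task, reduced to the one field of interest.
const MiniTask = struct {
    msg: u16 = 0,

    // Assumed implementation: convert the raw u16 back into the caller's enum.
    // In safe builds an out-of-range value trips a safety check.
    fn msgToEnum(self: MiniTask, comptime T: type) T {
        return @enumFromInt(self.msg);
    }
};

test "msg round-trips through u16" {
    const task: MiniTask = .{ .msg = @intFromEnum(Msg.readable) };
    try std.testing.expectEqual(Msg.readable, task.msgToEnum(Msg));
}
```

The point of the pattern is that one callback can serve many operations and switch on the msg to tell them apart, without allocating a separate context per operation.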

I originally started my server project(s) with libxev, but wanted to use the msg_ring capability of io_uring to pass messages cleanly between threads. I had a PR to add this to libxev, but ultimately I was unhappy with the API and closed it.

Overall, I probably could have gotten this to work with libxev in a decent way. But I also just wanted to really learn what io_uring was all about. This library sets some flags for some smallish performance gains, and relies heavily on each thread having its own io_uring instance. Spawning a new Ring from an existing one sets flags on the io_uring instance to share the worker threadpool that the kernel sets up.

Two extra goodies: I reimplemented std.net.tcpConnectTo{Addr|Host} and, using tls.zig, implemented an asynchronous TLS client handshake. Both of these are in the library.

Here’s a test from the library to show a bit of how it works:

test "runtime: msgRing" {
    const gpa = std.testing.allocator;
    var rt1 = try io.Ring.init(gpa, 16);
    defer rt1.deinit();

    var rt2 = try rt1.initChild(16);
    defer rt2.deinit();

    const Foo2 = struct {
        rt1: bool = false,
        rt2: bool = false,

        const Msg = enum { rt1, rt2 };

        fn callback(_: *io.Ring, task: io.Task) anyerror!void {
            const self = task.userdataCast(@This());
            const msg = task.msgToEnum(Msg);
            switch (msg) {
                .rt1 => self.rt1 = true,
                .rt2 => self.rt2 = true,
            }
        }
    };

    var foo: Foo2 = .{};

    // The task we will send from rt1 to rt2
    const target_task = try rt1.getTask();
    target_task.* = .{
        .userdata = &foo,
        .callback = Foo2.callback,
        .msg = @intFromEnum(Foo2.Msg.rt2),
        .result = .{ .usermsg = 0 },
    };

    _ = try rt1.msgRing(
        &rt2,
        target_task,
        .{ .cb = Foo2.callback, .msg = @intFromEnum(Foo2.Msg.rt1), .ptr = &foo },
    );

    try rt1.run(.until_done);
    try std.testing.expect(foo.rt1);
    try std.testing.expect(!foo.rt2);
    try rt2.run(.until_done);
    try std.testing.expect(foo.rt1);
    try std.testing.expect(foo.rt2);
}

Really promising!

I have to take a swing at replacing asio with ourio in thespian. Looks like a nice match.

Added an async DNS resolver today. The goal is to build up to an entirely async HTTP client. The DNS resolver makes that fully possible with the library now, but I have zero HTTP parsing, so you would be on your own for that :smile:

The resolver reads /etc/resolv.conf and handles the retry and timeout options there if they are set; otherwise it falls back to its own defaults, which are the max mentioned in man resolv.conf. It’ll even cycle through all the servers listed until it finds a result.
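
For anyone curious what reading those options involves, here is a minimal sketch of the parsing side only. It is not the resolver's actual code: `ResolvConfig`, `parseResolvConf`, and `optionValue` are hypothetical names, and the defaults assume that "the max mentioned in man resolv.conf" means the caps documented there (5 attempts, 30 seconds).

```zig
const std = @import("std");

// Hypothetical config shape. Defaults are an assumption: the caps that
// man resolv.conf documents for attempts (5) and timeout (30 seconds).
const ResolvConfig = struct {
    attempts: u8 = 5,
    timeout_s: u8 = 30,
    nameserver_count: usize = 0,
};

// Parse "attempts:3" style options; returns null if the prefix does not
// match or the number is malformed.
fn optionValue(opt: []const u8, prefix: []const u8) ?u8 {
    if (!std.mem.startsWith(u8, opt, prefix)) return null;
    return std.fmt.parseInt(u8, opt[prefix.len..], 10) catch null;
}

// Scan resolv.conf text line by line. Comment lines and unknown keywords
// are ignored because they match neither branch.
fn parseResolvConf(text: []const u8) ResolvConfig {
    var cfg: ResolvConfig = .{};
    var lines = std.mem.tokenizeScalar(u8, text, '\n');
    while (lines.next()) |line| {
        var words = std.mem.tokenizeAny(u8, line, " \t\r");
        const keyword = words.next() orelse continue;
        if (std.mem.eql(u8, keyword, "nameserver")) {
            if (words.next() != null) cfg.nameserver_count += 1;
        } else if (std.mem.eql(u8, keyword, "options")) {
            while (words.next()) |opt| {
                // Clamp to the documented caps.
                if (optionValue(opt, "attempts:")) |n| cfg.attempts = @min(n, 5);
                if (optionValue(opt, "timeout:")) |n| cfg.timeout_s = @min(n, 30);
            }
        }
    }
    return cfg;
}
```

A real resolver would also keep the nameserver addresses themselves, so it can move on to the next server when one fails to answer; the sketch only counts them to stay short.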

    const Foo = struct {
        fn myCallback(_: *Ring, task: Task) anyerror!void {
            const result = task.result.?;

            const bytes = try result.userbytes;
            const resp: Response = .{ .bytes = bytes };
            var iter = try resp.answerIterator();
            while (iter.next()) |answer| {
                switch (answer) {
                    .A => |ip4| _ = ip4,    // [4]u8
                    .AAAA => |ip6| _ = ip6, // [16]u8
                }
            }
        }
    };

    try resolver.resolveQueries(&io, &.{
        .{ .host = "timculverhouse.com" },
        .{ .host = "timculverhouse.com", .type = .AAAA },
    }, .{ .cb = Foo.myCallback });
    try io.run(.until_done);
    try std.testing.expectEqual(2, resolver.config.nameservers.len);
    try std.testing.expectEqual(3, resolver.config.attempts);
    try std.testing.expectEqual(10, resolver.config.timeout_s);
