std.Io overview

I feel like this is too long! What details do you think would be better in a different post? Or do you think the focus of this one should change?
Perhaps this should not go into any details and just link to other posts for e.g. async/concurrency?

Zig 0.16 has just been released with the std.Io interface. It is bound to change in the future (this post will be updated with it) as more people use it and find where it is lacking; many problem areas are already known. It is only at a "good enough to be used for most things and to gather feedback" stage.

where did my APIs go?

  • std.fs APIs have moved under std.Io.File and std.Io.Dir.
  • std.posix APIs have largely been removed in favour of std.Io.
  • the lower-level, platform-specific APIs are still available.
    there is also std.c if you really need it.
  • std.Io.Timestamp, std.Io.Clock and std.Io.Duration are a big improvement over the previous APIs.
  • most I/O APIs will now take an std.Io parameter.
    some are methods on a std.Io instance, where it is passed implicitly.
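For instance, a timed pause now goes through the io instance rather than a standalone API (a minimal sketch; io.sleep, Duration's fromSeconds and the .real clock tag are used the same way later in this post, everything else here is illustrative):

```zig
const std = @import("std");
const Io = std.Io;

// pause for two seconds, measured against the real-time clock;
// the first argument is an Io.Duration, the second selects the Io.Clock
fn pauseBriefly(io: Io) !void {
    try io.sleep(.fromSeconds(2), .real);
}
```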

The rest of this post will focus on async/concurrency


what is it?

std.Io is an interface that abstracts all I/O and other blocking operations. Previous high/mid-level APIs have been removed in favour of std.Io.

It also acts as a runtime for managing asynchronous and concurrent tasks in a manner that solves the issues with function colouring.

how do I use it?

Like this:

pub fn main(init: std.process.Init) !void {
    // provided by juicy main:
    const io = init.io;

    // or create your own, single-threaded:
    // var threaded: std.Io.Threaded = .init_single_threaded;
    // const io = threaded.io();

    // or, if you need concurrency:
    // var threaded: std.Io.Threaded = .init(gpa, .{});
    // const io = threaded.io();
    // look at the options, the docs are quite detailed

    // open a file
    const file = try Io.Dir.cwd().createFile(io, "foo.txt", .{});
    // defer close it
    defer file.close(io);
    // write to it
    try file.writeStreamingAll(io, "asdf");
    // there are more advanced and useful apis for this available
}

The std.Io implementation should, like the main allocator, be chosen by the application in the main function. 0.16 introduced juicy main[1] where the std library can provide pre-selected Io and Allocator instances among other things.

Currently, the only usable implementation is std.Io.Threaded.
The release notes state it is feature complete[2]; however, they seem to be referring only to the async-runtime aspect.

Most operations are implemented on most platforms, but there are holes, as well as plain limitations of the Io API[3].

Other implementations include a failing one (useful for testing) and Evented, which is one of: Uring (Linux), Kqueue (BSD) or Dispatch (macOS).
The Evented implementations should be considered proof-of-concept for the time being.

While std.Io.Threaded does just use OS threads, other implementations may use different units of concurrency.
As a result, thread-specific code (e.g. threadlocal) should be avoided to ensure your code remains compatible with other implementations in the future.

Async basics

Terms:

  • parallelism: truly doing multiple things at the same time
    like 2 people doing different tasks
  • concurrency: progressing multiple tasks, not necessarily at the same exact time.
    Like 1 person doing multiple tasks, but could be 2 people if available
  • asynchrony/asynchronicity: the potential, but not requirement, to progress multiple tasks
    like 1 person doing 1 task, but if possible more tasks as well. Or maybe a 2nd person does it.

As you can see, each term is looser than, and encompasses, the one before it.

std.Io only offers asynchrony and concurrency, and the latter is optional.

asynchrony is most useful when you have multiple tasks, or parts of tasks, that do not depend on each other, at least not until all of them are done.

An example

fn save(io: Io) !void {
    var future_foo = io.async(saveData, .{ io, "foo.txt", "foo\n" });
    var future_bar = io.async(saveData, .{ io, "bar.txt", "bar\n" });

    const foo = try future_foo.await(io);
    defer foo.close(io);
    const bar = try future_bar.await(io);
    defer bar.close(io);
}

fn saveData(io: Io, file: []const u8, data: []const u8) !Io.File {
    const fd = try Io.Dir.cwd().createFile(io, file, .{});
    errdefer fd.close(io);
    try fd.writeStreamingAll(io, data);
    return fd;
}

The individual file saves don't depend on each other, so they can benefit from potentially being done concurrently, or in one word, asynchronously.

But the overall save does require both to be finished before it returns, hence why it awaits each task.

However, there is an issue with this code: what if saving foo returns an error? Then bar would never be awaited, and we would never close its file!

The file is the obvious leak, but the futures[4] are also resources! If the task is run concurrently, the Io implementation needs to hold onto the task data (function and parameters) until it can run it, then it must hold onto the result until it can give it back to the caller[5], which can only happen when you call await or cancel on the future.

Luckily the solution is rather simple:

fn save(io: Io) !void {
    var future_foo = io.async(saveData, .{ io, "foo.txt", "foo\n" });
    defer if (future_foo.cancel(io)) |file| file.close(io) else |_| {};
    var future_bar = io.async(saveData, .{ io, "bar.txt", "bar\n" });
    defer if (future_bar.cancel(io)) |file| file.close(io) else |_| {};

    _ = try future_foo.await(io);
    _ = try future_bar.await(io);
}

To understand the solution, you need to understand await and cancel:

  • await: waits for the task to complete, returning the result when done
  • cancel: signals the task to cancel, then waits for completion, returning the result.
  • both: if the task is already complete, returns the result
  • both: return the exact same type as the functions return type
  • both: cancellation is detected via an error.Canceled
    functions should usually propagate that error, as it is important for the caller

Put that together, and you can defer cancel to clean up both the future and the file our task returns, removing the other defer close we had.
And it works regardless of whether they are awaited or canceled elsewhere.
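To sketch the propagation rule from the list above: a periodic task only needs try on its blocking calls for cancellation to work, because the blocking call returns error.Canceled when the task is cancelled (a hypothetical example; io.sleep is used the same way as elsewhere in this post):

```zig
// a hypothetical periodic task; cancellation surfaces inside blocking calls
fn heartbeat(io: Io) !void {
    while (true) {
        // ... do some periodic work ...

        // when this task is cancelled, io.sleep returns error.Canceled,
        // and `try` propagates it out to whoever called cancel/await
        try io.sleep(.fromSeconds(1), .real);
    }
}
```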

Concurrency

Sometimes you have a stronger need for concurrency that asynchrony can't reliably satisfy.

io.concurrent guarantees concurrency, if it is not possible then it returns error.ConcurrencyUnavailable. Io implementations are not required to support concurrency, it is the developer’s responsibility to choose an implementation that meets their needs.
The implementation given by juicy main[1:1] will support concurrency if possible.

A contrived example:
you have a bunch of high-performance tasks that can't wait for logging; even async has too much overhead. So instead you want to hand off the information to log to another task, so the high-performance tasks can just request and forget.

This requires that between the logger task and the compute tasks there be concurrency. Your first attempt might look like this:

pub fn main(init: std.process.Init) !void {
    const io = init.io;

    var buf: [10]Task = undefined;
    var queue: Io.Queue(Task) = .init(&buf);
    var future_logger = try io.concurrent(logger, .{ io, &queue });
    defer future_logger.cancel(io) catch {};

    var future_a = io.async(do, .{ io, .a, &queue });
    defer future_a.cancel(io) catch {};

    var future_b = io.async(do, .{ io, .b, &queue });
    defer future_b.cancel(io) catch {};

    var future_c = io.async(do, .{ io, .c, &queue });
    defer future_c.cancel(io) catch {};

    // we do want to wait for them to finish
    try future_a.await(io);
    try future_b.await(io);
    try future_c.await(io);
}

const Task = struct {
    id: Id,
    status: enum { started, pending, finished },

    const Id = enum(u8) { a, b, c };
};

fn logger(io: Io, queue: *Io.Queue(Task)) !void {
    while (queue.getOne(io)) |t| {
        std.log.info("{t} {t}", .{ t.status, t.id });
    } else |e| return e;
}

fn do(io: Io, task: Task.Id, queue: *Io.Queue(Task)) !void {
    try queue.putOne(io, .{ .id = task, .status = .started });
    try io.sleep(.fromSeconds(@intFromEnum(task) * 2), .real);
    try queue.putOne(io, .{ .id = task, .status = .pending });
    try io.sleep(.fromSeconds(@intFromEnum(task) * 2), .real);
    try queue.putOne(io, .{ .id = task, .status = .finished });
}

Io.Queue has its own section

It probably works, but it has a problem: the logger can lag behind, so that when the compute tasks finish there are still unprocessed logs
(you can test this with an io.sleep in the logger loop).
At that point the function will exit, cancelling the logger due to the defer, and never processing the remaining logs!

Awaiting the logger would just deadlock, because it doesn't know there will be no more logs.

We want to signal that there will be no more logs, but still process remaining ones.
Fortunately this is the behaviour for closing the queue!

queue.close(io);
try future_logger.await(io); // without this the cancel will still happen

An easy mistake is using concurrency for the compute tasks, or for both the compute and logger tasks! It will still work, but it will require more concurrency units than actually needed, potentially exceeding what the Io can provide and resulting in an avoidable error.ConcurrencyUnavailable.

Useful things

Queue

std.Io.Queue is a multi-producer, multi-consumer FIFO queue.
It supports putting and getting single and multiple items.
It operates over a backing buffer, blocking when the buffer is full/empty respectively.
How it handles blocking is quite smart, taking that opportunity to run other tasks waiting on the queue.
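A minimal producer/consumer sketch, using only the queue operations that appear in this post (init over a backing buffer, putOne, getOne and close); the exact error returned by getOne on a closed queue is not named here, since this post only shows it surfacing through the error branch:

```zig
const std = @import("std");
const Io = std.Io;

fn producer(io: Io, queue: *Io.Queue(u32)) !void {
    // blocks if the backing buffer is full
    try queue.putOne(io, 42);
    // signal "no more items"; already-queued items can still be consumed
    queue.close(io);
}

fn consumer(io: Io, queue: *Io.Queue(u32)) void {
    // getOne blocks until an item arrives; a closed (or cancelled)
    // queue ends the loop via the error branch
    while (queue.getOne(io)) |item| {
        std.log.info("got {d}", .{item});
    } else |_| {}
}

// setup, as in the logger example above:
// var buf: [4]u32 = undefined;
// var queue: Io.Queue(u32) = .init(&buf);
```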

Group

Efficiently handles an arbitrary number of futures,
able to await and cancel them all together.

But you can only act on them all together, not individually.
And it limits your return types to void, error{Canceled} or error{Canceled}!void.
Try simplifying the previous example with it.
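A sketch of how such a simplification could look. Note the Group method names used here (async, wait, cancel) are my assumption and are not shown in this post, so treat this as pseudocode against the real API:

```zig
// sketch only: the Group method names (async/wait/cancel) are assumed,
// as is the exact shape of the worker function
fn work(io: Io) error{Canceled}!void {
    // a group task must fit the restricted return types mentioned above
    try io.sleep(.fromSeconds(1), .real);
}

fn runAll(io: Io) void {
    var group: Io.Group = .init;
    // one defer cleans up every spawned task together
    defer group.cancel(io);

    group.async(io, work, .{io});
    group.async(io, work, .{io});
    group.async(io, work, .{io});

    // wait for all of them at once; no individual results
    group.wait(io);
}
```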

Select

This is just a combination of a std.Io.Group, a std.Io.Queue and some metaprogramming.
It supports arbitrary return types, but you have to declare them upfront as a tagged union.
That is a good thing though, as it allows you to differentiate, or group, tasks even when they have the same return type.

It lets you await the next completed task, or multiple of them.
There is a convenient cancelDiscard to cancel and ignore any results,
but if the tasks return resources to clean up you must use cancel!
cancel does require that there is enough buffer space for all remaining tasks,
a limitation of the group waiting for all tasks to finish;
cancelDiscard does not have that limitation.

Sync primitives

std.Io has a variety of synchronisation primitives.
These should be preferred over OS sync primitives, as they integrate with the interface's task management. OS sync primitives may also just not work how you intend with some Io implementations.



  1. you can now request some goodies with a std.process.Init/Init.Minimal parameter to your main function, release notes ↩︎ ↩︎

  2. Io release notes ↩︎

  3. non IP networking is just not supported at all. ↩︎

  4. the handle for asynchronous/concurrent tasks, i.e. the return values of io.async and io.concurrent ↩︎

  5. it can’t put it directly into the future because they may be moved around, returned, and put in collections. Futures don’t have a stable address, so the Io must make a stable address itself. ↩︎


No, my vote: it’s great like this. Don’t shorten. Don’t break up. imo.
