0.16 Io - managing dependency-based Io scheduling

Hello all, I'm fairly new to Zig but have been playing about with sockets to get to know the new 0.16 Io API. The little project I'm working on is based on ZMQ, where I want to see if I can abstract a lot of the high-performance networking / threading tricks behind the Io API. The dream here is that I can just describe the protocol, how the sockets should work, and how tasks should run, and the choice of Io implementation lets the user make decisions about performance tradeoffs.

There are a few specific use cases I’m finding hard to express in the current API, and I wondered if people might have any thoughts on how to express these.

Expressing task dependencies

A common case I’ve been running into is wanting to express something like “do A, then do B after A, then do C after B, then at some point later, wait for all of this”. Say, for example, I’m queueing sends on a socket: I want to ensure that send B happens after send A, rather than starting both with io.async, where they may run out of order.

Solutions for this I can think of are

  1. Create an Io.Queue and a worker that runs in io.concurrent. This is fine if the tasks you want to do are all pretty homogeneous (so if I’m just sending different bodies, this works), but it adds the overhead of managing a long-lived concurrent job outside of this. If A, B and C are quite different, the worker can end up quite complex. Also, this never works in single-threaded mode.
  2. If you always want A, then B, then C, just write that as one large function doAthenBthenC and io.async it. This is fine if you know you will always do all these tasks, but the choice of tasks may be conditional on runtime inputs.
  3. Write wrapper functions like doBafterX that take a reference to a future to await before running. This creates a bit of cruft, where I need lots of small wrapper functions depending on whether I think a function might be depended on, and it starts to feel like function colouring.

What I feel like I end up wanting is something like

const fut = io.async(doA, .{});
const next_fut = io.asyncAfter(fut, doB);

But it may be that I’m missing some more obvious primitives for describing this.
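To make what I mean by asyncAfter concrete, here's the shape of the pattern in Go terms (used only as an analogy, since the thread keeps comparing Io to goroutines and channels; runChain, aDone and the event log are all illustrative names, not any real API). B is started immediately, but waits on a completion signal from A before doing any work, so the caller never blocks between the two:

```go
package main

import (
	"fmt"
	"sync"
)

// runChain starts doA immediately and doB as soon as doA completes,
// without the caller blocking in between. The aDone channel is the
// dependency edge: closing it establishes "A happened before B".
func runChain() []string {
	var events []string
	aDone := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // like io.async(doA, .{})
		defer wg.Done()
		events = append(events, "A") // doA's work
		close(aDone)                 // signal completion to dependents
	}()
	go func() { // like the hypothetical io.asyncAfter(fut, doB)
		defer wg.Done()
		<-aDone // wait for A before doing any of B's work
		events = append(events, "B") // doB's work
	}()

	wg.Wait() // "at some point later, wait for all of this"
	return events
}

func main() {
	fmt.Println(runChain()) // always [A B]
}
```

An asyncAfter primitive would essentially hide that aDone channel for you.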

Describing cases where I want exactly one operation to happen

I know Io.Select exists for creating a number of futures and getting results one at a time, and you can then discard the rest. This works great for something like DNS resolution where you don’t expect a side effect from making additional resolution queries.

But consider a case where you expect your work to have a side effect. For example, I have cases where I might have multiple Io.Queue (or say, socket) objects and I want to write my next piece of data to whichever socket is first available for writing. I can

  1. queue.put with a min size of 0. This does at least tell me to try the next one, but the checking loop has to resort to sleeping and retrying, or spinning without ever blocking the thread.
  2. async a number of queue.putOne calls, and cancel the rest once the first succeeds. This has the problem that, while being cancelled, other tasks may also putOne, so I don’t think it’s a correct program. But there may be some tricks here with other threading primitives and cancellation regions.
  3. I can probably ensure something happens exactly once with another mutex, but I have the (maybe wrong) impression that with a high-level API built on futures I shouldn’t be reaching for mutexes / thinking in threading primitives too often? But this may be a hangover from thinking of mutexes as part of a threading paradigm, rather than an Io primitive that may happen to be provided by threads.

I think this one may also be an understanding gap on my part. I feel like the method I want is something like putOneIfFirst, which puts the item on the queue if it’s the first of the racing put operations to succeed, and skips otherwise.

When viewed more from a syscall perspective, I understand that if you make 3 async write syscalls you shouldn’t be surprised if more than 1 write occurs. But if all I want is a syscall to say “block until you can tell me which of these is writable”, I think that kind of thing exists?
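For reference, Go's select statement over channel send cases is exactly this "block until one is ready, then do exactly one write" behaviour, which is roughly what I'm wishing for from Io.Queue. A sketch (putOneFirst is a made-up name, not any real API):

```go
package main

import "fmt"

// putOneFirst blocks until exactly one of the two queues can accept
// the item, then performs exactly one send; the other queue is left
// untouched. This is the "putOneIfFirst" behaviour, expressed with
// select over channel *send* cases.
func putOneFirst(a, b chan int, item int) string {
	select {
	case a <- item:
		return "a"
	case b <- item:
		return "b"
	}
}

func main() {
	a := make(chan int, 1)
	b := make(chan int, 1)
	a <- 0 // a is already full, so only b can accept the item
	dest := putOneFirst(a, b, 42)
	fmt.Println(dest, len(a), len(b)) // prints: b 1 1
}
```

The key property is that the side effect happens exactly once, in the select itself, rather than in N racing tasks that then need cancelling.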

Use case: writing a performant network send buffer

So these both come up when thinking about this problem: I have an application that does something and wants to send data. My application is in general realtime and wants to hand data to the system as quickly as it can. However, whilst waiting for a send syscall, I would like to queue the remaining data in the buffer, so that if the system is slow and I’m producing quickly, I can batch writes into the next syscall.

So each call I want to make to an Io.Writer can be described as .writeMyMessage(), .maybeFlush(), where

  • writeMyMessage() may itself perform IO if the message is larger than the buffer, and is not thread safe in the sense that it can’t run alongside a call to writeMyMessage with a different message (because they share a write buffer).
  • maybeFlush() then looks like: queue at most one syscall to flush this buffer

This is achievable now by creating a concurrent worker that does this in the background, with the application’s send just putting onto a queue. But as noted above, this means the code doesn’t work in single-threaded mode - even though a correct single-threaded program exists, namely flushing every message immediately.
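As a sketch of what that concurrent worker looks like (in Go terms, since I don't want to guess at the exact Io.Queue signatures; flushAll and the "+"-joined batch format are illustrative only): each iteration of the outer loop models one send syscall that drains everything currently queued, so application writes get coalesced whenever the consumer is slower than the producer.

```go
package main

import (
	"fmt"
	"strings"
)

// flushAll drains the queue in batches: each outer iteration models
// one send syscall that writes everything currently buffered, so a
// slow consumer coalesces many application writes into few syscalls.
func flushAll(queue chan string) []string {
	var batches []string
	for msg := range queue { // block until there is something to send
		batch := []string{msg}
	drain: // coalesce whatever else is already queued, without blocking
		for {
			select {
			case m, ok := <-queue:
				if !ok {
					break drain // queue closed and empty
				}
				batch = append(batch, m)
			default:
				break drain // nothing more buffered right now
			}
		}
		batches = append(batches, strings.Join(batch, "+"))
	}
	return batches
}

func main() {
	queue := make(chan string, 8)
	for _, m := range []string{"a", "b", "c"} {
		queue <- m // the application's cheap, non-blocking "send"
	}
	close(queue)
	fmt.Println(flushAll(queue)) // prints: [a+b+c]
}
```

The single-threaded degenerate case is the one-message-per-batch path, which is exactly "flush every message immediately".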

I should preface all of this with: I spend my day job writing high-level code, and may just be too reluctant or unfamiliar to recognise that, when programming at the systems level, you need to handle the gory details of managing more complex dependencies. But this has made me wonder if there could be scope for increasing the kinds of concurrent programs we can describe by slightly extending the available vocabulary in Io. Also, this may all already exist and I’m just missing something! Would love thoughts from those more experienced.

(and also, if anyone out there knows Io really well and wants to implement TCP networking with io_uring on Zig master, I really want to try it out but am too dumb to implement it myself)

2 Likes

I’m sorry if I’m missing something, but aren’t you just saying that A - B - C has to happen synchronously (relative to each other)? What’s wrong with:

fn foo() void {
    A();
    B();
    C();
}

// ...

var fut = io.async(foo, .{});
defer fut.cancel(io);

// do some work

fut.await(io);

This is OP’s hesitation there.

2 Likes

Then put a conditional in that function? Why does it have to be moved to a higher scope?

Thanks for the thoughts - as noted, the use case is when you want to use available CPU time to figure out whether or not you want to do B() whilst waiting for A() to complete some syscall.

To actually write some code, imagine logic that in synchronous terms looks like

while (true) {
   const data = recvOnSocket(io, sock);
   const first_output = doSomethingWithData(data);
   sendOnAWhichMightSyscall(io, sock, first_output);
   if (shouldDoB(first_output)) {
      const b_output = expensiveBJob(first_output);
      // Important - this should only happen after sendOnA finishes
      sendOnBWhichMightSyscall(io, sock, b_output);
   }
}

I then want my concurrent program to keep the property that all sends are done in order.

If I want to translate this into a concurrent program, the biggest gains are:

  • Whenever I receive, I can get the first output
  • When sending the first output, I can concurrently use that syscall time to calculate my second output
  • But, if the syscall for A is slow, I need to make sure I don’t run B too soon.

If I try to solve this with a conditional inside the function, I end up wanting to nest async calls to ensure I can use the syscall time during sendOnA to do expensiveBJob. A naive conditional without nested async will potentially block when sending on A.

This then starts to feel like function coloring, as I might have made sure that I put io.async calls in the right place in my library, but somebody else may not have?
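One shape that does keep the ordering while still overlapping the work (sketched in Go rather than Zig, so I don't misuse the Io API; sender and the per-message futures are illustrative names) is to enqueue a slot per message in send order, and have a single sender await each slot in turn. expensiveBJob can then run during A's send, but the sends themselves always happen in queue order:

```go
package main

import (
	"fmt"
	"time"
)

// sender drains an ordered queue of futures; each future is a channel
// that yields its payload once computed. Work may finish out of order,
// but "sends" always happen in the order the slots were enqueued.
func sender(order chan chan string, sent chan []string) {
	var log []string
	for fut := range order {
		payload := <-fut           // wait until this item is ready
		log = append(log, payload) // stands in for the send syscall
	}
	sent <- log
}

func main() {
	order := make(chan chan string, 4)
	sent := make(chan []string)
	go sender(order, sent)

	// Enqueue A's slot, then B's slot, in the required send order.
	futA := make(chan string, 1)
	futB := make(chan string, 1)
	order <- futA
	order <- futB

	futA <- "A" // the first output is ready immediately
	go func() { // expensiveBJob runs while A is being "sent"
		time.Sleep(10 * time.Millisecond)
		futB <- "B"
	}()

	close(order)
	fmt.Println(<-sent) // prints: [A B], regardless of timing
}
```

Even if B's computation finishes before A's send, the sender is still blocked on futA first, so order is preserved.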

2 Likes

I think that in most cases this is an ideal solution. In scenarios where users supply the tasks, what difficulties exist with this solution? If the user supplies the whole task queue up front, I don’t think this is very difficult. If the user might insert new tasks in the middle of an already-running asynchronous task, then I think there must be some value, independent of the tasks themselves, to coordinate ordering, such as a session ID. In that case, what we need is a task that manages such data in a coordinated way.

1 Like

A needs to signal its start and end, B needs to be aware of that and wait when appropriate.

If it’s just A and B, a semaphore should suffice, and it is flexible enough to allow reordering or the absence of one or the other.

If instead you need A, B and C, you will probably need something more custom.

1 Like

I think another way of looking at this question is: how do you represent io_uring’s chained operations via Io?

With io_uring you can prepare a sequence of I/O operations that are dependent on one another such that (in kernel space) if one fails or is cancelled then all subsequent operations in the chain are cancelled. If the Linux Io.Evented variant is built on top of io_uring, you’d hope that you’d retain the ability to batch/chain operations in similar ways (understanding that this may be missing in 0.16 and filled in later).

I’m curious if anyone knows how you’d encode that in the new Io interface (I’m not caught up with all of the changes).

3 Likes

I have a feeling your general worry comes from blocking the execution. The entire philosophy of the Io interface is to write blocking code. The best way to look for patterns here is to look at how Go solves these problems. The answer is usually: spawn more coroutines and use channels. Translated to Zig/Io, that means use io.async and io.concurrent, and use Io.Queue. Don’t be afraid of blocking calls to queue, mutex, recv, etc.
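A minimal sketch of that blocking style in Go itself (sum and the channel names are illustrative): the consumer is just a blocking receive loop, like looping on a blocking queue.getOne, and the producer just does blocking puts. No scheduling logic appears anywhere in user code.

```go
package main

import "fmt"

// sum consumes values with a plain blocking receive loop and reports
// the total when the input channel is closed.
func sum(in chan int, out chan int) {
	total := 0
	for v := range in { // blocking receive, like queue.getOne
		total += v
	}
	out <- total
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go sum(in, out) // like io.concurrent
	for i := 1; i <= 4; i++ {
		in <- i // blocking put, like queue.putOne
	}
	close(in)
	fmt.Println(<-out) // prints: 10
}
```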

8 Likes

Thanks all - it does at least sound like there’s not some special other solution I’m missing. Really nice to get everyone’s thoughts on this.

Where I’m getting hung up is this: I really like the promise of the new interface - that I can write code without worrying about the underlying IO implementation, and it can even work in a blocking way in single-threaded mode. But I’m finding that if I want performance, I have to at least stop thinking about it also working in a blocking way.

What I want is for the compiler to generate fairly optimal single-threaded code and fairly optimal multi-threaded code, because I’ve been able to describe my dependencies correctly, and I don’t know if there are enough primitives for that here (or whether that is even the aim!).

Again, from the networking view, if you have an application that wants to generate and send a lot of data, the best concurrent code (I think) is your application thread putting data on a queue, and another thread pulling as much data off the queue as it can and sending it, blocking on each syscall. But this would be expressed with io.concurrent, so it doesn’t work in single-threaded mode.

Another way to express this is: for each piece of data, put it on the queue and io.await the pull off the queue. This now works fine in single-threaded mode. However, to get performance, you want to take as much off the queue as possible in each awaited call (i.e. get rather than getOne). That likely means many of your awaited calls are no-ops. It’s also a little odd, in that the application’s send(…) function now needs to do IO internally and probably return a Future(!void). (As I was finishing up this post, I also realised this wouldn’t necessarily work without an additional lock around the syscall, so that you don’t try to write twice from different await calls - so many race conditions!)

In this specific case the queue is quite a natural way to express the need for a lock, and it does work more generally for encoding “do A, then B, then C” as long as A, B and C are expressible easily as things you put on a queue. I think I’d be right in saying this does incur mutex overhead, as opposed to the async scheduler knowing to not schedule B until A is done.

My overall feeling is that perhaps I’m thinking about this the wrong way - I suspect there’s a way to view my specific problem from a different concurrency perspective where all these things fall out a bit easier, or it becomes natural to encode dependencies in a function and push the IO further out to the edges.

God, this concurrency stuff is always so hard to think about - hurts my brain! Can’t we just make a processor with one really really big core…

2 Likes

Single-threaded mode is one OS thread, but concurrent doesn’t mean more than one OS thread - it just means some unit of concurrency, and there are units that can exist on a single OS thread. See the std.Io.Evented implementation. I think you already tried it and found it doesn’t implement all the functionality you want yet, but it will eventually.

In the meantime, @lalinsky made their own implementation zio whose unit of concurrency does work on a single thread, and I believe implements the networking functionality you want.

4 Likes

If you want to express call A, then B and then C, then simply have a function that does

A(...);
B(...);
C(...);

That’s the most efficient option. Anything else adds overhead.

5 Likes