This patchset adds `std.Io` and provides two implementations for it:
* `std.Io.Threaded` - based on a thread pool.
  - `-fno-single-threaded` - supports concurrency and cancellation.
  - `-fsingle-threaded` - does not support concurrency or cancellation.
* `std.Io.Evented` - work-in-progress, experimental. This API is not ready to be used yet, but it serves to inform the evolution of the `std.Io` API.
  - IoUring implementation for Linux: a proof of concept. This backend has really nice properties, but it's not finished yet.
  - KQueue implementation: implemented far enough to prove the concept, including a fix for [a common bug in other async runtimes](https://github.com/mitchellh/libxev/issues/125).
`std.Io.Threaded` has networking and file-system operations implemented.
Cancellation works beautifully, except for a known race condition for which a
couple of competing solutions are already being considered.
All of `std.net` has been deleted in favor of `std.Io.net`.
`std.fs` has been partially updated to use `std.Io` - only as required so that
`std.Io.Writer.sendFile` could use `*std.Io.File.Reader` rather than
`*std.fs.File.Reader`.
closes https://github.com/ziglang/zig/issues/8224
## Laundry List of Io Features
* `async`/`await` - these primitives express that operations can be done
  independently, which makes them infallible and lets them run on limited Io
  implementations that lack a concurrency mechanism.
* `concurrent` - same as `async`, except it communicates that the operation
  *must* be done concurrently for correctness. Requires memory allocation.
* `cancel` - equivalent to `await`, except it also requests the Io implementation
  to interrupt the operation and return `error.Canceled`. `std.Io.Threaded`
  supports cancellation by sending a signal to a thread, which causes blocking
  syscalls to return `EINTR` and gives the thread a chance to notice the
  cancellation request.
* `select` - API for blocking on multiple futures using `switch` syntax
* `Group` - efficiently manages many async tasks. Supports waiting for and
cancelling all tasks in the group together.
* `Queue(T)` - multi-producer, multi-consumer, thread-safe, with a runtime-configurable
  buffer size. When the buffer is empty, consumers suspend and are resumed by
  producers. When the buffer is full, producers suspend and are resumed by consumers.
  - Avoids code bloat by using a type-safe wrapper around `TypeErasedQueue`.
* `Select` - for blocking on runtime-known number of tasks and handling a
subset of them.
* `Clock`, `Duration`, `Timestamp`, `Timeout` - type safety for units of measurement
* `Mutex`, `Condition` - synchronization primitives
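As a rough illustration of how `async`, `cancel`, and `Queue(T)` fit together, here is a minimal producer/consumer sketch. It assumes only the calls that appear elsewhere in this description (`io.async`, `Future.cancel`, `Io.Queue(T).init`, `getOne`, `putOneUncancelable`); `produce`, `sumQueued`, and `item_count` are hypothetical names. The queue buffer is deliberately large enough to hold every item, so the code stays correct even if `async` runs the producer inline (for example under `-fsingle-threaded`). With a smaller buffer, the producer would depend on running concurrently with the consumer, which is the situation `concurrent` is meant to express.

```zig
const std = @import("std");
const Io = std.Io;

// Hypothetical example size; 0 is reserved as an end-of-stream sentinel.
const item_count = 8;

/// Hypothetical producer: enqueues 1..item_count, then the 0 sentinel.
fn produce(io: Io, queue: *Io.Queue(u32)) void {
    var i: u32 = 1;
    while (i <= item_count) : (i += 1) queue.putOneUncancelable(io, i);
    queue.putOneUncancelable(io, 0);
}

/// Hypothetical consumer: sums everything the producer enqueues.
pub fn sumQueued(io: Io) !u64 {
    // Room for every item plus the sentinel, so the producer never blocks,
    // even if `io.async` runs it to completion before we start consuming.
    var buffer: [item_count + 1]u32 = undefined;
    var queue: Io.Queue(u32) = .init(&buffer);

    var producer = io.async(produce, .{ io, &queue });
    // Always release the task; `produce` returns `void`, so there is nothing to free.
    defer producer.cancel(io);

    var total: u64 = 0;
    while (true) {
        // `getOne` can fail with `error.Canceled`, which `try` propagates.
        const item = try queue.getOne(io);
        if (item == 0) break;
        total += item;
    }
    return total;
}
```

Called with any `Io`, `sumQueued` should return 36 (1 + 2 + ... + 8).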
## Demo
Here is an example that makes an HTTP `HEAD` request to the host name given as the first command-line argument:
```zig
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Io = std.Io;
const HostName = std.Io.net.HostName;

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer _ = debug_allocator.deinit(); // reports leaks on exit
    const gpa = debug_allocator.allocator();

    // `main` owns the Io implementation and passes `io` down.
    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();

    const args = try std.process.argsAlloc(gpa);
    defer std.process.argsFree(gpa, args);
    if (args.len < 2) {
        std.log.err("expected a host name argument", .{});
        return error.MissingHostName;
    }
    const host_name: HostName = try .init(args[1]);

    var http_client: std.http.Client = .{ .allocator = gpa, .io = io };
    defer http_client.deinit();

    var request = try http_client.request(.HEAD, .{
        .scheme = "http",
        .host = .{ .percent_encoded = host_name.bytes },
        .port = 80,
        .path = .{ .percent_encoded = "/" },
    }, .{});
    defer request.deinit();

    try request.sendBodiless();
    var redirect_buffer: [1024]u8 = undefined;
    const response = try request.receiveHead(&redirect_buffer);
    std.log.info("received {d} {s}", .{ response.head.status, response.head.reason });
}
```
Because networking now goes through the new `std.Io` interface, this code has
the following properties:
* It asynchronously sends out DNS queries to each configured nameserver.
* As each response comes in, it immediately and asynchronously attempts a TCP
  connection to the returned IP address.
* Upon the first successful TCP connection, all other in-flight connection
  attempts are canceled, including DNS queries.
* The code also works when compiled with `-fsingle-threaded`, although the
  operations then happen sequentially.
* The connection logic performs no heap allocation.
You can see how this is implemented in `std.Io.net.HostName.connect`:
```zig
pub fn connect(
    host_name: HostName,
    io: Io,
    port: u16,
    options: IpAddress.ConnectOptions,
) ConnectError!Stream {
    var connect_many_buffer: [32]ConnectManyResult = undefined;
    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);

    var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
    var saw_end = false;
    defer {
        connect_many.cancel(io);
        if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
            .connection => |loser| if (loser) |s| s.closeConst(io) else |_| continue,
            .end => break,
        };
    }

    var aggregate_error: ConnectError = error.UnknownHostName;

    while (connect_many_queue.getOne(io)) |result| switch (result) {
        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
            error.SystemResources,
            error.OptionUnsupported,
            error.ProcessFdQuotaExceeded,
            error.SystemFdQuotaExceeded,
            error.Canceled,
            => |e| return e,

            error.WouldBlock => return error.Unexpected,

            else => |e| aggregate_error = e,
        },
        .end => |end| {
            saw_end = true;
            try end;
            return aggregate_error;
        },
    } else |err| switch (err) {
        error.Canceled => |e| return e,
    }
}

pub const ConnectManyResult = union(enum) {
    connection: IpAddress.ConnectError!Stream,
    end: ConnectError!void,
};

/// Asynchronously establishes a connection to all IP addresses associated with
/// a host name, adding them to a results queue upon completion.
pub fn connectMany(
    host_name: HostName,
    io: Io,
    port: u16,
    results: *Io.Queue(ConnectManyResult),
    options: IpAddress.ConnectOptions,
) void {
    var canonical_name_buffer: [max_len]u8 = undefined;
    var lookup_buffer: [32]HostName.LookupResult = undefined;
    var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);

    host_name.lookup(io, &lookup_queue, .{
        .port = port,
        .canonical_name_buffer = &canonical_name_buffer,
    });

    var group: Io.Group = .init;

    while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
        .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
        .canonical_name => continue,
        .end => |lookup_result| {
            group.waitUncancelable(io);
            results.putOneUncancelable(io, .{ .end = lookup_result });
            return;
        },
    } else |err| switch (err) {
        error.Canceled => |e| {
            group.cancel(io);
            results.putOneUncancelable(io, .{ .end = e });
        },
    }
}
```
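The `Io.Group` usage in `connectMany` can be seen in isolation in the following sketch, which fans out a fixed number of tasks and then waits for all of them. It assumes `Io.Group`'s `.init`, `async`, and `waitUncancelable` behave as they are used in `connectMany`; `squareInto` and `sumOfSquares` are hypothetical. The task function returns `void` and writes its result through a pointer, and each task owns a distinct slot, so no further synchronization is needed.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical task: stores x * x in the caller-provided slot.
fn squareInto(slot: *u64, x: u64) void {
    slot.* = x * x;
}

/// Hypothetical helper: squares 1..4 as group tasks and sums the results.
pub fn sumOfSquares(io: Io) u64 {
    var results: [4]u64 = undefined;
    var group: Io.Group = .init;

    // Each task writes to its own slot, so the tasks never race with each other.
    for (&results, 0..) |*slot, i| {
        group.async(io, squareInto, .{ slot, @as(u64, i + 1) });
    }
    // Block until every task in the group has finished.
    group.waitUncancelable(io);

    var total: u64 = 0;
    for (results) |r| total += r;
    return total;
}
```

Whether the tasks run on pool threads or inline, `sumOfSquares` should return 30 (1 + 4 + 9 + 16).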
## Upgrade Guide
### Missing `io` Parameter
If you need an `io` parameter, and you don't have one, you can get one like this:
```zig
var threaded: Io.Threaded = .init_single_threaded;
const io = threaded.io();
```
This is legal as long as these functions are not called:
* `Io.VTable.concurrent`
This is a non-ideal workaround, like reaching for `std.heap.page_allocator` when
you need an `Allocator` and do not have one. It is better to accept an `Io`
parameter if you need one (or store one on a context struct for convenience).
The point is that the application's `main` function should generally be
responsible for constructing the `Io` instance used throughout.
When you're testing you can use `std.testing.io` (much like `std.testing.allocator`).
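A minimal sketch of that shape: library code takes `io: Io` as a parameter, `main` constructs the implementation, and tests pass `std.testing.io`. The `Io.Threaded` setup mirrors the demo above; `square` and `sumOfTwoSquares` are hypothetical helpers.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical unit of work.
fn square(x: u64) u64 {
    return x * x;
}

/// Library-style function: it does not pick an Io implementation,
/// it uses whichever one the caller passes in.
pub fn sumOfTwoSquares(io: Io, a: u64, b: u64) u64 {
    var fa = io.async(square, .{a});
    var fb = io.async(square, .{b});
    // `square` cannot fail, so awaiting both futures releases both tasks.
    return fa.await(io) + fb.await(io);
}

pub fn main() void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer _ = debug_allocator.deinit();

    // The application entry point owns the Io implementation.
    var threaded: Io.Threaded = .init(debug_allocator.allocator());
    defer threaded.deinit();
    const io = threaded.io();

    std.debug.print("{d}\n", .{sumOfTwoSquares(io, 3, 4)}); // prints 25
}
```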
### How to use async/await
Use this pattern to avoid resource leaks and handle cancellation gracefully:
```zig
var foo_future = io.async(foo, .{args});
defer if (foo_future.cancel(io)) |resource| resource.deinit() else |_| {}
var bar_future = io.async(bar, .{args});
defer if (bar_future.cancel(io)) |resource| resource.deinit() else |_| {}
const foo_result = try foo_future.await(io);
const bar_result = try bar_future.await(io);
```
If `foo` or `bar` does not return a resource that must be freed, the payload does not need to be deinitialized. For a function that returns an error union with a `void` payload, the `defer` reduces to `defer foo_future.cancel(io) catch {};`, and if the function returns plain `void`, the `catch` can be dropped as well. The `cancel` is necessary either way, however, because it releases the async task's resources when an error (including `error.Canceled`) causes an early return.
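A sketch of the simplified pattern, assuming a hypothetical `checkEven` task that returns an error union with a `void` payload and a hypothetical `bothEven` caller: if either `try` returns early, the deferred `cancel` calls still release both tasks.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical task: fails with error.Odd for odd inputs; no resource to free.
fn checkEven(x: u32) error{Odd}!void {
    if (x % 2 != 0) return error.Odd;
}

/// Hypothetical helper: succeeds only if both inputs are even.
pub fn bothEven(io: Io, a: u32, b: u32) !void {
    var a_future = io.async(checkEven, .{a});
    // Nothing to deinit, so an error result from `cancel` is simply ignored.
    defer a_future.cancel(io) catch {};
    var b_future = io.async(checkEven, .{b});
    defer b_future.cancel(io) catch {};

    // An early return from either `try` still runs both deferred cancels.
    try a_future.await(io);
    try b_future.await(io);
}
```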
## Related
* https://github.com/ziglang/zig/issues/24510
* https://github.com/ziglang/zig/issues/23367
* https://github.com/ziglang/zig/issues/23446
* https://github.com/ziglang/zig/issues/1639
* https://github.com/ziglang/zig/issues/157
## Followup Issues
* https://github.com/ziglang/zig/issues/25738
* https://github.com/ziglang/zig/issues/25739
* https://github.com/ziglang/zig/issues/25740
* https://github.com/ziglang/zig/issues/25741
* https://github.com/ziglang/zig/issues/25742
* https://github.com/ziglang/zig/issues/25743
* https://github.com/ziglang/zig/issues/25744
* https://github.com/ziglang/zig/issues/25745
* https://github.com/ziglang/zig/issues/25746
* https://github.com/ziglang/zig/issues/25747
* https://github.com/ziglang/zig/issues/25748
* https://github.com/ziglang/zig/issues/25749
* https://github.com/ziglang/zig/issues/25750
* https://github.com/ziglang/zig/issues/25751
* https://github.com/ziglang/zig/issues/25752
* https://github.com/ziglang/zig/issues/25753
* https://github.com/ziglang/zig/issues/6600
* https://github.com/ziglang/zig/issues/14173
* https://github.com/ziglang/zig/issues/25754
* https://github.com/ziglang/zig/issues/25755
* https://github.com/ziglang/zig/issues/25756
* https://github.com/ziglang/zig/issues/25757
* https://github.com/ziglang/zig/issues/25760
* https://github.com/ziglang/zig/issues/25762
* add non-blocking flag to net and fs operations, handle EAGAIN
* Threaded: in netLookup, potentially add diagnostics to the output in several places