Hello Ziggit!
I was interested in learning more about Linux’s io_uring interface, so I decided to write a cat clone as a learning exercise. This was inspired by this article on unixism that builds a cat clone to demonstrate the low-level io_uring API. Initially I thought I would use the low-level interface and just port it to Zig. It didn’t take me long to find out that the Zig standard library includes an interface more like liburing, which made the code considerably simpler.
Apart from learning more about io_uring, I also learned that cat is meant for concatenating files, and that pipes are essentially just kernel-space read buffers between files. It was a fun weekend project, and I’m sharing it here in the hope that somebody else can learn something.
//! zigcat.zig
//! Run with `zig run zigcat.zig -- [path/to/files]`
const std = @import("std");
const SubmissionQueueEntry = std.os.linux.io_uring_sqe;
const CallbackQueueEntry = std.os.linux.io_uring_cqe;
/// Entry point: concatenates every file named on the command line to stdout.
///
/// The work is done in three io_uring batches:
///   1. one `statx` and one `openat` per path,
///   2. a linked chain of `splice` operations: each file into a pipe, then
///      the pipe into stdout,
///   3. one `close` per opened file.
///
/// Returns `error.StatError`, `error.OpenatError`, `error.OnReadSplice`,
/// `error.OnWriteSplice` or `error.CloseError` when the corresponding
/// completion reports a failure. Errors from setup calls (argument
/// allocation, `fcntl`, ring initialisation, `pipe`, queueing) are propagated.
pub fn main() !void {
    const allocator = std.heap.page_allocator;
    const linux = std.os.linux;
    const stdout = std.io.getStdOut();
    const cwd = std.fs.cwd();

    const args = try std.process.argsAlloc(allocator);
    // `argsAlloc` results must be released with `argsFree`, not a plain `free`.
    defer std.process.argsFree(allocator, args);

    {
        // Sometimes the terminal is set to append only, which breaks
        // piping to stdout. We make sure that stdout is not set to append only.
        const flags = try std.os.fcntl(stdout.handle, std.os.F.GETFL, 0);
        if (flags & std.os.O.APPEND != 0) {
            const new_flags = flags & ~@as(usize, std.os.O.APPEND);
            _ = try std.os.fcntl(stdout.handle, std.os.F.SETFL, new_flags);
        }
    }

    // The ring size must be a power of two. The first batch is the largest:
    // it queues two entries (statx + openat) per file, hence the `* 2`.
    // This picks a power of two strictly greater than that entry count
    // (and 1 when no files were given).
    const shift = @bitSizeOf(usize) - @clz((args.len - 1) * 2);
    const ring_buf_size = @as(u64, 1) <<| shift;

    var io_uring = try linux.IO_Uring.init(@intCast(ring_buf_size), 0);
    defer io_uring.deinit();

    // Pipe used as the intermediate buffer between the input files and stdout.
    const pipe_read, const pipe_write = try std.os.pipe();
    defer std.os.close(pipe_read);
    defer std.os.close(pipe_write);

    const FileInfo = struct {
        statx: linux.Statx,
        fd: linux.fd_t,
    };
    const file_info_buf = try allocator.alloc(FileInfo, args.len - 1);
    defer allocator.free(file_info_buf);

    // ---- Batch 1: stat and open every file ----
    // user_data encodes both the file index and the operation:
    // even = statx for file `user_data / 2`, odd = openat for that file.
    for (args[1..], 0..) |path, i| {
        // Only the size is consumed below, so request exactly that field.
        _ = try io_uring.statx(i * 2, cwd.fd, path, 0, linux.STATX_SIZE, &file_info_buf[i].statx);
        _ = try io_uring.openat(i * 2 + 1, cwd.fd, path, 0, 0);
    }

    // Both operations must finish for every file, but their order is irrelevant.
    const submitted = try io_uring.submit();
    var completed: usize = 0;
    while (completed < submitted) : (completed += 1) {
        const cqe = try io_uring.copy_cqe();
        const is_open = cqe.user_data % 2 == 1;
        if (cqe.err() != .SUCCESS) {
            return if (is_open) error.OpenatError else error.StatError;
        }
        // A successful openat completion carries the new file descriptor.
        if (is_open) file_info_buf[cqe.user_data / 2].fd = cqe.res;
    }

    // ---- Batch 2: splice files -> pipe -> stdout ----
    // NOTE(review): the whole chain is queued before anything drains the pipe,
    // so this presumably only works while the combined input fits in the
    // pipe's capacity — confirm, and splice in chunks if larger inputs must
    // be supported.
    var total_size: usize = 0;
    for (file_info_buf, 0..) |finfo, i| {
        const read_sqe = try io_uring.splice(i, finfo.fd, 0, pipe_write, std.math.maxInt(u64), finfo.statx.size);
        read_sqe.flags |= linux.IOSQE_IO_LINK; // force ordering with the next entry
        total_size += finfo.statx.size;
    }

    // The write side is the tail of the chain, so it must NOT carry
    // IOSQE_IO_LINK: that flag links an entry to the one queued after it.
    const write_id = file_info_buf.len;
    _ = try io_uring.splice(write_id, pipe_read, std.math.maxInt(u64), stdout.handle, std.math.maxInt(u64), total_size);

    // One completion per file plus one for the write side.
    const pipe_count = try io_uring.submit_and_wait(@intCast(file_info_buf.len + 1));
    var piped: usize = 0;
    while (piped < pipe_count) : (piped += 1) {
        const cqe = try io_uring.copy_cqe();
        if (cqe.err() != .SUCCESS) {
            std.debug.print("{}: {}\n", .{ cqe.err(), cqe });
            if (cqe.user_data == write_id) return error.OnWriteSplice;
            return error.OnReadSplice;
        }
    }

    // ---- Batch 3: close every input file ----
    for (file_info_buf, 0..) |finfo, i| {
        _ = try io_uring.close(i, finfo.fd);
    }
    const closed_count = try io_uring.submit();
    var closed: usize = 0;
    while (closed < closed_count) : (closed += 1) {
        const cqe = try io_uring.copy_cqe();
        if (cqe.err() != .SUCCESS) return error.CloseError;
    }
}