Hello, I was writting a program that needed to “pipe” from a std.Io.Reader to an std.Io.Writer and I wasn’t happy with the std.Io.Reader.stream method because it doesn’t read and write in parallel.
I tried using the std.Io.Queue which worked well until I encountered some unexpected behavior.
I was expecting std.Io.Queue.get(&queue, io, buffer, min) to return as soon as min elements where populated inside buffer, but instead, it only returned when the buffer was filled.
The documentation states:
/// Receives elements from the beginning of the queue. The function
/// returns when at least `min` elements have been populated inside
/// `buffer`.
///
/// Returns how many elements of `buffer` have been populated.
///
/// Asserts that `buffer.len >= min`.
I have tried investigating with a test:
const std = @import("std");
test {
const io = std.testing.io;
var queue_buffer: [8]u8 = undefined;
var queue = std.Io.Queue(u8).init(&queue_buffer);
var fut1 = try io.concurrent(producer, .{ io, &queue });
defer fut1.cancel(io) catch {};
try io.sleep(.fromMilliseconds(1500), .real);
var fut2 = try io.concurrent(consumer, .{ io, &queue });
defer fut2.cancel(io) catch {};
_ = try io.select(.{ &fut1, &fut2 });
}
fn producer(io: std.Io, queue: *std.Io.Queue(u8)) !void {
while (true) {
try io.sleep(.fromSeconds(1), .real);
_ = try queue.put(io, &.{ 0, 0 }, 2);
std.debug.print("put 2 items\n", .{});
}
}
fn consumer(io: std.Io, queue: *std.Io.Queue(u8)) !void {
var buffer: [8]u8 = undefined;
while (true) {
const n = try queue.get(io, &buffer, 1);
std.debug.print("got {d} items\n", .{n});
}
}
This test outputs:
put 2 items
got 2 items
put 2 items
put 2 items
put 2 items
put 2 items
got 8 items
put 2 items
put 2 items
put 2 items
put 2 items
got 8 items
put 2 items
...
The first get doesnt fill the buffer (which is the behavior I want), but all the subsequent ones do. Does std.Io.Queue.get only guarantee that at least min items will be populated? If so, is there another standard container that would allow me to do what I want, or do I need to write/modify one?
Also, why do we observe this behavior? I got it down to std.Io.Queue.get calling the TypeErasedQueue.getLocked function and then only calling pending.condition.wait if there were no items immediately available in the queue but I got lost from there.
Thanks!