Under what conditions does `std.Io.Queue.get` return?

Hello, I was writing a program that needed to “pipe” from a std.Io.Reader to a std.Io.Writer, and I wasn’t happy with the std.Io.Reader.stream method because it doesn’t read and write in parallel.

I tried using std.Io.Queue, which worked well until I encountered some unexpected behavior.
I was expecting std.Io.Queue.get(&queue, io, buffer, min) to return as soon as min elements were populated inside buffer, but instead it only returned once the buffer was full.

The documentation states:

/// Receives elements from the beginning of the queue. The function
/// returns when at least `min` elements have been populated inside
/// `buffer`.
///
/// Returns how many elements of `buffer` have been populated.
///
/// Asserts that `buffer.len >= min`.

I have tried investigating with a test:

const std = @import("std");

test {
    const io = std.testing.io;
    var queue_buffer: [8]u8 = undefined;
    var queue = std.Io.Queue(u8).init(&queue_buffer);

    var fut1 = try io.concurrent(producer, .{ io, &queue });
    defer fut1.cancel(io) catch {};

    try io.sleep(.fromMilliseconds(1500), .real);

    var fut2 = try io.concurrent(consumer, .{ io, &queue });
    defer fut2.cancel(io) catch {};

    _ = try io.select(.{ &fut1, &fut2 });
}

fn producer(io: std.Io, queue: *std.Io.Queue(u8)) !void {
    while (true) {
        try io.sleep(.fromSeconds(1), .real);
        _ = try queue.put(io, &.{ 0, 0 }, 2);
        std.debug.print("put 2 items\n", .{});
    }
}

fn consumer(io: std.Io, queue: *std.Io.Queue(u8)) !void {
    var buffer: [8]u8 = undefined;
    while (true) {
        const n = try queue.get(io, &buffer, 1);
        std.debug.print("got {d} items\n", .{n});
    }
}

This test outputs:

put 2 items
got 2 items
put 2 items
put 2 items
put 2 items
put 2 items
got 8 items
put 2 items
put 2 items
put 2 items
put 2 items
got 8 items
put 2 items
...

The first get doesn’t fill the buffer (which is the behavior I want), but all the subsequent ones do. Does std.Io.Queue.get only guarantee that at least min items will be populated? If so, is there another standard container that would allow me to do what I want, or do I need to write/modify one?

Also, why do we observe this behavior? I traced it down to std.Io.Queue.get calling the TypeErasedQueue.getLocked function and then only calling pending.condition.wait if there were no items immediately available in the queue, but I got lost from there.

Thanks!

What Zig version are you using?

On the latest build (0.16.0-dev.1657+985a3565c) I’m seeing the expected behaviour:

put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items
put 2 items
got 2 items

EDIT:

Try updating to a newer version. I suspect you’re seeing this issue, which was fixed in this commit, merged earlier this week:

std.Io.Queue: introduce closure and fix a bug

Queues can now be “closed”. A closed queue cannot have more elements appended with put, and blocked calls to put will immediately unblock having failed to append some elements. Calls to get will continue to succeed as long as the queue buffer is non-empty, but will then never block; already-blocked calls to get will unblock.

All queue get/put operations can now return error.Closed to indicate that the queue has been closed. For bulk get/put operations, they may add/receive fewer elements than the minimum requested if the queue was closed or the calling task was canceled. In that case, if any elements were already added/received, they are returned first, and successive calls will return error.Closed or error.Canceled.

Also, fix a bug where Queue.get could deadlock because it incorrectly blocked until the given buffer was filled.

Resolves: #30141

You are right, I was using a version from before this commit. Using the latest build fixes it; I should’ve tried that first. Thanks!