Building a multithreaded timed wait message queue

I'm trying to build a queue for communicating between a worker and the main render thread for an app I’m building. I want to be able to wait for messages efficiently (i.e. not busy waiting), but wake up after a certain amount of time to send a ping/noop to the server to keep a connection alive. When a message is available, I want to wake immediately and handle it.

This is my first foray into low level synchronization primitives. I found the Futex option in the std library and it seems like what I want. Here is what I have so far:

const Queue = struct {
    lock: std.atomic.Value(u32) = .init(0),
    messages: [8]Message = undefined,
    read_index: u8 = 0,
    write_index: u8 = 0,

    const WAKE_VALUE: u32 = 3232;

    pub fn push(self: *Queue, msg: Message) !void {
        if (self.read_index == self.write_index + 1) {
            return error.QueueFull; // Queue is full
        }
        std.debug.assert(self.write_index < self.messages.len);
        self.messages[self.write_index] = msg;
        self.write_index += 1;
        if (self.write_index >= self.messages.len) {
            self.write_index = 0;
        }
        self.lock.store(WAKE_VALUE, .seq_cst);
    }
    fn isEmpty(self: *Queue) bool {
        return self.read_index == self.write_index;
    }

    pub fn pop(self: *Queue, timeout: u64) ?Message {
        if (self.isEmpty()) {
            std.Thread.Futex.timedWait(&self.lock, 0, timeout) catch {};
        }
        if (self.isEmpty()) {
            return null;
        }
        self.lock.store(0, .seq_cst);
        const msg = self.messages[self.read_index];
        self.read_index += 1;
        if (self.read_index >= self.messages.len) {
            self.read_index = 0;
        }
        return msg;
    }
};

The main thread calls push and the worker calls pop.

The issue I am seeing right now is that it’s not waking when I store the wake value. It still waits the timeout length. If I change the timedWait to look for the WAKE_VALUE, then it always wakes up immediately. Am I doing something wrong? Is Futex the right call here?
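
For anyone hitting the same thing: a futex wait only returns early when another thread calls `Futex.wake` on the same address (or when the stored value already differs from the expected one at the moment of the call, which is why waiting on `WAKE_VALUE` returns immediately). A plain `store` never wakes a sleeper. Below is a minimal sketch of the store-then-wake pattern, assuming the `std.Thread.Futex` API as found in Zig 0.14/0.15; `Signal`, `notify`, and `waitFor` are made-up names for illustration and not part of the original code.

```zig
const std = @import("std");
const Futex = std.Thread.Futex;

// Hypothetical one-shot signal, for illustration only.
const Signal = struct {
    state: std.atomic.Value(u32) = .init(0),

    /// Publish the new value, then explicitly wake one sleeping thread.
    /// The store alone would not wake anyone.
    fn notify(self: *Signal) void {
        self.state.store(1, .release);
        Futex.wake(&self.state, 1);
    }

    /// Returns true if signalled, false if the timeout elapsed first.
    fn waitFor(self: *Signal, timeout_ns: u64) bool {
        // Loop because a futex wait may return spuriously.
        // Simplification: the timeout restarts after a spurious wakeup.
        while (self.state.load(.acquire) == 0) {
            Futex.timedWait(&self.state, 0, timeout_ns) catch return false;
        }
        // Consume the signal so the next call blocks again.
        self.state.store(0, .monotonic);
        return true;
    }
};
```

With more than one producer or consumer this needs more care than shown here, which is part of why a mutex plus condition variable tends to be the easier starting point.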

I also was recently implementing stuff like this for the first time.

You could try using a condition variable on wake. I’m not sure how good the latency is as I haven’t tested it, but it will achieve what you want by setting up a queue of threads waiting on the condition and then waking them.

That worked. I now have the logic as follows:

const Queue = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    messages: [8]Message = undefined,
    read_index: u8 = 0,
    write_index: u8 = 0,

    pub fn push(self: *Queue, msg: Message) !void {
        if (self.read_index == self.write_index + 1) {
            return error.QueueFull; // Queue is full
        }
        std.debug.assert(self.write_index < self.messages.len);
        self.messages[self.write_index] = msg;
        self.write_index += 1;
        if (self.write_index >= self.messages.len) {
            self.write_index = 0;
        }
        self.mutex.lock();
        self.mutex.unlock();
        self.cond.signal();
    }
    fn isEmpty(self: *Queue) bool {
        return self.read_index == self.write_index;
    }

    pub fn pop(self: *Queue, timeout: u64) ?Message {
        if (self.isEmpty()) {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.cond.timedWait(&self.mutex, timeout) catch {};
        }
        if (self.isEmpty()) {
            return null;
        }
        const msg = self.messages[self.read_index];
        self.read_index += 1;
        if (self.read_index >= self.messages.len) {
            self.read_index = 0;
        }
        return msg;
    }
};

Hopefully I got the mutex stuff right, but it works: it returns as soon as there is a message, or waits until the timeout.
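
One caveat on the mutex usage above: the indices are read and written outside the lock, so a `push` that lands between the `isEmpty` check and the `timedWait` call can be missed until the timeout. The full check also misses the wraparound case, when `write_index` is 7 and `read_index` is 0. Here is a sketch of a variant that holds the mutex around every access to the shared state, assuming `std.Thread.Mutex` and `std.Thread.Condition` as found in Zig 0.14/0.15. `Message` is a placeholder type here, since the real one is not shown in the thread, and the index type is changed to `usize` for the modulo arithmetic.

```zig
const std = @import("std");

// Placeholder payload; the real Message type is not shown in the thread.
const Message = u32;

const Queue = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    messages: [8]Message = undefined,
    read_index: usize = 0,
    write_index: usize = 0,

    pub fn push(self: *Queue, msg: Message) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // One slot stays unused so that "full" and "empty" are distinguishable.
        const next = (self.write_index + 1) % self.messages.len;
        if (next == self.read_index) return error.QueueFull;

        self.messages[self.write_index] = msg;
        self.write_index = next;
        // Signal while the state change is protected by the mutex.
        self.cond.signal();
    }

    pub fn pop(self: *Queue, timeout_ns: u64) ?Message {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.read_index == self.write_index) {
            // timedWait releases the mutex while asleep and
            // reacquires it before returning.
            self.cond.timedWait(&self.mutex, timeout_ns) catch {};
        }
        // Re-check: we may be here because of a timeout or a spurious wakeup.
        if (self.read_index == self.write_index) return null;

        const msg = self.messages[self.read_index];
        self.read_index = (self.read_index + 1) % self.messages.len;
        return msg;
    }
};
```

A spurious wakeup makes `pop` return `null` before the full timeout has passed. For a keep-alive ping that only means an occasional early ping; if that matters, the wait can be wrapped in a loop that tracks the remaining time.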

I think a similar pattern was mentioned as coming with the new Io stuff: something akin to golang channels, with a producer, a consumer, and something like a time.Ticker, combined using a select pattern. I can't find it now, so it might have been in the video kristoff did?

Andrew had a gist that showed a select on tasks that would solve this problem in the more mainstream async/await style. So yes, the new Io stuff might make this more high level.

Ah yes, thank you, that was the exact example I had in mind :)

Check my mailbox
