I trying to build a queue for communicating between an worker and the main render thread for an app I’m building. I want to be able to wait for messages effeciently (i.e. not busy waiting), but wake up in a certain amount of time to send a ping/noop to the server to keep a connection alive. When there is a message is available, I want to wake immediately and handle the message.
This is my first foray into low level synchronization primitives. I found the Futex option in the std library and it seems like what I want. Here is what I have so far:
const Queue = struct {
lock: std.atomic.Value(u32) = .init(0),
messages: [8]Message = undefined,
read_index: u8 = 0,
write_index: u8 = 0,
const WAKE_VALUE: u32 = 3232;
pub fn push(self: *Queue, msg: Message) !void {
if (self.read_index == self.write_index + 1) {
return error.QueueFull; // Queue is full
}
std.debug.assert(self.write_index < self.messages.len);
self.messages[self.write_index] = msg;
self.write_index += 1;
if (self.write_index >= self.messages.len) {
self.write_index = 0;
}
self.lock.store(WAKE_VALUE, .seq_cst);
}
fn isEmpty(self: *Queue) bool {
return self.read_index == self.write_index;
}
pub fn pop(self: *Queue, timeout: u64) ?Message {
if (self.isEmpty()) {
std.Thread.Futex.timedWait(&self.lock, 0, timeout) catch {};
}
if (self.isEmpty()) {
return null;
}
self.lock.store(0, .seq_cst);
const msg = self.messages[self.read_index];
self.read_index += 1;
if (self.read_index >= self.messages.len) {
self.read_index = 0;
}
return msg;
}
};
The main thread calls push
and the worker calls pop
.
The issue I am seeing right now is that it’s not waking when I store the wake value. It still waits the timeout length. If I change the timedWait
to look for the WAKE_VALUE
, then it always wakes up immediately. Am I doing something wrong? Is Futex the right call here?