Criticism request: Single-producer, single-consumer queue

Hello there, I want a thread-safe queue for a project of mine. In general, I expect there to be a significant delay in between pushes. I based this queue off of a CppCon talk, which you may reference, and added waiting. Does this code fit the use case? I would also love for any programming errors to be pointed out:

Cool! I’m just reading through it casually - can you provide that link to the talk you mentioned?

One thing I see is this line here:

pub inline fn tryPush(self: *Self, value: T) error{QueueFull}!void {

I know Zig allows declaring the error set inline in the signature like this, but it can get in the way of composition. Say the user wants to merge error sets and include your queue’s errors…

const AllErrors = MyErrors || T.QueueErrors;

I’d declare the error set in your queue struct to make it easier to access.
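To make the suggestion concrete, here is a minimal sketch of a queue type that exposes a named error set and a caller that merges it with its own. The names `Error`, `MyErrors`, `send`, and the one-slot `Queue` are hypothetical stand-ins, not the code under review.

```zig
const std = @import("std");

// Hypothetical one-slot queue: only the error-set declaration matters here.
fn Queue(comptime T: type) type {
    return struct {
        const Self = @This();

        // Named error set, reachable from outside as Queue(T).Error.
        pub const Error = error{QueueFull};

        item: ?T = null,

        pub fn tryPush(self: *Self, value: T) Error!void {
            if (self.item != null) return error.QueueFull;
            self.item = value;
        }
    };
}

// Hypothetical caller-side errors, merged with the queue's error set.
const MyErrors = error{Disconnected};
const AllErrors = MyErrors || Queue(u8).Error;

fn send(q: *Queue(u8), connected: bool, value: u8) AllErrors!void {
    if (!connected) return error.Disconnected;
    try q.tryPush(value);
}
```

With the set declared on the type, a caller can refer to `Queue(u8).Error` instead of repeating the literal `error{QueueFull}` at every merge site.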

I am watching the video first.

In Queue precondition:

if (1 << log2_len >= 1 << 32 or (1 << log2_len) * @sizeOf(T) > std.math.maxInt(usize)) @compileError("Maximum queue length exceeded");

You can simplify 1 << log2_len >= 1 << 32 to log2_len >= 32.
Also, should the limit be 32 or @bitSizeOf(usize), and should the comparison be >= or just >?

In the comparison > std.math.maxInt(usize), maxInt returns the maximum usize value, so it is not possible for anything to be bigger.

Together:

if (log2_len >= 32 or (1 << log2_len) > std.math.maxInt(usize) / @sizeOf(T)) @compileError("Maximum queue length exceeded");
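As a rough sketch of how the combined check could be packaged, the helper below evaluates the same two conditions in `comptime_int`. The name `lenFits` and the example arguments are assumptions for illustration, and it assumes `@sizeOf(T)` is non-zero.

```zig
const std = @import("std");

// Returns true when a queue of 2^log2_len items of type T is acceptable.
// All arithmetic happens in comptime_int, so nothing can overflow here.
// Assumption: @sizeOf(T) > 0 (a zero-sized T would divide by zero).
fn lenFits(comptime T: type, comptime log2_len: comptime_int) bool {
    // Mirrors the `log2_len >= 32` part of the check.
    if (log2_len >= 32) return false;
    // Dividing the limit avoids ever forming a byte count larger than usize.
    return (1 << log2_len) <= std.math.maxInt(usize) / @sizeOf(T);
}

comptime {
    // Hypothetical instantiation: 1024 items of u64.
    if (!lenFits(u64, 10)) @compileError("Maximum queue length exceeded");
}
```

Dividing `maxInt(usize)` by the element size, rather than multiplying the length by it, keeps the comparison meaningful even if the operands were later changed to fixed-width integers.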

can you provide that link to the talk you mentioned?

It’s in the README.

I’d declare the error set in your queue struct to make it easier to access.

Thanks for the suggestion, but I think error{QueueFull} is easy enough to type.

Thanks.

The integers are 32-bit because Futex needs 32-bit integers.

I thought comptime_int was free of such restrictions?

Do you think this is a satisfactory rewrite of that check:

if (log2_len >= 32) @compileError("Maximum queue length exceeded");

Since the compiler will catch it if it’s too big for usize.

My mistake: maxInt returns comptime_int; I thought it returned the parameter type.

Yes, it is enough to catch both cases.
But one more assertion is always welcome.

Just found: std.atomic.cache_line

It is mostly correct. I have a Cavium mips64 with a 128-byte cache line, a Raspberry Pi aarch64 with a 64-byte cache line, and all the x86_64 machines I own also have a 64-byte cache line.
The M2 aarch64 cache line is 128 bytes, but I don’t get why they chose these values for x86_64.

The problem with this value is that it is used at compile time for alignment, so you cannot use Linux sysconf or Windows GetLogicalProcessorInformation to get it, since those are runtime calls.
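For illustration, a small sketch of how the compile-time constant ends up in a field alignment. The `Cursors` struct is hypothetical and uses plain `u32` fields to stay short, where the queue under review wraps its cursors in atomics.

```zig
const std = @import("std");

// Compile-time estimate provided by the standard library for the target CPU.
const cache_line = std.atomic.cache_line;

// Hypothetical cursor pair: each field is aligned to its own cache line
// so the producer and consumer do not share a line.
const Cursors = struct {
    push_cursor: u32 align(cache_line) = 0,
    pop_cursor: u32 align(cache_line) = 0,
};

comptime {
    // Both cursors start on a cache-line boundary, at different offsets.
    std.debug.assert(@offsetOf(Cursors, "push_cursor") % cache_line == 0);
    std.debug.assert(@offsetOf(Cursors, "pop_cursor") % cache_line == 0);
    std.debug.assert(@offsetOf(Cursors, "push_cursor") != @offsetOf(Cursors, "pop_cursor"));
}
```

Because `align(...)` needs a compile-time value, a line size discovered at runtime cannot be plugged in here, which is the limitation described above.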

push_cursor: Atomic(u32) align(cache_line_size) = .{ .raw = 0 },

I think it’s best to replace the u32 with std.math.IntFittingRange(0, cache_line_size). Since each field will have its own cache line, you might as well use all the bits in the cache line. It’s better than just wasting them as padding.

self.queue.pop_cursor.store(self.cursor +% 1, .Release);

I don’t think you want wrapping addition here. You’ll overwrite the first element in the queue if that happens. If you just don’t think that’s going to happen because the queue is large enough, then just do normal addition. If you want to handle overflow, I think you should return an error instead.

About all those .Monotonic orderings: I’m not entirely sure they’re correct. I find these memory orderings really hard, but I was under the impression that .Monotonic should only be used for variables that are only increasing or decreasing, hence monotonic. Since the producer will be increasing the cursor and the consumer will be decreasing it, I think you need to use .Acquire and .Release.

Wrapped increment is required.

What is happening is:

  • A power-of-2 size is used because it allows indexing with & mask, where mask = size - 1, instead of the expensive % size.
  • There is no code to zero the variable; it is always increasing. That’s why it’s important to have a wrapped increment.
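The two points above can be sketched as follows. This is only an illustration with hypothetical names (`slot`, `count`) and an assumed length of 8 slots, and it uses plain integers in place of the atomic cursors.

```zig
const std = @import("std");

const log2_len = 3; // hypothetical: 8 slots
const len: u32 = 1 << log2_len;
const mask: u32 = len - 1;

// Slot index for a cursor: a cheap bitwise AND instead of `cursor % len`.
fn slot(cursor: u32) u32 {
    return cursor & mask;
}

// Number of queued items. Wrapping subtraction stays correct after the
// cursors overflow, as long as at most `len` items are in flight.
fn count(push_cursor: u32, pop_cursor: u32) u32 {
    return push_cursor -% pop_cursor;
}

test "cursor wraps without disturbing slot order" {
    const last: u32 = std.math.maxInt(u32);
    try std.testing.expectEqual(@as(u32, 7), slot(last));
    try std.testing.expectEqual(@as(u32, 0), slot(last +% 1));
    try std.testing.expectEqual(@as(u32, 2), count(last +% 2, last));
}
```

Since 2^32 is a multiple of any power-of-2 length below it, the slot sequence continues seamlessly when the cursor wraps from its maximum value back to 0, so no element is overwritten early.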

I’m not risking the shitty codegen that might entail.

As pointed out by @dimdin, it’s a circular buffer:

I am under the impression that the purpose of .Monotonic is to produce the bare-minimum load/store:

here

All atomic operations are guaranteed to be atomic within themselves (the combination of two atomic operations is not atomic as a whole!) and to be visible in the total order in which they appear on the timeline of the execution stream. That means no atomic operation can, under any circumstances, be reordered, but other memory operations might very well be. Compilers (and CPUs) routinely do such reordering as an optimization […]
Now, a relaxed is just that, the bare minimum. It does nothing in addition and provides no other guarantees. It is the cheapest possible operation. For non-read-modify-write operations on strongly ordered processor architectures (e.g. x86/amd64) this boils down to a plain normal, ordinary move.

This seems to be what I want when I use .Monotonic, and it was recommended by the talk as well. It would be cool if someone with deeper knowledge of atomics could take a look at the code.

bruh. thanks.

I am mainly looking for criticism of the waiting mechanism and atomics usage. Specifically:

  • Are the performance characteristics of the waiting, specifically the Futex.wake calls in the perform functions when the queue is full/empty, acceptable?
  • Should the code spin in the pusherW and popperW functions at all? If not, why, and if so, for how long?
  • Are any atomic operations in the code unnecessary or not enough?