[CodeReview] Lock Free Queue

Hello, I tried implementing lock free queue using zig. It kinda works (tested it on mac arm), I want some opinions and suggestions regarding the code.

pub fn LockFreeStaticQueue(DataType: type, IndexType: type, comptime Size: IndexType) type {
    if (Size & (Size - 1) != 0) {
        @compileError("Size must be 2^n");
    }

    const AtomicIndex = Atomic(IndexType);

    return struct {
        items: [Size]DataType = [_]DataType{undefined} ** Size,
        start: AtomicIndex = AtomicIndex.init(index_offset),
        end: AtomicIndex = AtomicIndex.init(index_offset),

        const mask = Size - 1;
        pub const index_offset = 8;
        pub const capacity = Size;

        const Self = @This();

        // only owning thread
        pub fn len(self: Self) IndexType {
            const s = self.start.load(.monotonic);
            const e = self.end.load(.monotonic);
            return e - s;
        }

        // only owning thread is allowed to push
        pub fn push(self: *Self, value: DataType) void {
            std.debug.assert(self.len() < capacity);
            const e = self.end.load(.monotonic);
            self.items[e & mask] = value;
            self.end.store(e + 1, .release);
        }
        pub fn pushSlice(self: *Self, values: []const DataType) void {
            const values_len: IndexType = @intCast(values.len); // overflow check?

            std.debug.assert((capacity - self.len()) >= values_len);

            const e = self.end.load(.monotonic);
            const index = e & mask;
            const first_half = capacity - index;
            if (values.len <= first_half) {
                @memcpy(self.items[index..][0..values_len], values);
            } else {
                const second_half = values.len - first_half;
                @memcpy(self.items[index..][0..first_half], values[0..first_half]);
                @memcpy(self.items[0..second_half], values[first_half..][0..second_half]);
            }
            self.end.store(e + values_len, .release);
        }

        // LIFO, only owning thread can pop
        pub fn pop(self: *Self) ?DataType {
            const e = self.end.fetchSub(1, .acq_rel);
            const s = self.start.load(.monotonic);
            if (e <= s) {
                self.end.store(e, .monotonic);
                return null;
            }

            const index = e - 1;
            var job: ?DataType = self.items[index & mask];
            // contention with steal if this is the last job
            if (index == s) {
                // compete with steal
                if (self.start.cmpxchgStrong(s, s + 1, .monotonic, .monotonic) != null) {
                    self.end.store(e, .monotonic); // reset since other thread beat us
                    job = null;
                } else {
                    // this cannot be seen by other threads before s => s+1 cmpxchg
                    self.end.store(s + 1, .release);
                }
            }
            return job;
        }

        // FIFO, steal can be called by other threads
        pub fn steal(self: *Self) ?DataType {
            const e = self.end.load(.acquire);
            const s = self.start.load(.monotonic);
            if (e <= s) return null;

            var job: ?DataType = self.items[s & mask];
            // possible contention wiht other threads trying steal (or owner last pop)
            // release coz we want the items read & copied before start is updated
            if (self.start.cmpxchgStrong(s, s + 1, .release, .monotonic) != null) {
                job = null;
            }

            return job;
        }
    };
}

its based on: Job System 2.0: Lock-Free Work Stealing – Part 3: Going lock-free | Molecular Musings

You could also just do items: [Size]DataType = undefined directly.

Depending on what your goals are, I think returning an error in this case might be better, than asserting. Other data structures (e.g. BoundedArray from the standard library) also return an error in situations where the internal buffer is too small.
Furthermore if you move the range check above the intCast, then the intCast can never fail.

However you should worry about integer overflow in your start and end indices. Even a u64 will eventually overflow, which would either fail in self.end.store(e + 1, .release); if compiled in ReleaseSafe or debug modes, or it would fail in if (e <= s), since the overflowed end is always smaller than start.
The solution would be to use wrapping arithmetic.

Lastly I would heavily recommend using the builtin thread sanitizer when testing, to make sure that you didn’t make any obvious mistakes here. You can enable it in the system with the .sanitize_thread = true flag.

4 Likes

Thanks, I didn’t really consider the indices eventually overflowing. Also, didn’t know we had builtin support for thread sanitizers.

Welcome to Ziggit @newdev!

A minor style critique: in your code, Size is capitalized like a type, or a type-returning function, the usual style for parameters and variables is snek_case. As a convention, this applies to comptime-only values as well. Of course this isn’t enforced by the compiler (or anything), it’s just a bit confusing, and probably would be to you as well, coming back to the code later.

The size & (size - 1) == 0 trick for power-of-two detection is clever! I’ve used @popCount(size) == 1, this has the modest advantage that it doesn’t underflow on 0, although in your case that wouldn’t matter. If it did matter I suppose size & (size -| 1) == 0 would work just as well.

3 Likes

std.math.isPowerOfTwo

1 Like

I’ve seen LLVM turn that into @popCount(x) <= 1, lol. So you may as well do @popCount(x) == 1. Probably not applicable to ARM though.

1 Like

I object to this implementation! isPowerOfTwo(0) is a valid question, the answer is no.

Fine on aarch64 of course, but I got curious and yeah, popcount takes ten cycles for a word on ARM32.

The fun question is if LLVM for that platform can strength-reduce @popcount(1) == 1 to branch zero / decrement to register / and registers / branch not-zero. This would not surprise me, but I’m not quite curious enough to scaffold it in Godbolt and check.