Hello, I tried implementing a lock-free queue in Zig. It mostly works (tested on an ARM Mac), and I would like some opinions and suggestions regarding the code.
/// Fixed-capacity work-stealing deque (Chase-Lev style) stored in a ring buffer.
///
/// Exactly one thread, the owner, may call `len`, `push`, `pushSlice` and
/// `pop`. Any thread may call `steal`. The owner works on the `end` side (LIFO);
/// thieves take from the `start` side (FIFO).
///
/// `Size` must be a power of two so that a slot is found with `index & mask`.
/// `IndexType` must be an unsigned integer that coerces to `usize`. Indices
/// only ever grow and wrap around on overflow, so pick a type wide enough that
/// a stalled thief cannot sleep through a complete wrap of the index space.
///
/// `steal` copies an element before it knows whether it won the slot; a copy
/// that lost the race is discarded. `DataType` should therefore be plain data
/// that is safe to copy speculatively.
pub fn LockFreeStaticQueue(comptime DataType: type, comptime IndexType: type, comptime Size: IndexType) type {
    // Zero is rejected explicitly so the user sees this message instead of a
    // comptime overflow from `Size - 1`.
    if (Size == 0 or (Size & (Size - 1)) != 0) {
        @compileError("Size must be 2^n");
    }
    const AtomicIndex = Atomic(IndexType);
    return struct {
        /// Ring buffer storage; slot `i & mask` holds the element with index `i`.
        items: [Size]DataType = undefined,
        /// Index of the oldest element. Advanced by `steal` and by `pop` when
        /// it takes the last element; always through a compare-and-swap.
        start: AtomicIndex = AtomicIndex.init(index_offset),
        /// One past the index of the newest element. Written by the owner only.
        end: AtomicIndex = AtomicIndex.init(index_offset),

        const mask = Size - 1;
        /// Initial value of `start` and `end`. All index arithmetic wraps, so
        /// the exact starting point carries no meaning.
        pub const index_offset = 8;
        /// Maximum number of elements the queue can hold.
        pub const capacity = Size;
        const Self = @This();

        /// Returns the number of queued elements. Owner thread only.
        ///
        /// Takes a pointer so the atomics are read in place rather than from a
        /// by-value copy of the whole queue.
        pub fn len(self: *const Self) IndexType {
            const s = self.start.load(.monotonic);
            const e = self.end.load(.monotonic);
            // Wrapping distance stays correct after the indices overflow.
            return e -% s;
        }

        /// Appends `value` at the `end` side. Owner thread only.
        /// Asserts that the queue is not full.
        pub fn push(self: *Self, value: DataType) void {
            std.debug.assert(self.len() < capacity);
            const e = self.end.load(.monotonic);
            self.items[e & mask] = value;
            // Release: the element must be written before thieves see the new end.
            self.end.store(e +% 1, .release);
        }

        /// Appends all of `values` in order at the `end` side. Owner thread only.
        /// Asserts that the queue has room for the whole slice.
        pub fn pushSlice(self: *Self, values: []const DataType) void {
            // Compare in usize first so the narrowing cast below is known to fit.
            const room: usize = capacity - self.len();
            std.debug.assert(values.len <= room);
            const count: IndexType = @intCast(values.len);

            const e = self.end.load(.monotonic);
            const index: usize = e & mask;
            const until_wrap: usize = @as(usize, capacity) - index;
            if (values.len <= until_wrap) {
                @memcpy(self.items[index..][0..values.len], values);
            } else {
                // The slice straddles the end of the buffer: fill the tail,
                // then continue from slot 0.
                @memcpy(self.items[index..], values[0..until_wrap]);
                @memcpy(self.items[0 .. values.len - until_wrap], values[until_wrap..]);
            }
            // Release: publish every copied element together.
            self.end.store(e +% count, .release);
        }

        /// Removes and returns the newest element (LIFO), or null when the
        /// queue is empty or a thief won the race for the last element.
        /// Owner thread only.
        pub fn pop(self: *Self) ?DataType {
            // Reserve the newest slot by publishing `end - 1` before looking at
            // `start`. Both operations are seq_cst so a thief that reads `start`
            // and then `end` cannot miss the reservation.
            const e = self.end.fetchSub(1, .seq_cst);
            const s = self.start.load(.seq_cst);
            const count = e -% s;
            if (count == 0 or count > capacity) {
                // Nothing to take: undo the reservation.
                self.end.store(e, .release);
                return null;
            }

            const index = e -% 1;
            const job = self.items[index & mask];
            // More than one element left: thieves cannot reach this slot.
            if (index != s) return job;

            // Last element: compete with thieves for it through `start`.
            const won = self.start.cmpxchgStrong(s, s +% 1, .seq_cst, .monotonic) == null;
            // Win or lose, the queue is now empty with start == s + 1 == e.
            self.end.store(e, .release);
            return if (won) job else null;
        }

        /// Removes and returns the oldest element (FIFO), or null when the
        /// queue looks empty or another thread claimed the element first.
        /// May be called from any thread.
        pub fn steal(self: *Self) ?DataType {
            // `start` must be read before `end`. With the opposite order a
            // stale `end` can make an already popped slot look available.
            const s = self.start.load(.seq_cst);
            const e = self.end.load(.seq_cst);
            const available = e -% s;
            // `available > capacity` is the owner's transient `end == start - 1`
            // during a pop on an empty queue.
            if (available == 0 or available > capacity) return null;

            // Copy first, then claim; the copy is discarded if the claim fails.
            const job = self.items[s & mask];
            // Contention with other thieves, or with the owner's last pop.
            if (self.start.cmpxchgStrong(s, s +% 1, .seq_cst, .monotonic) != null) {
                return null;
            }
            return job;
        }
    };
}
It's based on: Job System 2.0: Lock-Free Work Stealing – Part 3: Going lock-free | Molecular Musings