Hello, I’m currently learning about atomics and lock-free programming using CAS operations. Turns out it’s quite complicated.
I’ve tried to implement a lock-free memory pool that doesn’t free any memory during its lifetime (inspired by the one in std). The main difficulty here basically boils down to implementing a linked list that will not break when multiple producers and consumers are operating on it concurrently.
A linked list sounds easy enough, however this is unfortunately the point where I found out about the ABA problem.
Luckily this problem has been known for a long time, so there are plenty of papers about it (including one specifically about linked lists by Timothy Harris).
I am using the approach he’s suggested in his paper which is marking nodes for deletion before actually deleting them to prevent other threads from meddling with a node once it’s been marked.
Unfortunately I am still not sure if my implementation really is mt safe and I also don’t really know how to test whether it is. I’ve heard about verification tools like Spin, however I’ve not yet managed to properly model my implementation in one of those tools and they don’t seem to consider the c++ memory model either.
Anyway, here’s my code, I would be very thankful for any input/feedback!
const std = @import("std");
const mem = std.mem;
const testing = std.testing;
const Allocator = mem.Allocator;
const assert = std.debug.assert;
pub const Options = struct {
alignment: ?mem.Alignment = null,
};
/// This is essentially an implementation of Timothy L. Harris's non-blocking
/// linked list. (https://timharris.uk/papers/2001-disc.pdf)
pub fn ConcurrentMemoryPool(comptime T: type, comptime options: Options) type {
return struct {
free_list: std.atomic.Value(Pointer) align(alignment),
const Self = @This();
const alignment = @max(
@alignOf(*anyopaque),
if (options.alignment) |a| a.toByteUnits() else @alignOf(T),
);
const alloc_size = @max(@sizeOf(T), @sizeOf(Node));
const Node = struct {
next: Pointer,
};
/// Uses the LSB to store information.
const Pointer = packed struct(usize) {
is_deleted: bool,
_: std.meta.Int(.unsigned, (@bitSizeOf(usize) - 1)),
pub const nullptr: Pointer = @bitCast(@intFromPtr(@as(?*anyopaque, null)));
pub fn init(ptr: anytype) Pointer {
return @bitCast(@intFromPtr(ptr));
}
pub fn asNode(pointer: Pointer) *align(alignment) Node {
return @ptrFromInt(pointer.asAddr());
}
pub fn asItem(pointer: Pointer) Item {
assert(!pointer.isNull());
return @ptrFromInt(pointer.asAddr());
}
pub fn asAddr(pointer: Pointer) usize {
return @bitCast(pointer);
}
pub fn existing(pointer: Pointer) Pointer {
var ex = pointer;
ex.is_deleted = false;
return ex;
}
pub fn deleted(pointer: Pointer) Pointer {
var del = pointer;
del.is_deleted = true;
return del;
}
pub fn isDeleted(pointer: Pointer) bool {
return pointer.is_deleted;
}
pub fn isNull(pointer: Pointer) bool {
return (pointer == nullptr);
}
};
pub const Item = *align(alignment) T;
pub const empty = Self{
.free_list = .init(.nullptr),
};
/// This function is NOT thread-safe.
pub fn initCapacity(gpa: Allocator, capacity: u32) Allocator.Error!Self {
var head: ?*align(alignment) Node = null;
for (0..capacity) |_| {
const item = try allocItem(gpa);
const node: *align(alignment) Node = @ptrCast(item);
node.next = .init(head);
head = node;
}
return Self{
.free_list = .init(.init(head)),
};
}
/// This function is NOT thread-safe.
pub fn deinit(self: *Self, gpa: Allocator) void {
while (self.free_list.raw.isNull() == false) {
const node = self.free_list.raw.asNode();
self.free_list.raw = node.next;
const item: Item = @ptrCast(@alignCast(node));
freeItem(gpa, item);
// std.debug.print("freed item: {*}\n", .{item});
}
self.* = undefined;
}
pub fn create(self: *Self, gpa: Allocator) Allocator.Error!Item {
var head = self.free_list.load(.acquire);
while (true) {
if (head.isNull()) {
// We don't have any free nodes left, allocate a new one.
@branchHint(.unlikely);
const item = try allocItem(gpa);
// std.debug.print("allocated new item: {*}\n", .{item});
return item;
}
while (!head.isDeleted()) {
// Set the is_deleted bit
if (self.free_list.cmpxchgWeak(
head,
head.deleted(),
.release,
.monotonic,
)) |new_head| {
head = new_head;
} else {
// Reload head to ensure that head.next is valid
head = self.free_list.load(.acquire);
}
}
// Someone has successfully set the is_deleted bit, we can now
// remove the node from the list.
// Note that head cannot be null here because is_deleted is set.
const head_next = head.existing().asNode().next;
if (self.free_list.cmpxchgWeak(
head,
head_next,
.release,
.acquire,
)) |new_head| {
head = new_head;
} else {
// We have successfully removed head from the list.
// std.debug.print("popped head: {*}\n", .{head.existing()});
return head.existing().asItem();
}
}
}
pub fn destroy(self: *Self, item: Item) void {
var node: *align(alignment) Node = @ptrCast(item);
var head = self.free_list.load(.acquire);
while (true) {
while (head.isDeleted()) {
// We are not allowed to push onto a deleted head,
// so we try to help in removing the current head
// from the free list.
@branchHint(.unlikely);
const head_next = head.existing().asNode().next;
if (self.free_list.cmpxchgWeak(head, head_next, .release, .monotonic)) |new_head| {
head = new_head;
} else {
head = self.free_list.load(.acquire);
}
}
// We now have a head which is not currently being deleted,
// even though it might be null.
node.next = head;
if (self.free_list.cmpxchgWeak(
head,
.init(node),
.release,
.acquire,
)) |new_head| {
// head might have is_deleted set now, so we need to try
// again from the beginning.
head = new_head;
} else {
// std.debug.print("pushed new head: {*}\n", .{node});
return;
}
}
}
fn allocItem(gpa: Allocator) Allocator.Error!Item {
const bytes = try gpa.alignedAlloc(u8, .fromByteUnits(alignment), alloc_size);
return @ptrCast(@alignCast(bytes[0..@sizeOf(T)]));
}
fn freeItem(gpa: Allocator, item: Item) void {
const bytes = @as([*]align(alignment) u8, @ptrCast(item))[0..alloc_size];
gpa.free(bytes);
}
};
}
test "basic usage" {
const gpa = testing.allocator;
var pool = ConcurrentMemoryPool(u32, .{}).empty;
defer pool.deinit(gpa);
const p1 = try pool.create(gpa);
defer pool.destroy(p1);
const p2 = try pool.create(gpa);
const p3 = try pool.create(gpa);
defer pool.destroy(p3);
try std.testing.expect(p1 != p2);
try std.testing.expect(p1 != p3);
try std.testing.expect(p2 != p3);
pool.destroy(p2);
const p4 = try pool.create(gpa);
defer pool.destroy(p4);
try std.testing.expect(p2 == p4);
}
test "init with capacity" {
const capacity = 4;
var limited_allocator = testing.FailingAllocator.init(testing.allocator, .{ .fail_index = capacity });
const limited = limited_allocator.allocator();
const Pool = ConcurrentMemoryPool(u32, .{});
var pool = try Pool.initCapacity(limited, capacity);
defer pool.deinit(limited);
var created: [capacity]Pool.Item = undefined;
for (0..capacity) |i| {
created[i] = try pool.create(limited);
}
defer for (created) |ptr| {
pool.destroy(ptr);
};
const error_union = pool.create(limited);
try testing.expectError(Allocator.Error.OutOfMemory, error_union);
}