In a previous post I asked how to implement an embarrassingly parallel loop.
Ultimately I ended up with a simple loop proposed by LucasSantos91, where the solution reduces to this function, launched on as many runners as possible:
fn run(counter: *std.atomic.Value(usize), items: []const Item, buffer: Buffer) void {
    while (true) {
        const index = counter.fetchAdd(1, .monotonic);
        if (index >= items.len) return;
        doWork(items[index], buffer);
    }
}
fn main() !void {
    // set up Allocator
    var thread_pool: std.Thread.Pool = undefined;
    try thread_pool.init(.{ .allocator = allocator });
    defer thread_pool.deinit();
    var wait_group: std.Thread.WaitGroup = .{};
    var counter: std.atomic.Value(usize) = .init(0);
    const buffers = try allocBuffers(allocator, thread_pool.getIdCount());
    defer buffers.deinit(allocator);
    for (0..thread_pool.getIdCount()) |i| {
        thread_pool.spawnWg(&wait_group, run, .{ &counter, items, buffers.get(i) });
    }
    thread_pool.waitAndWork(&wait_group);
}
This worked very well. Coupled with knowing how many threads the thread pool allocates, I can reserve the memory doWork requires at the start (making it infallible) and avoid paying the cost of loadBuffers for each item.
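For readers who don't follow Zig, the shared-counter pattern above can be sketched in Python. Everything here is a hypothetical illustration of the technique, not the original code: `parallel_map` and its names are mine, and `itertools.count` stands in for the atomic fetchAdd, since calling `next()` on it is effectively atomic under CPython's GIL.

```python
import threading
from itertools import count

def parallel_map(work, items, num_workers):
    counter = count()            # next() hands out each index exactly once
    results = [None] * len(items)

    def run():
        buffer = []              # per-worker scratch buffer, allocated once
        while True:
            index = next(counter)
            if index >= len(items):
                return           # no items left to claim
            results[index] = work(items[index], buffer)

    threads = [threading.Thread(target=run) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(parallel_map(lambda x, buf: x * x, [1, 2, 3, 4, 5], 3))
# → [1, 4, 9, 16, 25]
```

Each worker claims the next unclaimed index until the list is exhausted, so no work queue or chunking is needed, and the scratch buffer is set up once per worker rather than once per item.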
The recent changes around Io introduced async/concurrent primitives for expressing parallel work.
This is my first attempt to adapt to the new API:
fn main() !void {
    // set up Io and Allocator
    const max_concurrency = try std.Thread.getCpuCount();
    var counter: std.atomic.Value(usize) = .init(0);
    const buffers = try allocBuffers(allocator, max_concurrency);
    defer buffers.deinit(allocator);
    var group: std.Io.Group = .init;
    defer group.cancel(io);
    for (0..max_concurrency) |i| {
        group.async(io, run, .{ &counter, items, buffers.get(i) });
    }
    group.wait(io);
}
This seems optimal: the work loop is still infallible, and the group primitive nicely replaces the wait_group from the thread_pool.
The only problem is that I have to query max_concurrency myself; I cannot ask the Io implementation for it. So if I use an init_single_threaded implementation, I over-allocate memory for the buffers and launch unnecessary tasks.
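Until the Io implementation can report its own concurrency, one partial workaround (my suggestion, not something from the APIs above) is to clamp the task count by both the item count and a caller-supplied hint, so a single-threaded setup can pass 1 and short jobs never spawn idle workers. A sketch, with all names hypothetical:

```python
import os

def pick_num_workers(num_items, hint=None):
    # Clamp by the item count so short jobs don't spawn idle tasks, and by
    # either a caller-supplied hint (e.g. 1 for a single-threaded backend)
    # or the CPU count. Always return at least 1.
    cpus = hint if hint is not None else (os.cpu_count() or 1)
    return max(1, min(num_items, cpus))

print(pick_num_workers(2, hint=8))    # → 2: never more tasks than items
print(pick_num_workers(100, hint=1))  # → 1: single-threaded backend
```

This only mitigates the over-provisioning; it does not solve the underlying problem that the concurrency limit lives outside the Io implementation.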