What is the best way to implement embarrassingly parallel loops with the upcoming IO system, and a problem

In a previous post I asked how to implement an embarrassingly parallel loop.

Ultimately I ended up with the simple loop proposed by LucasSantos91, where the solution reduces to this function launched on as many runners as possible:

fn run(counter: *std.atomic.Value(usize), items: []const Item, buffer: Buffer) void {
    while (true) {
        const index = counter.fetchAdd(1, .monotonic);
        if (index >= items.len) return;
        doWork(items[index], buffer);
    }
}

fn main() !void {
    // set up Allocator, `items`, etc.

    var thread_pool: std.Thread.Pool = undefined;
    try thread_pool.init(.{ .allocator = allocator });
    defer thread_pool.deinit();

    const buffers = allocBuffers(allocator, thread_pool.getIdCount());
    defer buffers.deinit(allocator);

    var counter: std.atomic.Value(usize) = .init(0);
    var wait_group: std.Thread.WaitGroup = .{};

    for (0..thread_pool.getIdCount()) |i| {
        thread_pool.spawnWg(&wait_group, run, .{ &counter, items, buffers.get(i) });
    }
    thread_pool.waitAndWork(&wait_group);
}

This worked very well. Coupled with the knowledge of how many threads are allocated in the thread pool, I can reserve the required memory for doWork at the start (making it infallible), and avoid paying the cost of loadBuffers for each item.

The recent changes around IO introduced the async/concurrent primitives to express parallel work.

This is my first attempt to adapt to the new API:

fn main() !void {
    // set up Io, Allocator, `items`, etc.

    const max_concurrency = try std.Thread.getCpuCount();
    const buffers = allocBuffers(allocator, max_concurrency);
    defer buffers.deinit(allocator);

    var counter: std.atomic.Value(usize) = .init(0);

    var group: std.Io.Group = .init;
    defer group.cancel(io);

    for (0..max_concurrency) |i| {
        group.async(io, run, .{ &counter, items, buffers.get(i) });
    }
    group.wait(io);
}

This seems optimal: it is still infallible, and the group primitive nicely replaces the wait_group from the thread_pool.
The only problem I have is that I need to query max_concurrency myself and cannot ask the Io implementation for it, so if I use init_single_threaded I overallocate memory for the buffers and launch unnecessary tasks.


Hm… if this is for a library then you might accept max_concurrency as a parameter, and make it the responsibility of whoever sets up the io instance to pass you a reasonable value. It sounds nice to be configurable in that way, maybe your clients don’t want you using all cores.

And if “whoever sets up the io instance” is you from just a few lines above, then getting max_concurrency yourself seems fine, I’m not sure I see the problem. If you change to Io.Threaded.init_single_threaded then yes you’ll need to also change your definition of max_concurrency.
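For the library case, a sketch of what that parameterized entry point could look like (everything here is hypothetical: `Item`, `Buffer`, `allocBuffers`, and `run` are from the earlier posts, and the `std.Io.Group` calls mirror the original snippet, whose API is still in flux):

```zig
const std = @import("std");

/// Hypothetical library entry point: the caller decides how many
/// concurrent runners to use, presumably derived from however they
/// configured their Io implementation.
pub fn runAll(
    io: std.Io,
    allocator: std.mem.Allocator,
    items: []const Item,
    max_concurrency: usize,
) !void {
    // One scratch buffer per runner, reserved up front.
    const buffers = try allocBuffers(allocator, max_concurrency);
    defer buffers.deinit(allocator);

    var counter: std.atomic.Value(usize) = .init(0);

    var group: std.Io.Group = .init;
    defer group.cancel(io);

    for (0..max_concurrency) |i| {
        group.async(io, run, .{ &counter, items, buffers.get(i) });
    }
    group.wait(io);
}
```

A caller who set up their Io with `init_single_threaded` would then pass `max_concurrency = 1`, so only one buffer is allocated and only one task is launched.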

It might be nice to be able to say something like this:

const n: usize = io.getParallelism();
const max_concurrency = @min(n, wanted_concurrency);

but I’m not aware of any way to do that.


I think you can just do:

const builtin = @import("builtin");

const max_concurrency = if (builtin.single_threaded) 1 else try std.Thread.getCpuCount();

Yeah, it would be nice to have an io.getParallelism().

API-wise, I am a huge fan of the one I saw in the Sorbet type checker.

Translating to Zig:

const CpuPool = struct {
    // Runs an instance of `function` on all available CPU cores in parallel.
    fn multiplexJob(
        pool: *@This(),
        function: anytype,
        args: std.meta.ArgsTuple(@TypeOf(function)),
    ) void { ... }
};

This works for any sufficiently coarse-grained problem, and is massively simpler to use or implement than Cilk-style fork-join (if you need the latter, check out GitHub - judofyr/spice: Fine-grained parallelism with sub-nanosecond overhead in Zig).
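A hedged usage sketch of that API (the `Hash` type and `computeHash` function are hypothetical), reusing the atomic-counter pattern from the first post:

```zig
// One copy of `worker` runs per core; each pulls indices from the
// shared counter until the input is exhausted.
fn hashAll(pool: *CpuPool, data: []const []const u8, out: []Hash) void {
    var counter: std.atomic.Value(usize) = .init(0);
    pool.multiplexJob(worker, .{ &counter, data, out });
}

fn worker(counter: *std.atomic.Value(usize), data: []const []const u8, out: []Hash) void {
    while (true) {
        const index = counter.fetchAdd(1, .monotonic);
        if (index >= data.len) return;
        out[index] = computeHash(data[index]);
    }
}
```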

In terms of how to map it to IO, my gut feeling is that IO & concurrency and data parallelism are usually orthogonal aspects of an application, so I'd probably thread IO and CpuPool separately:

fn readLotsOfFiles(io: IO, files: []const File) void;
fn computeLotsOfHashes(cpu_pool: CpuPool, data: []const []const u8) void;
fn hashLotsOfFiles(io: IO, cpu_pool: CpuPool, files: []const File) void;

While it might make sense to implement CpuPool on top of IO sometimes, it doesn’t always make sense. For example, I think it’s reasonable to use single-threaded io_uring based IO in control plane and thread-based CpuPool in the data plane.


Thank you.
This is basically the way I’m doing it right now. I accept the std.Thread.Pool and use an auxiliary function to launch the jobs on that pool.

I agree with you that what I’m doing is not IO, and it doesn’t need to use the synchronization facilities provided by the IO abstraction.
I guess I’m going to apply the null hypothesis.
