What is the best way to implement embarrassingly parallel loops with the upcoming IO system (and a problem)

In a previous post I asked how to implement an embarrassingly parallel loop.

Ultimately I ended up with the simple loop proposed by LucasSantos91, where the solution reduces to this function launched on as many runners as possible:

fn run(counter: *std.atomic.Value(usize), items: []const Item, buffer: Buffer) void {
    while (true) {
        const index = counter.fetchAdd(1, .monotonic);
        if (index >= items.len) return;
        doWork(items[index], buffer);
    }
}

fn main() !void {
    // set up Allocator, items: []const Item, etc.

    var thread_pool: std.Thread.Pool = undefined;
    try thread_pool.init(.{ .allocator = allocator });
    defer thread_pool.deinit();

    const buffers = try allocBuffers(allocator, thread_pool.getIdCount());
    defer buffers.deinit(allocator);

    var counter: std.atomic.Value(usize) = .init(0);
    var wait_group: std.Thread.WaitGroup = .{};

    for (0..thread_pool.getIdCount()) |i| {
        thread_pool.spawnWg(&wait_group, run, .{ &counter, items, buffers.get(i) });
    }
    thread_pool.waitAndWork(&wait_group);
}

This worked very well. Coupled with the knowledge of how many threads the thread pool allocates, I can reserve the required memory for doWork at the start (making it infallible) and avoid paying the cost of loadBuffers for each item.
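
For reference, a minimal sketch of what the allocBuffers helper used above could look like; the Buffer type, the buffer size, and the Buffers wrapper are my assumptions, only the one-buffer-per-pool-thread idea comes from the post:

const std = @import("std");

const Buffer = []u8; // placeholder; the real type depends on doWork

const Buffers = struct {
    items: []Buffer,

    fn get(self: Buffers, i: usize) Buffer {
        return self.items[i];
    }

    fn deinit(self: Buffers, allocator: std.mem.Allocator) void {
        for (self.items) |buf| allocator.free(buf);
        allocator.free(self.items);
    }
};

fn allocBuffers(allocator: std.mem.Allocator, count: usize) !Buffers {
    const items = try allocator.alloc(Buffer, count);
    var allocated: usize = 0;
    errdefer {
        for (items[0..allocated]) |buf| allocator.free(buf);
        allocator.free(items);
    }
    for (items) |*buf| {
        buf.* = try allocator.alloc(u8, 4096); // size is an assumption
        allocated += 1;
    }
    return .{ .items = items };
}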

The recent changes around IO introduced the async/concurrent primitives to express parallel work.

This is my first attempt to adapt to the new API:

fn main() !void {
    // set up Io and Allocator

    const max_concurrency = try std.Thread.getCpuCount();
    const buffers = try allocBuffers(allocator, max_concurrency);
    defer buffers.deinit(allocator);

    var counter: std.atomic.Value(usize) = .init(0);

    var group: std.Io.Group = .init;
    defer group.cancel(io);

    for (0..max_concurrency) |i| {
        group.async(io, run, .{ &counter, items, buffers.get(i) });
    }
    group.wait(io);
}

This seems optimal: it is still infallible, and the group primitive nicely replaces the wait_group from the thread pool.
The only problem is that I need to query max_concurrency myself and cannot ask the Io implementation for it, so if I use an init_single_threaded I overallocate memory for the buffers and launch unnecessary tasks.


Hm… if this is for a library then you might accept max_concurrency as a parameter, and make it the responsibility of whoever sets up the io instance to pass you a reasonable value. It sounds nice to be configurable in that way; maybe your clients don’t want you using all cores.

And if “whoever sets up the io instance” is you from just a few lines above, then getting max_concurrency yourself seems fine; I’m not sure I see the problem. If you change to Io.Threaded.init_single_threaded then yes, you’ll also need to change your definition of max_concurrency.
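
For instance, a hypothetical library entry point (the names here are made up):

pub fn processAll(
    io: std.Io,
    allocator: std.mem.Allocator,
    items: []const Item,
    max_concurrency: usize,
) !void { ... }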

It might be nice to be able to say something like this:

const n: usize = io.getParallelism();
const max_concurrency = @min(n, wanted_concurrency);

but I’m not aware of any way to do that.


I think you can just do:

const builtin = @import("builtin");

const max_concurrency = if (builtin.single_threaded) 1 else std.Thread.getCpuCount() catch 1;

Yeah, it would be nice to have an io.getParallelism().

API-wise, I am a huge fan of the one I saw in the Sorbet type checker.

Translating to Zig:

const CpuPool = struct {
    /// Runs an instance of `function` on all available CPU cores in parallel.
    fn multiplexJob(
        pool: *@This(),
        function: anytype,
        args: std.meta.ArgsTuple(@TypeOf(function)),
    ) void { ... }
};

This works for any sufficiently coarse-grained problem, and is massively simpler to use or implement than Cilk-style fork-join (if you need the latter, check out judofyr/spice, fine-grained parallelism with sub-nanosecond overhead in Zig: https://github.com/judofyr/spice).
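
For illustration, the atomic-counter loop from the first post maps onto this hypothetical API roughly like so; since every core receives the same args tuple, the per-worker buffer has to be set up inside the worker function (runOnPool is a made-up variant of run):

fn runOnPool(counter: *std.atomic.Value(usize), items: []const Item) void {
    // allocate this worker's buffer here, then run the
    // atomic-counter loop from the first post
}

var counter: std.atomic.Value(usize) = .init(0);
cpu_pool.multiplexJob(runOnPool, .{ &counter, items });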

In terms of how to map it to IO, my gut feeling is that IO & concurrency and data parallelism are usually orthogonal aspects of an application, so I’d probably thread IO and CpuPool separately:

fn readLotsOfFiles(io: IO, files: []const File) void;
fn computeLotsOfHashes(cpu_pool: CpuPool, data: []const []const u8) void;
fn hashLotsOfFiles(io: IO, cpu_pool: CpuPool, files: []const File) void;

While it might make sense to implement CpuPool on top of IO sometimes, it doesn’t always make sense. For example, I think it’s reasonable to use single-threaded io_uring based IO in control plane and thread-based CpuPool in the data plane.


Thank you.
This is basically the way I’m doing it right now. I accept the std.Thread.Pool and use an auxiliary function to launch the jobs on that pool.
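
Roughly, the auxiliary function looks like this (launchOnPool is a placeholder name; run, Item, and the buffers are as in my first post):

fn launchOnPool(pool: *std.Thread.Pool, items: []const Item, buffers: Buffers) void {
    var counter: std.atomic.Value(usize) = .init(0);
    var wait_group: std.Thread.WaitGroup = .{};
    for (0..pool.getIdCount()) |i| {
        pool.spawnWg(&wait_group, run, .{ &counter, items, buffers.get(i) });
    }
    pool.waitAndWork(&wait_group);
}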

I agree with you that what I’m doing is not IO, and it doesn’t need to use the synchronization facilities provided by the IO abstraction.
I guess I’m going to apply the null hypothesis.


This pull request removes std.Thread.Pool, so now the library must adapt to the new IO system for parallelism.

So now I would like to ask the interested ziguanas how they would solve this problem, i.e. how to implement an embarrassingly parallel loop where you launch n workers, one per unit of concurrency.
Each worker increments an atomic counter to claim the next item, so you can add any number of workers, and some amount of resources is allocated for each one.
The maximum number of concurrent workers is needed to reserve the necessary memory upfront.

Thanks in advance.

Now, this is probably not the most optimal way, but this is how I have done it in fz. I use Io.Group.concurrent to launch n workers, each of which gets a pointer to a work_queue: Io.Queue(Work). Each worker does the following:

fn worker(io: Io, gpa: Allocator, work_queue: *Io.Queue(Work)) void {
    // allocate resources for workers here.

    var work_buf: [256]Work = undefined;
    while (work_queue.get(io, &work_buf, 1)) |n| {
        for (work_buf[0..n]) |work| {
            // d and m are the per-worker resources allocated above
            work.match.updateScore(work.needle, &d, &m);
        }
        work_buf[0].result_queue.putOne(io, n) catch return;
    } else |_| {
        log.err("worker got canceled", .{});
    }
}

Then, to give out work to the workers, I create a result_queue and a sendWork fn that pushes work to the work_queue concurrently and then waits for the correct number of results to be given back:

pub const Work = struct {
    match: *Match,
    needle: []const u8,
    result_queue: *Io.Queue(usize),
};


pub fn updateMatches(
    io: Io,
    search_str: []const u8,
    matches: []Match,
    work_queue: *Io.Queue(Work),
) ![]const Match {

    if (search_str.len == 0) {
        // restore to original
        Match.sortMatches(matches, Match.orderByIdx);
        for (matches) |*match| {
            match.score = Match.score_min;
            @memset(match.positions, false);
        }
        return matches[0..];
    }

    var buf: [MAX_SEARCH_LEN]u8 = undefined;
    const needle = util.lowerString(&buf, search_str);

    var result_queue_buf: [2048]usize = undefined;
    var result_queue: Io.Queue(usize) = .init(&result_queue_buf);

    var send_work = try io.concurrent(sendWork, .{ io, work_queue, &result_queue, needle, matches });
    defer send_work.cancel(io) catch {};

    var finished: usize = 0;
    var result_buf: [64]usize = undefined;
    while (result_queue.get(io, &result_buf, 1)) |result_count| {
        for (result_buf[0..result_count]) |n| finished += n;
        if (finished >= matches.len) break;
    } else |err| return err;

    Match.sortMatches(matches, Match.orderByScore);

    var start: usize = 0;
    var len: usize = 0;
    for (matches) |match| {
        if (match.score <= 0) break;
        if (match.score == Match.score_max) start += 1;
        len += 1;
    }
    assert(start <= len);
    return matches[start..len];
}

fn sendWork(
    io: Io,
    work_queue: *Io.Queue(Work),
    result_queue: *Io.Queue(usize),
    needle: []const u8,
    matches: []Match,
) !void {
    const tr = tracy.trace(@src());
    defer tr.end();
    for (0..matches.len) |i| {
        try work_queue.putOne(io, .{
            .match = &matches[i],
            .needle = needle,
            .result_queue = result_queue,
        });
    }
}

It depends on the program, which often does (and should) depend on the problem domain.

Depending on your problem, separating the IO and the processing parts can make the most sense; in other cases merging them can (because keeping them separate would lead to an architectural monstrosity nobody can understand).

So having a multithreaded io_uring based IO implementation can be the thing you want.

I think the intended usage is that you create one Io.Threaded and one Io.Evented.
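
Roughly like this, assuming Io.Evented follows the same init/io shape as std.Io.Threaded (I have only verified this shape for Threaded):

var cpu: std.Io.Threaded = .init(allocator);
defer cpu.deinit();
const cpu_io = cpu.io();

var evented: std.Io.Evented = .init(allocator); // name and shape assumed
defer evented.deinit();
const event_io = evented.io();

// hand cpu_io to the data-parallel parts and event_io to the IO-heavy parts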

Considering your example specifically, I would probably merge them (yes, I know about std.Io.Writer.Hashed, but that is beside the point discussed here):

for (files, results) |f, *r| {
    group.async(io, computeHash, .{ io, f, r });
}
group.wait(io);

fn computeHash(io: std.Io, file: File, result: *Errors!Hash) void {
    const BUFFER_SIZE = 1024;
    // open file and get std.Io.Reader
    // get hasher
    var work_buffer: [BUFFER_SIZE]u8 = undefined;
    var just_read: []const u8 = reader.peek(BUFFER_SIZE) catch |err| switch (err) {
        error.EndOfStream => {
            result.* = error.FileEmpty;
            return;
        },
        else => |e| {
            result.* = e;
            return;
        },
    };
    while (true) {
        // Copy the peeked bytes so hashing can overlap with the next read,
        // and advance the reader past them before kicking off that read.
        const work = work_buffer[0..just_read.len];
        @memcpy(work, just_read);
        reader.toss(just_read.len);
        var fut = io.async(std.Io.Reader.peek, .{ reader, BUFFER_SIZE });
        hasher.update(work);
        // every path below consumes fut, so no separate cancel is needed
        just_read = fut.await(io) catch |err| {
            if (err == error.EndOfStream) break;
            result.* = err;
            return;
        };
    }
    result.* = hasher.final();
}

(While writing this I noticed that std.Io.File.Reader has no documentation on why it has multiple error fields. I hope this will be added before 0.16.0. If not, I guess I will figure it out and create an MR to do so afterwards.)

Also, looking at how the reader is implemented, every std.Io.File.Reader carries the std.Io interface it belongs to around with it. I wonder if, in the long term, we will see the same thing happen as with the managed and unmanaged containers.

I doubt it; the io being hidden here means you can inject nonblocking readers/writers into code without the code needing to explicitly support passing an io around.

That is extremely flexible, especially since, IME, you are much more likely to write code that never touches io than code that never touches an allocator (obviously it depends on the application).

You could say this about the managed containers too, couldn’t you? If each operation on a Reader/Writer requires its corresponding io interface, it’s practically the same as with managed containers.

Besides, one nice side effect of requiring an allocator to be passed in is that you know from the signature what the function can (or can’t) do (without globals like std.heap.smp_allocator of course, but that’s generally seen as bad practice anyway and should be detectable by a yet-to-be-written static analyser).
Having that for IO would be nice too, since IO is normally more expensive (at runtime) than memory allocation.

Technically true, yes, but it also means that if you have a struct with two or more readers or writers (or a combination), or a function which requires multiple, you pay for that (which was one of the reasons for deprecating the managed containers).

I guess at the end of the day we will see what happens when the wider Zig community starts to gather experience with it over the coming years.


I think being able to trivially change between dealing with io or memory is more valuable than being explicit/memory efficient.

While you can compare this to allocators, I think you need to be much more nuanced.
There is a lot more control with memory than with io; since the Io interface abstracts a much larger set of platform APIs, there is no way around it limiting control.
I think explicit Io with readers/writers is just less valuable; the entire point of these interfaces is to obscure where data comes from and goes to.

Allocators only obscure how the memory is obtained, which leaves much more to you to control. So it is more valuable to have that control.

I’m also tackling this problem. To achieve the same result as before, I think the solution is to use concurrent instead of async:

var i: usize = 0;
while (true) : (i += 1) {
    group.concurrent(io, run, .{ &counter, items, buffers.get(i) }) catch break;
}

concurrent returns an error when the implementation decides it doesn’t want more threads. For Io.Threaded, the default for this limit is the CPU count, though the user can adjust it. In this case, you would need to allocate the buffers one by one, right before passing each one to concurrent.
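
Spelled out, that could look roughly like this (allocBuffer and freeBuffer are hypothetical per-worker helpers, not std APIs):

while (true) {
    const buffer = try allocBuffer(allocator);
    group.concurrent(io, run, .{ &counter, items, buffer }) catch {
        freeBuffer(allocator, buffer);
        break;
    };
}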

Another option is to let the Io implementation decide how to divide the work. You forego the counter, and create one async job for each element that needs processing:

for (items, buffers) |item, *buffer| {
    group.async(io, doWork, .{ item, buffer });
}

The implementation will then handle splitting it between the threads. Though you would have to either allocate one buffer for each job, or find a smart way of allocating the buffers.

Perhaps the best solution would be for Io to have a queryConcurrencyAvailable() that gives an estimate of how many concurrency units are left. Granted, since we are dealing with asynchronous and multithreaded problems, the available concurrency units would be subject to time-of-query/time-of-use problems.

I think the best option is going to be inspired by how this is commonly solved in Go:

const max_concurrency = 8;

var queue_buffer: [max_concurrency]T = undefined;
var queue: std.Io.Queue(T) = .init(&queue_buffer);

var group: std.Io.Group = .init;
defer group.cancel(io);

for (0..max_concurrency) |_| {
    try group.concurrent(io, processTask, .{ io, &queue });
}

for (items) |item| {
    try queue.putOne(io, item);
}
queue.close(io);

group.wait(io);

That shouldn’t be a problem. You launch n_max async threads (allowing for max parallelism), then when one terminates you request cancellation of the others.

Also, I don’t really like a worker that splits its working slice among more workers. I feel like the atomic counter should remain the solution: it’s the simplest one and the minimal amount of state that’s actually needed. If it becomes the bottleneck, just add a batch_size instead (sketched below).
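
A batch_size variant of the original run function would look roughly like this (the batch_size value is an arbitrary tuning assumption):

fn runBatched(counter: *std.atomic.Value(usize), items: []const Item, buffer: Buffer) void {
    const batch_size = 64; // tuning assumption; amortizes the atomic fetchAdd
    while (true) {
        const start = counter.fetchAdd(batch_size, .monotonic);
        if (start >= items.len) return;
        const end = @min(start + batch_size, items.len);
        for (items[start..end]) |item| doWork(item, buffer);
    }
}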

That requires a queue, which is not needed with the current design. However, thanks for the suggestion.
Also, this still suffers from needing to know max_concurrency in some way.

As people working on high thread count systems figured out (as in CPUs with a few hundred hardware threads; yes, those exist, even as x86_64 systems), ThreadPools (or, to be exact, TaskQueues) don’t scale well to those numbers because of their overhead:
https://www.youtube.com/watch?v=oj-_vpZNMVw


I agree with that, but I’d argue that then you don’t want the I/O interface but some kind of MPI toolkit, which is unlikely to be in the std.