How to execute subsets of async/concurrent tasks consecutively

Hi,

I’m a little bit stuck with getting my code to work the way I want it.

Since I’m really new to Zig, and documentation (especially on the master branch) is sometimes missing and often outdated, I’m hoping for some help from the experts here :slightly_smiling_face:

What I’m trying to accomplish is the following:

I want to download (GET) file content in chunks and afterwards put it back together as a complete file. Additionally, I’m trying to define a maximum count of parallel downloads. Whenever a single chunk download finishes, the next chunk should start downloading, until there are no chunks left and the whole body has been downloaded and put together into a new local file. I know that’s a very common task, but it’s only the frame for a more specialized workflow in the final code. For now, though, I’m stuck with the basic task itself.

For a different program in Rust I accomplished this by using a Semaphore together with a Vec of Tokio tasks. Each task there gets a permit as long as the semaphore’s maximum permit count isn’t reached. If there is no free permit, it idles until an earlier task finishes. That works fine, but the whole tokio/Semaphore interface in Rust is very abstract, and there is a lot happening in the background that I don’t see, let alone understand when I look into the code.

Thus, I thought using a std.Io.Semaphore together with some async/concurrent functions could make it work in Zig too. The following code is a simplified version of what I want to accomplish. It’s not really downloading anything, but slicing a string into parts and returning each part after a sleep of 2 seconds, which should mimic a plausible download time for bigger chunks well enough. The code runs fine and the file is put together correctly. But all tasks are executed in parallel, meaning the whole code only takes 2 seconds, while this example should take about 12 seconds because of the semaphore’s max permit count of 3.

const std = @import("std");
const Io = std.Io;
pub fn main(init: std.process.Init) !void {
    const allocator: std.mem.Allocator = init.gpa;

    var threaded: Io.Threaded = .init(allocator, .{ .environ = init.minimal.environ });
    defer threaded.deinit();
    const io = threaded.io();

    // Some dumb blind text
    const content =
        \\Far far away, behind the word mountains, far from the countries Vokalia and
        \\Consonantia, there live the blind texts. Separated they live in Bookmarksgrove
        \\right at the coast of the Semantics, a large language ocean. A small river named
        \\Duden flows by their place and supplies it with the necessary regelialia. It is
        \\a paradisematic country, in which roasted parts of sentences fly into your
        \\mouth. Even the all-powerful Pointing has no control about the blind texts it is
    ;

    const ContentRange = struct {
        start: usize,
        end: usize,
    };

    const part_size = 50;

    // Calculate parts count
    var part_count = (content.len / part_size) + 1;
    var last_part_size = content.len % part_size;
    if (last_part_size == 0) {
        last_part_size = part_size;
        part_count -= 1;
    }

    // Create array of part ranges
    var ranges = std.ArrayList(ContentRange).empty;
    defer ranges.clearAndFree(allocator);

    // Add ranges to array
    for (0..part_count) |part| {
        const start = part * part_size;
        // INFO: Remember the "end" index of an HTTP request range is inclusive,
        // while the "end" index of a byte/string slice is exclusive
        const end = if (part == part_count - 1)
            start + last_part_size - 1
        else
            (part + 1) * part_size - 1;
        try ranges.append(allocator, .{ .start = start, .end = end });
    }

    for (ranges.items) |r| {
        std.log.info("Range {d}-{d}: {s}", .{ r.start, r.end, content[r.start .. r.end + 1] }); // add one to end index since byte slices are exclusive

    }

    const start_time = std.Io.Timestamp.now(io, .awake);

    var local_file = try std.Io.Dir.createFile(.cwd(), io, "local_file", .{});

    const max_concurrency_count = 3;
    var semaphore: Io.Semaphore = .{ .permits = max_concurrency_count };

    var futures: std.ArrayList(Io.Future(anyerror!void)) = .empty;
    defer futures.clearAndFree(allocator);

    // TODO: Maybe a different implementation with Io.Group, Io.Queue or Io.Select
    // var futures: std.Io.Queue(Io.Future(!void)) = .init(&.{});

    for (ranges.items, 0..) |r, i| {
        std.log.debug("Start putting range {d} to futures", .{i});
        const chunk = content[r.start .. r.end + 1];
        try futures.append(allocator, io.async(downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start }));
        semaphore.post(io);
        std.log.debug("Finished putting range {d} to futures, semaphore increased", .{i});
        // try futures.putOne(io, io.async(downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start }));
    }

    for (futures.items) |*func| {
        try func.await(io);
        const now = Io.Timestamp.untilNow(start_time, io, .awake);
        std.log.debug("Completed part {d}ms after start", .{now.toMilliseconds()});
    }

    var buf: [1024]u8 = undefined;
    var stdout = std.Io.File.Writer.init(.stdout(), io, &buf);
    const stdout_writer = &stdout.interface;

    try stdout_writer.print("Finished\n", .{});
}

// Fake a download, waiting 2s before returning the chunk.
// Then write the chunk to the newly created file at the correct position.
fn downloadAndWrite(io: Io, sema: *Io.Semaphore, file: *std.Io.File, chunk: []const u8, start: u64) anyerror!void {
    try sema.wait(io);
    try io.sleep(std.Io.Duration.fromMilliseconds(2000), .awake);
    try file.writePositionalAll(io, chunk, start);
}

Output (where I would expect more 2-second steps):

debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2000ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start
debug: Completed part 2001ms after start

I already thought that using std.Io.Group, std.Io.Select, or std.Io.Queue for this job might be better, but documentation on those structs is sparse. Most material online uses something like std.Thread.Pool, which seems deprecated on master.

Thus, I’m thankful for any idea/hint, whatever! And sorry for the long text, and if I’m missing something obvious. Zig is still new to me, as is this low-level async/concurrency stuff (I know “async is not concurrency” :wink: )

Thank you

Your problem is here: the semaphore.post(io) should be in downloadAndWrite, not in the for loop.
What is happening is that you create the futures and then immediately increment the semaphore, adding more permits. By the time the futures actually run, the permit count is N + 3, so all of them are able to get a permit and run.

I pulled your code and moved the post into downloadAndWrite, and it then ran in batches of 3:

// Fake a download, waiting 2s before returning the chunk.
// Then write the chunk to the newly created file at the correct position.
fn downloadAndWrite(io: Io, sema: *Io.Semaphore, file: *std.Io.File, chunk: []const u8, start: u64) anyerror!void {
    try sema.wait(io);
    // free up the permit on exit.
    defer sema.post(io);
    try io.sleep(std.Io.Duration.fromMilliseconds(2000), .awake);
    try file.writePositionalAll(io, chunk, start);
}

Thanks for the fast and detailed reply. Now that you explained it that totally makes sense. As I don’t have the opportunity right now, I’ll try it tomorrow and report!

However, just fueled by interest and the will to learn: are there more idiomatic/efficient ways to achieve this using some of the tools mentioned above (Group, Select, Queue)? Since I couldn’t find many real usage examples, if any, I’d be happy for some insights/ideas.

Idiomatic will come with time. I am not proficient with the new Io stuff (my code is mainly stuck on 0.15.2), so someone more knowledgeable will be able to provide more information on the efficiency/idiomatic part.
I can explain the different parts and how they interact, with a few ideas on how to accomplish your situation.

Group

A group is a collection of futures that can be canceled or awaited together. So in your example, instead of sticking the futures in an ArrayList, you could instead Group.async them and then later just run Group.await to wait for all of them to finish.
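A hedged sketch of what that could look like in your example. The Group API is still unstable on master, so the exact names used here (`.init`, `group.async`, `group.await`, `group.cancel`) are assumptions pieced together from the description above, not verified signatures:

```zig
// Hypothetical sketch; Io.Group's exact master-branch API may differ.
var group: Io.Group = .init;
// Groups can be canceled together: clean up if we return early with an error.
defer group.cancel(io);

for (ranges.items) |r| {
    const chunk = content[r.start .. r.end + 1];
    // Spawn directly into the group instead of collecting futures by hand.
    group.async(io, downloadAndWrite, .{ io, &semaphore, &local_file, chunk, r.start });
}

// A single call waits for every spawned task to finish.
group.await(io);
```

The semaphore inside downloadAndWrite (with the post moved there, as shown earlier) would still be what limits concurrency to 3; the group only replaces the ArrayList-of-futures bookkeeping.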

Select

Select is similar to a group but with one exception. Instead of waiting for all of the futures to finish, it takes the result from the first one that finishes and cancels the rest. It will return that result to the caller of Select.await.

Queue

A queue is like a Go channel or a Rust mpsc/mpmc channel. You can plug work into the queue and have a worker or workers on the other end pulling data off and running it. By itself this will not limit the number of workers or how many things are done in parallel. It’s just a container for the work to be done.

Brainstorming

Now to your overall question. Using a semaphore and spawning all of the tasks into a group would work. This will allow only a certain number of tasks to run at once with the convenience of a single Group.await at the end to make sure they are all done.

Another option would be to push all the work into a queue. You then spawn X workers who pull off the queue, do the processing, and then come back to the queue. They will block once the queue is empty. This limits the number of things in parallel.

Performance in either case will largely depend on how each task is implemented. If you have multiple calls to async inside a task, then you may have idle time while the IO happens but the Semaphore is locked, or nothing is able to pull off the queue because all workers are blocked.

I think a more optimal solution would be to set up your Io implementation how you want, and spawn all tasks into a group. Don’t worry about how much is running in parallel as that will be handled by the Io implementation. Then when any of them are waiting for a future to finish, the implementation can go work on the next one. At the end, await the group.

Conclusion

A lot of this is talking in the abstract. The actual solution will depend on your problem and the actual constraints you want to place on the system. For example, you said you want a maximum count of parallel downloads.

In order to pick the best solution, one would need to know why you need a max count, what that count is, etc. Those constraints will inform how you compose the different Io tools to accomplish it.
