How to wait for all threads?

Hi, I hope everyone is having a good time with Zig, as I am.

I would like to ask a very basic question about threading.
The short code below works fine for my study purposes, but I would like it to wait for both threads before it prints “Bye”, in an idiomatic way: without calling join twice and without the defer.

const std = @import("std");

pub fn main() !void {
    std.debug.print("Hello.\n", .{});

    const thread_config = std.Thread.SpawnConfig{};

    const handle =
        try std.Thread.spawn(thread_config, thread_function, .{0});
    const handle2 =
        try std.Thread.spawn(thread_config, thread_function2, .{});

    defer handle.join();
    defer handle2.join();

    std.debug.print("Bye.\n", .{});
}

fn thread_function(_: anytype) !void {
    std.time.sleep(2 * std.time.ns_per_s);
    std.debug.print("thread: {s}.\n", .{"hi"});
}

fn thread_function2() !void {
    std.time.sleep(1 * std.time.ns_per_s);
    std.debug.print("thread2: {s}.\n", .{"hi"});
}

Maybe I could create a slice to hold the threads, iterate over it, and call join on the captured variable in the loop? What would you do?

Since I have mostly learned and used high-level languages like Go, I would like to learn how to handle some concurrency patterns. Not a thread pool yet, just some baby steps first.

I don’t see anything wrong or “non-idiomatic” with your code.
Windows has WaitForMultipleObjects, which can wait for several threads at once, or for at least one of them to complete, but I don’t think Zig’s std exposes that.

The ThreadPool source is a nice example of allocating and joining multiple threads.
Here are a few highlights:

threads: []std.Thread,

pub fn init(pool: *Pool, options: Options) !void {
...
    var spawned: usize = 0;
    errdefer pool.join(spawned);

    for (pool.threads) |*thread| {
        thread.* = try std.Thread.spawn(.{}, worker, .{pool});
        spawned += 1;
    }
}

pub fn deinit(pool: *Pool) void {
    pool.join(pool.threads.len); // kill and join all threads.
    pool.* = undefined;
}

fn join(pool: *Pool, spawned: usize) void {
...
    for (pool.threads[0..spawned]) |thread| {
        thread.join();
    }
}
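The spawned-counter pattern from those highlights can be tried in isolation. The following is a minimal sketch, not the ThreadPool implementation: `worker`, `joinAll` and `squareAll` are hypothetical names made up for illustration, and the code assumes a Zig version that supports indexed `for` loops (`for (items, 0..)`).

```zig
const std = @import("std");

// Hypothetical worker: each thread writes only to its own slot.
fn worker(slot: *usize, index: usize) void {
    slot.* = index * index;
}

// Hypothetical helper: joins every thread in the given slice.
fn joinAll(threads: []const std.Thread) void {
    for (threads) |t| t.join();
}

// Spawns one thread per slot and joins them all before returning.
fn squareAll(results: []usize, threads: []std.Thread) !void {
    std.debug.assert(results.len == threads.len);

    var spawned: usize = 0;
    // Runs only if a spawn below fails: joins the threads started so far.
    errdefer joinAll(threads[0..spawned]);

    for (threads, 0..) |*thread, i| {
        thread.* = try std.Thread.spawn(.{}, worker, .{ &results[i], i });
        spawned += 1;
    }

    // Success path: every thread was spawned, so join them all.
    joinAll(threads);
}

pub fn main() !void {
    var results = [_]usize{0} ** 4;
    var threads: [4]std.Thread = undefined;
    try squareAll(&results, &threads);

    var sum: usize = 0;
    for (results) |r| sum += r;
    std.debug.print("sum of squares: {d}\n", .{sum}); // 0 + 1 + 4 + 9 = 14
}
```

Because nothing fallible follows the final `joinAll` inside `squareAll`, the errdefer can never fire after the threads have already been joined.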

Yes, I do this all the time when dealing with multiple threads.

var threads: [2]std.Thread = undefined;
threads[0] = try std.Thread.spawn(.{}, thread_function, .{0});
errdefer threads[0].join();
threads[1] = try std.Thread.spawn(.{}, thread_function2, .{});
defer for (&threads) |t| t.join();

The thread pool example looks like the one I should study a little more.
The short array example looks like the one that fits what I need right now.

Thanks for all the good and quick answers!

I also ended up writing something like this. The first block uses the solution I picked; the second uses an allocator (I am sure this is not the best way, but I am learning!).

const std = @import("std");

pub fn main() !void {
    std.debug.print("Hello.\n", .{});

    const thread_config = std.Thread.SpawnConfig{};

    // =====================

    var threads: [2]std.Thread = undefined;
    threads[0] = try std.Thread.spawn(thread_config, thread_function, .{"t0"});
    threads[1] = try std.Thread.spawn(thread_config, thread_function, .{"t1"});
    errdefer threads[0].join();
    for (&threads) |t| t.join();

    // =====================

    const allocator = std.heap.page_allocator;
    const VecThread = std.ArrayList(std.Thread);
    var list = VecThread.init(allocator);
    defer list.deinit();

    const thread2 = try std.Thread.spawn(thread_config, thread_function, .{"t2"});
    const thread3 = try std.Thread.spawn(thread_config, thread_function, .{"t3"});
    try list.append(thread2);
    try list.append(thread3);
    const threadSlice = try list.toOwnedSlice();
    for (threadSlice) |t| {
        t.join();
    }

    std.debug.print("Bye.\n", .{});
}

I think you meant:

errdefer for (&threads) |t| t.join();

I don’t see why you’d join threads[0] but not threads[1]. Also, if an error occurs after the loop has run, you’ll get a double join on threads[0]: one in the loop and another in the errdefer.

That errdefer doesn’t make sense where it is. It needs to come right after the assignment to threads[0], so that thread 0 gets joined if the attempt to spawn thread 1 fails.

If it comes after the assignment to threads[1], you already know that the second spawn succeeded, and the for loop takes care of both threads. The errdefer exists so that thread 0 is still joined on that failure path.
Don’t reorder statements just because it “looks cleaner”.

Also, that whole block should be in its own scope, so that the errdefer stops being active once the threads array has been successfully initialized. (You don’t want const thread2 = try ... to fail and trigger the errdefer way above, because that would call join again.)

Also, with thread2 and thread3 you aren’t handling errors at all; you just exit the program without joining any threads that may still be running fine.

Basically, you aren’t handling failure well. You should study the nuance of the code in @dimdin’s and @dude_the_builder’s answers; they are carefully written to handle failure at any point.
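The placement and scoping points above can be sketched as follows. This is only an illustration under assumptions: `worker` and `runPair` are hypothetical names, and a helper function stands in for the separate scope, so the errdefer cannot outlive the spawn-and-join sequence.

```zig
const std = @import("std");

// Hypothetical worker: writes a value into the slot owned by its thread.
fn worker(slot: *u32, value: u32) void {
    slot.* = value;
}

// The function body is the scope: the errdefer below is active only here.
fn runPair(results: *[2]u32) !void {
    var threads: [2]std.Thread = undefined;

    threads[0] = try std.Thread.spawn(.{}, worker, .{ &results[0], @as(u32, 10) });
    // Placed right after the first spawn: if the second spawn fails,
    // thread 0 is still joined before the error propagates.
    errdefer threads[0].join();

    threads[1] = try std.Thread.spawn(.{}, worker, .{ &results[1], @as(u32, 20) });

    // Both spawns succeeded and nothing fallible follows,
    // so each thread is joined exactly once.
    for (threads) |t| t.join();
}

pub fn main() !void {
    var results = [_]u32{ 0, 0 };
    try runPair(&results);
    // Any `try` placed after this point cannot trigger the errdefer above.
    std.debug.print("results: {d} {d}\n", .{ results[0], results[1] });
}
```

A bare `{ ... }` block inside main would give the same scoping; the helper function is just one way to make the boundary obvious.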

If you are unsure about error handling, force an error in different spots and see what happens, for example by calling a function that always fails:

fn fail(comptime R: type) !R {
    return error.OutOfMemory;
}
pub fn main() !void {
    ...
    _ = try fail(std.Thread);
}

There is also a tool for doing this systematically for memory allocations: std.testing.checkAllAllocationFailures.
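As a rough sketch of how that helper is typically used: it takes a backing allocator, a function whose first parameter is an allocator, and a tuple of the remaining arguments, then reruns the function while failing each allocation in turn. `twoBuffers` is a hypothetical function written only for this example.

```zig
const std = @import("std");

// Hypothetical function under test: makes two allocations and frees both.
// If the second allocation fails, the first is still freed by its defer.
fn twoBuffers(allocator: std.mem.Allocator, len: usize) !void {
    const first = try allocator.alloc(u8, len);
    defer allocator.free(first);

    const second = try allocator.alloc(u8, len);
    defer allocator.free(second);
}

test "twoBuffers cleans up on every allocation failure" {
    // Fails each allocation in turn and reports leaks or swallowed errors.
    try std.testing.checkAllAllocationFailures(
        std.testing.allocator,
        twoBuffers,
        .{@as(usize, 16)},
    );
}
```

Run it with `zig test` on the file. Removing one of the `defer` lines should make the check report a leak.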

@LucasSantos91
@Sze

Thanks for correcting my mistake, and for the detailed advice.
Yes, I blindly copied the answer (and was happy it ran!), but you are right: I should understand what each line means and why the order matters, especially in a low-level programming environment like this.
I hope to have a sharper eye by the time I bring my next question.

Maybe I am misunderstanding the question, but wouldn’t

pub fn main() !void {
    std.debug.print("Hello.\n", .{});

    const thread_config = std.Thread.SpawnConfig{};

    {
        const handle =
            try std.Thread.spawn(thread_config, thread_function, .{0});
        defer handle.join();

        const handle2 =
            try std.Thread.spawn(thread_config, thread_function2, .{});
        defer handle2.join();
    }

    std.debug.print("Bye.\n", .{});
}

do what you want here?

Both threads are joined before Bye is printed, every thread is joined if there’s an error, and there’s no code duplication between normal and error-handling code paths.

@matklad wins. “Flawless victory.”
