Thread Barriers

Does Zig (either the language or the standard library) have features for thread synchronization/coordination? In C, I can simply use pthread barriers (pthread_barrier_t) to make each thread wait until all threads have reached the barrier. The main thread initializes the barrier with pthread_barrier_init(), then each thread only needs to call pthread_barrier_wait(). However, I can’t find any such functionality in the Zig standard library, so I’m wondering if I will have to implement it from scratch.

If I do implement thread barriers on my own, I assume I will have to use std.Thread.Mutex and std.Thread.Condition. A variable, protected by the Mutex, keeps track of the number of threads that are waiting at the barrier. Each time a thread enters the barrier, it increments that variable by 1. If the number of threads at the barrier is not equal to the total number of threads, the thread waits for the Condition. Otherwise, it sets the number of threads waiting at the barrier to zero (reset) and broadcasts the Condition, allowing all threads to continue.

Please correct me if that draft of an implementation is flawed or if you know of a much better way.

(BTW, I feel like there should be a concurrency, parallelism or multithreading tag. I can’t figure out how to add a new tag, so I assume that I’m not allowed to do that.)

“thread join” is the term you’re looking for, when searching for info on the subject.

Have a look at std.Thread.WaitGroup and std.Thread.ResetEvent; they should be the necessary building blocks, though mutex+condition is also a possible route, depending on the situation.

@pachde join is for waiting for threads to complete.


The documentation for std.Thread.WaitGroup is lacking explanations. Do I simply create a WaitGroup object and call reset() on it once, then call wait() in each thread? Can this be used over many loop iterations, or do I need to somehow reset the WaitGroup after every iteration?

Yeah, Zig is under heavy development and a lot of things will lack documentation for the time being.

One thing I recommend is having the Zig source code locally, so you can quickly navigate to relevant tests and implementations. In general, the std lib is very readable.

For example, if you need a simple barrier, maybe the approach in the broadcast test is all you need? As for WaitGroup, the implementation is very short.
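To make the ResetEvent idea concrete, here is a minimal sketch of a one-shot barrier in the spirit of that broadcast test. It is not a verbatim copy of the std code; the names OneShotBarrier, worker and runDemo are made up for illustration, and it assumes the std.Thread API as of roughly Zig 0.13/0.14 (lowercase atomic orderings, ResetEvent initialized with `.{}`).

```zig
const std = @import("std");

/// One-shot barrier (hypothetical name): the last thread to arrive sets the
/// event, all other threads block on it. It cannot be reused for a second
/// round without extra coordination.
const OneShotBarrier = struct {
    event: std.Thread.ResetEvent = .{},
    remaining: std.atomic.Value(usize),

    fn init(thread_count: usize) OneShotBarrier {
        return .{ .remaining = std.atomic.Value(usize).init(thread_count) };
    }

    fn wait(self: *OneShotBarrier) void {
        // fetchSub returns the previous value, so 1 means "I am the last one".
        if (self.remaining.fetchSub(1, .acq_rel) == 1) {
            self.event.set();
        } else {
            self.event.wait();
        }
    }
};

// Hypothetical worker: counts itself, waits, then records how many
// threads had counted themselves by the time it was released.
fn worker(barrier: *OneShotBarrier, before: *std.atomic.Value(usize), seen: *usize) void {
    _ = before.fetchAdd(1, .monotonic);
    barrier.wait();
    seen.* = before.load(.monotonic);
}

fn runDemo(comptime n: usize) [n]usize {
    var barrier = OneShotBarrier.init(n);
    var before = std.atomic.Value(usize).init(0);
    var seen = [_]usize{0} ** n;
    var threads: [n]std.Thread = undefined;
    for (&threads, &seen) |*t, *s| {
        // Panic instead of returning early, so no running thread is left
        // pointing at this stack frame.
        t.* = std.Thread.spawn(.{}, worker, .{ &barrier, &before, s }) catch @panic("spawn failed");
    }
    for (threads) |t| t.join();
    return seen;
}
```

Every thread should observe the full count after the barrier, since nobody is released before the last arrival. For a barrier that is hit once per loop iteration, this is not enough on its own, because the event stays set after the first round.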

Pool.zig and hasher.zig are other usage examples in the Zig source.
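As for the question of how WaitGroup is meant to be used, here is a rough sketch, assuming the WaitGroup API as of roughly Zig 0.13/0.14 (start(), finish(), wait(), reset()). The worker and runRounds names are hypothetical. Note that this expresses "one thread waits for the workers", not "the workers wait for each other", so it is only a building block for a barrier.

```zig
const std = @import("std");

// Hypothetical worker: does some work, then reports completion.
fn worker(wg: *std.Thread.WaitGroup, slot: *u32, value: u32) void {
    defer wg.finish(); // one finish() per start()
    slot.* = value * value;
}

fn runRounds(comptime n: usize, rounds: usize) [n]u32 {
    var wg: std.Thread.WaitGroup = .{};
    var results = [_]u32{0} ** n;
    var round: usize = 0;
    while (round < rounds) : (round += 1) {
        // Reuse across iterations: clear the counter and the internal event.
        wg.reset();
        var threads: [n]std.Thread = undefined;
        for (&threads, &results, 0..) |*t, *slot, i| {
            wg.start(); // register the pending task before it can finish
            const value: u32 = @intCast(i + round);
            t.* = std.Thread.spawn(.{}, worker, .{ &wg, slot, value }) catch @panic("spawn failed");
        }
        wg.wait(); // blocks until every start() has a matching finish()
        // Join before the next reset() so no worker is still touching wg.
        for (threads) |t| t.join();
    }
    return results;
}
```

So, as far as I can tell from the implementation, reset() is what you call between iterations, and the waiting side is the coordinating thread rather than each worker.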

If WaitGroup and simple ResetEvent barriers aren’t enough for your use case, there are also many examples of Thread.Condition/Mutex in the Zig code base.

Welcome to the forum, btw!


I ended up implementing the following code:

File: Barrier.zig

//! A thread barrier is a mechanism for thread synchronization.
//! Blocks threads until a certain number of threads has reached the barrier,
//! after which all threads are unblocked and the barrier is reset.
//!
//! Based on http://byronlai.com/jekyll/update/2015/12/26/barrier.html

const std = @import("std");
const Self = @This();

mutex: std.Thread.Mutex,
cond: std.Thread.Condition,
required_thread_count: usize,
current_thread_count: usize,
cycle: usize,

pub fn init(
    /// Number of threads that must enter the barrier before it is lifted.
    thread_count: usize,
) Self {
    return .{
        .mutex = .{},
        .cond = .{},
        .required_thread_count = thread_count,
        .current_thread_count = 0,
        .cycle = 0,
    };
}

/// Wait at the barrier. If this is the final thread, lift the barrier instead.
pub fn wait(self: *Self) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    self.current_thread_count += 1;
    if (self.current_thread_count == self.required_thread_count) {
        // All threads have arrived at the barrier.
        self.cycle += 1;
        self.current_thread_count = 0;
        self.cond.broadcast();
    } else {
        // Wait for other threads to arrive...
        const my_cycle: usize = self.cycle;
        // Threads may wake up spuriously. This loop keeps them from leaving the barrier too early.
        while (self.cycle == my_cycle)
            self.cond.wait(&self.mutex);
    }
}

/// Pass through the barrier without waiting. If this is the final thread, also lift the barrier.
pub fn passthrough(self: *Self) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    self.current_thread_count += 1;
    if (self.current_thread_count == self.required_thread_count) {
        // All threads have arrived at the barrier.
        self.cycle += 1;
        self.current_thread_count = 0;
        self.cond.broadcast();
    }
}


Maybe it’s worth adding a timeout to wait()?


I would have thought that, because the whole function wait() is guarded by a Mutex, the first thread to enter it would block every other thread, causing a deadlock. What am I missing here?

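cond.wait(&mutex) releases the mutex and starts waiting on the condition atomically, then reacquires the mutex before it returns. While a thread is blocked inside cond.wait(), the mutex is free, so other threads can enter wait().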

Still, a timeout would be welcome. I don’t like getting stuck.


I’m curious, have you tried implementing this with a Futex and some atomics for the aggregation?
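For anyone wondering what the Futex route could look like, here is a rough sketch under a few assumptions: std.Thread.Futex with wait(ptr, expect) and wake(ptr, max_waiters), and std.atomic.Value, as in roughly Zig 0.12 to 0.14. FutexBarrier, worker and runDemo are hypothetical names. This is not the code from the post above, just the same cycle-counter idea with atomics in place of the mutex.

```zig
const std = @import("std");
const Futex = std.Thread.Futex;
const AtomicU32 = std.atomic.Value(u32);

/// Reusable barrier built on two atomics and a futex (illustrative only).
const FutexBarrier = struct {
    required: u32,
    arrived: AtomicU32 = AtomicU32.init(0),
    cycle: AtomicU32 = AtomicU32.init(0),

    fn wait(self: *FutexBarrier) void {
        // Read the cycle before announcing arrival. It cannot advance until
        // this thread has arrived too, so the value is current.
        const my_cycle = self.cycle.load(.acquire);
        if (self.arrived.fetchAdd(1, .acq_rel) + 1 == self.required) {
            // Last arrival: reset the count, advance the cycle, wake everyone.
            self.arrived.store(0, .monotonic);
            _ = self.cycle.fetchAdd(1, .release);
            Futex.wake(&self.cycle, std.math.maxInt(u32));
        } else {
            // Futex.wait may return spuriously, so re-check the cycle.
            while (self.cycle.load(.acquire) == my_cycle) {
                Futex.wait(&self.cycle, my_cycle);
            }
        }
    }
};

// Hypothetical worker: after round r, at least (r + 1) * n increments
// must be visible, otherwise the barrier let someone through too early.
fn worker(b: *FutexBarrier, counter: *AtomicU32, ok: *std.atomic.Value(bool), n: u32, rounds: u32) void {
    var round: u32 = 0;
    while (round < rounds) : (round += 1) {
        _ = counter.fetchAdd(1, .monotonic);
        b.wait();
        if (counter.load(.monotonic) < (round + 1) * n) ok.store(false, .monotonic);
    }
}

fn runDemo(comptime n: u32, rounds: u32) bool {
    var barrier = FutexBarrier{ .required = n };
    var counter = AtomicU32.init(0);
    var ok = std.atomic.Value(bool).init(true);
    var threads: [n]std.Thread = undefined;
    for (&threads) |*t| {
        t.* = std.Thread.spawn(.{}, worker, .{ &barrier, &counter, &ok, n, rounds }) catch @panic("spawn failed");
    }
    for (threads) |t| t.join();
    return ok.load(.monotonic) and counter.load(.monotonic) == n * rounds;
}
```

Whether this beats the mutex+condition version in practice would have to be measured; the main difference is that arriving threads never contend on a lock.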

Ah thanks! … too obvious

Not straightforward.

not at all, I spent ages on it too :joy:

Nice and concise, good code.
Some thoughts:

  • Consider using std.Io.Mutex and the corresponding Condition, instead of the normal std.Thread.Mutex, in order to prepare for the upcoming changes. You’d then take an Io as a parameter.
  • Adding a timeout, as others mentioned, is also a good idea.
  • Consider making a PR to the std library. I needed this a while ago, and ended up using platform specific code.

I needed a thread barrier implementation for my bachelor thesis, which I finished a few months ago now. In other words, I don’t need to prepare for any future changes, but this info is good for people who may copy my code in the future.

A timeout mechanism was also not necessary, since I needed it for deterministic, parallel numerical computations. There was no chance that a thread would just get stuck forever, and if it did, the whole computation would be ruined anyway.

If somebody wants to make a PR for a thread barrier, I would suggest having separate “wait()” and “waitWithTimeout()” functions.
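A possible shape for such a waitWithTimeout(), as a sketch only: it assumes std.Thread.Condition.timedWait(&mutex, timeout_ns) and std.time.Timer as they exist in roughly Zig 0.12 to 0.14. TimedBarrier and its field names are hypothetical, and the choice to withdraw the arrival on timeout is one design option among several.

```zig
const std = @import("std");

/// Illustrative variant of the barrier with a timed wait.
const TimedBarrier = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    required: usize,
    arrived: usize = 0,
    cycle: usize = 0,

    /// Like wait(), but gives up after timeout_ns nanoseconds.
    fn waitWithTimeout(self: *TimedBarrier, timeout_ns: u64) error{Timeout}!void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.arrived += 1;
        if (self.arrived == self.required) {
            // Last arrival lifts the barrier, same as in wait().
            self.cycle += 1;
            self.arrived = 0;
            self.cond.broadcast();
            return;
        }
        const my_cycle = self.cycle;
        var timer = std.time.Timer.start() catch @panic("no monotonic clock");
        while (self.cycle == my_cycle) {
            const elapsed = timer.read();
            if (elapsed >= timeout_ns) {
                // Withdraw our arrival so the count stays consistent
                // for the threads that are still waiting.
                self.arrived -= 1;
                return error.Timeout;
            }
            // A timeout or spurious wakeup just sends us around the loop,
            // where the cycle and the remaining time are re-checked.
            self.cond.timedWait(&self.mutex, timeout_ns - elapsed) catch {};
        }
    }
};
```

A caller would then decide what a timeout means for its computation, for example `barrier.waitWithTimeout(std.time.ns_per_s) catch return error.Timeout;`. The untimed wait() stays as it is.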
