Does Zig (either the language or the standard library) have features for thread synchronization/coordination? In C, I can simply use pthread barriers (pthread_barrier_t) to make each thread wait until all threads have reached the barrier. The main thread initializes the barrier with pthread_barrier_init(), then each thread only needs to call pthread_barrier_wait(). However, I can’t find any such functionality in the Zig standard library, so I’m wondering if I will have to implement it from scratch.
If I do implement thread barriers on my own, I assume I will have to use std.Thread.Mutex and std.Thread.Condition. A variable, protected by the Mutex, keeps track of the number of threads that are waiting at the barrier. Each time a thread enters the barrier, it increments that variable by 1. If the number of threads at the barrier is not equal to the total number of threads, the thread waits for the Condition. Otherwise, it sets the number of threads waiting at the barrier to zero (reset) and broadcasts the Condition, allowing all threads to continue.
Please correct me if that draft of an implementation is flawed or if you know of a much better way.
(BTW, I feel like there should be a concurrency, parallelism or multithreading tag. I can’t figure out how to add a new tag, so I assume that I’m not allowed to do that.)
Have a look at std.Thread.WaitGroup and std.Thread.ResetEvent; they should provide the necessary building blocks. Mutex + Condition is also a possible route, depending on the situation.
@pachde join() is for waiting for threads to complete, not for making threads that keep running wait for each other.
The documentation for std.Thread.WaitGroup lacks explanations. Do I simply create a WaitGroup object and call reset() on it once, then call wait() in each thread? Can it be used over many loop iterations, or do I need to somehow reset the WaitGroup after every iteration?
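From reading its short implementation (Zig 0.13/0.14-era std), WaitGroup behaves like a countdown for one coordinating thread rather than a barrier between peers: the coordinator calls start() once per task, each task calls finish() when it is done, and the coordinator calls wait(). In those versions wait() also asserts that only one thread is waiting at a time, so calling it from every worker is not the intended use. It can be reused across iterations, but wait() leaves internal state behind, so reset() belongs after wait() returns and before the next round of start() calls. A minimal sketch under those assumptions; worker, runRounds and the thread/round counts are hypothetical:

```zig
const std = @import("std");

const thread_count = 4;
const rounds = 3;

/// Hypothetical per-round work: each worker adds (id + 1) to its own slot.
fn worker(wg: *std.Thread.WaitGroup, slot: *u64, id: usize) void {
    // Pair every start() with exactly one finish(), even on early return.
    defer wg.finish();
    slot.* += id + 1;
}

/// Runs several rounds. In each round the main thread registers one pending
/// task per worker, waits for all of them, then resets the WaitGroup.
pub fn runRounds() !u64 {
    var wg: std.Thread.WaitGroup = .{};
    var slots = [_]u64{0} ** thread_count;
    var threads: [thread_count]std.Thread = undefined;

    for (0..rounds) |_| {
        for (&threads, 0..) |*t, i| {
            wg.start(); // register the task before it can call finish()
            t.* = try std.Thread.spawn(.{}, worker, .{ &wg, &slots[i], i });
        }
        wg.wait(); // only the coordinating thread calls wait()
        wg.reset(); // clear the internal state before the next round
        for (threads) |t| t.join(); // release the finished threads
    }

    var total: u64 = 0;
    for (slots) |s| total += s;
    return total;
}
```

Spawning fresh threads per round only keeps the sketch self-contained; WaitGroup pays off when the tasks run on long-lived workers or a pool, where there is nothing to join.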
Yeah, Zig is under heavy development, and a lot of things will lack documentation for the time being.
One thing I recommend is having the Zig source code locally, so you can quickly navigate to relevant tests and implementations. In general, the std lib is very readable.
For example, if you need a simple barrier, maybe the approach in the broadcast test is all you need? As for WaitGroup, the implementation is very short.
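A one-shot barrier along those lines can be built from an atomic countdown and a ResetEvent: the last thread to arrive calls set(), which releases every thread blocked in wait(). This is a sketch in the spirit of that test, not a copy of it, written against a Zig 0.13/0.14-era std (std.atomic.Value, lowercase atomic orderings); OneShotBarrier, worker and demo are illustrative names. It is single-use, which is the main gap compared with pthread_barrier_t.

```zig
const std = @import("std");

/// One-shot barrier: an atomic countdown plus a ResetEvent.
/// The last thread to arrive sets the event, which wakes all the others.
/// Not reusable as written: the event would have to be reset safely first.
const OneShotBarrier = struct {
    remaining: std.atomic.Value(usize),
    event: std.Thread.ResetEvent = .{},

    fn init(count: usize) OneShotBarrier {
        return .{ .remaining = std.atomic.Value(usize).init(count) };
    }

    fn wait(self: *OneShotBarrier) void {
        // fetchSub returns the previous value, so 1 means "I am the last one".
        if (self.remaining.fetchSub(1, .acq_rel) == 1) {
            self.event.set();
        } else {
            self.event.wait();
        }
    }
};

const thread_count = 4;

/// Each worker marks itself as arrived, waits, then checks that every
/// other worker had also arrived before anyone was released.
fn worker(b: *OneShotBarrier, arrived: *[thread_count]bool, saw_all: *[thread_count]bool, id: usize) void {
    arrived[id] = true;
    b.wait();
    var all = true;
    for (arrived.*) |a| all = all and a;
    saw_all[id] = all;
}

pub fn demo() !bool {
    var barrier = OneShotBarrier.init(thread_count);
    var arrived = [_]bool{false} ** thread_count;
    var saw_all = [_]bool{false} ** thread_count;
    var threads: [thread_count]std.Thread = undefined;
    for (&threads, 0..) |*t, i| {
        t.* = try std.Thread.spawn(.{}, worker, .{ &barrier, &arrived, &saw_all, i });
    }
    for (threads) |t| t.join();
    for (saw_all) |ok| if (!ok) return false;
    return true;
}
```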
If WaitGroup and simple ResetEvent barriers aren’t enough for your use case, there are also many examples of Thread.Condition/Mutex in the Zig code base.
```zig
//! A thread barrier is a mechanism for thread synchronization.
//! Blocks threads until a certain number of threads has reached the barrier,
//! after which all threads are unblocked and the barrier is reset.
//!
//! Based on http://byronlai.com/jekyll/update/2015/12/26/barrier.html

const std = @import("std");
const Self = @This();

mutex: std.Thread.Mutex,
cond: std.Thread.Condition,
required_thread_count: usize,
current_thread_count: usize,
cycle: usize,

pub fn init(
    /// Number of threads that must enter the barrier before it is lifted.
    thread_count: usize,
) Self {
    return .{
        .mutex = .{},
        .cond = .{},
        .required_thread_count = thread_count,
        .current_thread_count = 0,
        .cycle = 0,
    };
}

/// Wait at the barrier. If this is the final thread, lift the barrier instead.
pub fn wait(self: *Self) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    self.current_thread_count += 1;
    if (self.current_thread_count == self.required_thread_count) {
        // All threads have arrived at the barrier.
        self.cycle += 1;
        self.current_thread_count = 0;
        self.cond.broadcast();
    } else {
        // Wait for other threads to arrive...
        const my_cycle: usize = self.cycle;
        // Threads may wake up spuriously. This loop prevents them from leaving the barrier too early.
        while (self.cycle == my_cycle)
            self.cond.wait(&self.mutex);
    }
}

/// Pass through the barrier without waiting. If this is the final thread, also lift the barrier.
pub fn passthrough(self: *Self) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    self.current_thread_count += 1;
    if (self.current_thread_count == self.required_thread_count) {
        // All threads have arrived at the barrier.
        self.cycle += 1;
        self.current_thread_count = 0;
        self.cond.broadcast();
    }
}
```
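A usage sketch of the barrier above for a phased computation. So that it compiles on its own, wait() is repeated in condensed form with shortened field names; thread_count, phases, worker and run are illustrative, and the code assumes a Zig 0.13/0.14-era std. Each phase uses two barrier waits: one so that every write is finished before anyone reads, and one so that nobody overwrites its slot while others are still reading. The cycle counter is what lets the same barrier be reused for every phase.

```zig
const std = @import("std");

/// Condensed copy of the barrier above (wait() only) so this compiles alone.
const Barrier = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    required: usize,
    current: usize = 0,
    cycle: usize = 0,

    fn wait(self: *Barrier) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.current += 1;
        if (self.current == self.required) {
            self.cycle += 1;
            self.current = 0;
            self.cond.broadcast();
        } else {
            const my_cycle = self.cycle;
            while (self.cycle == my_cycle) self.cond.wait(&self.mutex);
        }
    }
};

const thread_count = 4;
const phases = 5;

/// Each phase: publish a value, sync, read everyone's values, sync again.
fn worker(barrier: *Barrier, shared: *[thread_count]usize, mismatches: *std.atomic.Value(usize), id: usize) void {
    for (0..phases) |p| {
        shared[id] = p * 100 + id;
        barrier.wait(); // all writes of phase p are done
        for (shared.*, 0..) |v, j| {
            if (v != p * 100 + j) _ = mismatches.fetchAdd(1, .monotonic);
        }
        barrier.wait(); // all reads of phase p are done
    }
}

/// Returns how many stale values were observed (0 if the barrier works).
pub fn run() !usize {
    var barrier = Barrier{ .required = thread_count };
    var shared = [_]usize{0} ** thread_count;
    var mismatches = std.atomic.Value(usize).init(0);
    var threads: [thread_count]std.Thread = undefined;
    for (&threads, 0..) |*t, i| {
        t.* = try std.Thread.spawn(.{}, worker, .{ &barrier, &shared, &mismatches, i });
    }
    for (threads) |t| t.join();
    return mismatches.load(.monotonic);
}
```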
I would have thought that, because the whole wait() function is guarded by a Mutex, the first thread to enter it would block every other thread, causing a deadlock. What am I missing here?
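The missing piece is that std.Thread.Condition.wait() unlocks the mutex while the calling thread is blocked and locks it again before returning, the same contract as pthread_cond_wait(). So a thread parked in cond.wait() inside the barrier does not hold the mutex, and the next arriving thread can enter wait(). A small sketch demonstrating this with one waiting thread (Zig 0.13/0.14-era std; Shared, waiter and demo are illustrative names):

```zig
const std = @import("std");

const Shared = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    ready: bool = false,
};

fn waiter(s: *Shared, woke: *bool) void {
    s.mutex.lock();
    defer s.mutex.unlock();
    // wait() releases s.mutex while blocked, so the main thread can lock it.
    while (!s.ready) s.cond.wait(&s.mutex);
    // wait() has re-acquired the mutex before returning.
    woke.* = true;
}

pub fn demo() !bool {
    var s = Shared{};
    var woke = false;
    const t = try std.Thread.spawn(.{}, waiter, .{ &s, &woke });
    // Give the waiter a chance to lock the mutex and block in wait() first.
    std.Thread.sleep(10 * std.time.ns_per_ms);
    {
        // Succeeds even while the waiter is blocked in wait().
        s.mutex.lock();
        defer s.mutex.unlock();
        s.ready = true;
        s.cond.signal();
    }
    t.join();
    return woke;
}
```

The `while (!s.ready)` loop keeps the sketch correct whichever thread locks the mutex first, and it also covers spurious wakeups, just like the cycle check in the barrier.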
Consider using std.Io.Mutex and the corresponding Condition, instead of the normal std.Thread.Mutex, in order to prepare for the upcoming changes. You’d then take an Io as a parameter.
Adding a timeout, as others have mentioned, is also a good idea.
Consider making a PR to the standard library. I needed this a while ago and ended up using platform-specific code.
I needed a thread barrier implementation for my bachelor thesis, which I finished a few months ago now. In other words, I don’t need to prepare for any future changes, but this info is good for people who may copy my code in the future.
A timeout mechanism was also not necessary, since I needed the barrier for deterministic, parallel numerical computations. There was no chance that a thread would just get stuck forever, and if it did, the whole computation would be ruined anyway.
If somebody wants to make a PR for a thread barrier, I would suggest having separate “wait()” and “waitWithTimeout()” functions.
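A sketch of that split, assuming std.Thread.Condition.timedWait(mutex, timeout_ns), which returns error.Timeout, and std.time.Timer from a Zig 0.13/0.14-era std. waitWithTimeout is the hypothetical name from the suggestion above, not an existing std API. A real implementation has to decide what a timed-out thread does to the count; here it withdraws its arrival, so the barrier still needs the full required count before it lifts. The timeout is tracked across spurious wakeups by re-waiting with the remaining budget.

```zig
const std = @import("std");

/// Condensed barrier with a hypothetical waitWithTimeout().
const Barrier = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    required: usize,
    current: usize = 0,
    cycle: usize = 0,

    /// Called with the mutex held. Returns true if this arrival lifted the barrier.
    fn arrive(self: *Barrier) bool {
        self.current += 1;
        if (self.current < self.required) return false;
        self.cycle += 1;
        self.current = 0;
        self.cond.broadcast();
        return true;
    }

    pub fn wait(self: *Barrier) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        const my_cycle = self.cycle;
        if (self.arrive()) return;
        while (self.cycle == my_cycle) self.cond.wait(&self.mutex);
    }

    pub fn waitWithTimeout(self: *Barrier, timeout_ns: u64) error{ Timeout, TimerUnsupported }!void {
        var timer = try std.time.Timer.start();
        self.mutex.lock();
        defer self.mutex.unlock();
        const my_cycle = self.cycle;
        if (self.arrive()) return;
        while (self.cycle == my_cycle) {
            const elapsed = timer.read();
            if (elapsed >= timeout_ns) {
                // The cycle has not advanced, so our arrival is still counted.
                self.current -= 1;
                return error.Timeout;
            }
            // Wakeups may be spurious; the loop re-checks cycle and time left.
            self.cond.timedWait(&self.mutex, timeout_ns - elapsed) catch {};
        }
    }
};
```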