Using a thread pool in WebAssembly

I’m working on a code example involving the use of std.Thread.Pool. The web browser is one of the target platforms. It’s problematic there because a synchronous wait is not allowed on the main thread: std.Thread.Pool.spawn() will simply crash the WASM VM when the pool runs out of inactive threads.
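For context, this is roughly the calling pattern that traps. A minimal sketch, with placeholder names of mine (job, schedule) and an arbitrary pool size, assuming a threaded wasm32 browser build:

const std = @import("std");

fn job(n: u32) void {
    _ = n; // placeholder work
}

pub fn schedule(allocator: std.mem.Allocator) !void {
    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = allocator, .n_jobs = 2 });
    defer pool.deinit();
    // Once no inactive worker is available, the pool's internal
    // synchronization ends up in memory.atomic.wait32, which is
    // forbidden on the browser's main thread and traps there.
    try pool.spawn(job, .{1});
    try pool.spawn(job, .{2});
    try pool.spawn(job, .{3});
}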

The obvious solution is to make the call in another thread. But how?


Wasm doesn’t have any concept of threads. There is a WASI proposal for threads: GitHub - WebAssembly/wasi-threads. But I doubt any browsers are going to support it.
It looks like the Thread.Pool implementation has an option for a single-threaded environment (i.e. Wasm). So spawn will run the function in the main thread and block until it exits. It won’t crash the VM as far as I can tell.
If you really want multiple “threads”, you could have multiple Module Instances and bridge them with JavaScript. But that will be more work.
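Roughly, in a build with -fsingle-threaded (a sketch; builtin.single_threaded then makes Pool.init start no workers and Pool.spawn call the function directly):

const std = @import("std");

fn job(n: u32) void {
    _ = n; // placeholder work
}

pub fn example(allocator: std.mem.Allocator) !void {
    var pool: std.Thread.Pool = undefined;
    // Single-threaded build: init starts no worker threads...
    try pool.init(.{ .allocator = allocator });
    defer pool.deinit();
    // ...and spawn just calls job(1) inline, returning when it is done.
    try pool.spawn(job, .{1});
}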


Emscripten has support for pthreads (it implements them using shared memory and web workers).

@permutationlock’s example shows how to use emscripten with threads:

This is based on @permutationlock’s usage of emscripten and a few other things; however, I currently don’t have the pthread flags for emscripten activated in that project, because the game doesn’t really need them right now (they are commented out).
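For reference, enabling them would look something like this. This is a hypothetical build.zig excerpt where the artifact paths and step wiring are placeholders, though -pthread and -sPTHREAD_POOL_SIZE are real emcc options:

// Hypothetical excerpt from a build.zig that links via emcc:
const emcc = b.addSystemCommand(&.{
    "emcc",
    "zig-out/lib/libgame.a", // placeholder: your compiled static library
    "-o",
    "zig-out/web/index.html",
    "-pthread", // enable pthread support (shared memory + web workers)
    "-sPTHREAD_POOL_SIZE=4", // pre-spawn workers so pthread_create needn't block
});
b.getInstallStep().dependOn(&emcc.step); // wire into the default install step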

I am also planning to update my zigraylib project to use the emscripten dependency of the raylib project, so that you don’t have to manually install emscripten (but I got sidetracked by another project).


I’m well aware of that. Threads require support from the JavaScript side: you basically create additional web worker instances and make them run the same WASM executable with the same shared memory. I’ve implemented all this and am currently preparing the next release of my project. That’s the purpose of the example: to show people how to make use of threads.

The main challenge here is transferring data from the main thread to the worker thread. This is tricky when you can’t protect data structures using mutexes. The following is my attempt at creating a non-blocking queue. It’s based on ideas from this article:

const std = @import("std");

fn Queue(comptime T: type) type {
    return struct {
        const Node = struct {
            next: *Node,
            payload: T,
        };
        // sentinel marking the end of the list: a maximally aligned address
        // that is only ever compared against, never dereferenced
        const tail: *Node = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Node)));

        head: *Node = tail,
        allocator: std.mem.Allocator,
        count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
        stopped: bool = false,

        pub fn push(self: *@This(), value: T) !void {
            const new_node = try self.allocator.create(Node);
            new_node.* = .{ .next = tail, .payload = value };
            // link new node to the left of the tail
            self.attachOnLeft(new_node, tail);
            // increment count and wake any waiting threads
            _ = self.count.fetchAdd(1, .release);
            std.Thread.Futex.wake(&self.count, std.math.maxInt(u32));
        }

        pub fn pull(self: *@This()) ?T {
            var current_node = self.head;
            const detached_node: ?*Node = while (current_node != tail) {
                const next_node = getUnmarkedReference(current_node.next);
                if (!isMarkedReference(current_node.next)) {
                    if (@cmpxchgWeak(*Node, &current_node.next, next_node, getMarkedReference(next_node), .seq_cst, .monotonic) == null) {
                        // remove current node from linked list by pointing the next pointer of the previous node to the next node
                        self.attachOnLeft(next_node, current_node);
                        break current_node;
                    }
                }
                current_node = next_node;
            } else null;
            var payload: ?T = null;
            if (detached_node) |n| {
                payload = n.payload;
                self.allocator.destroy(n);
                _ = self.count.fetchSub(1, .monotonic);
            }
            return payload;
        }

        pub fn wait(self: *@This()) void {
            std.Thread.Futex.wait(&self.count, 0);
        }

        pub fn deinit(self: *@This()) void {
            while (self.pull() != null) {}
            self.stopped = true;
            // wake up waiting threads and prevent them from sleeping again
            self.count.store(std.math.maxInt(u32), .release);
            std.Thread.Futex.wake(&self.count, std.math.maxInt(u32));
        }

        fn attachOnLeft(self: *@This(), node: *Node, ref_node: *Node) void {
            while (true) {
                var next_ptr: **Node = undefined;
                var current_node = self.head;
                if (current_node == ref_node) {
                    next_ptr = &self.head;
                } else {
                    const left_node: *Node = while (current_node != tail) {
                        const next_node = getUnmarkedReference(current_node.next);
                        if (next_node == ref_node) break current_node;
                        current_node = next_node;
                    } else tail;
                    if (left_node == tail or isMarkedReference(left_node.next)) {
                        // try again
                        continue;
                    }
                    next_ptr = &left_node.next;
                }
                if (@cmpxchgWeak(*Node, next_ptr, ref_node, node, .seq_cst, .monotonic) == null) {
                    break;
                } else {
                    // try again
                }
            }
        }

        fn isMarkedReference(ptr: *Node) bool {
            return @intFromPtr(ptr) & 1 != 0;
        }

        fn getUnmarkedReference(ptr: *Node) *Node {
            return @ptrFromInt(@intFromPtr(ptr) & ~@as(usize, 1));
        }

        fn getMarkedReference(ptr: *Node) *Node {
            @setRuntimeSafety(false);
            return @ptrFromInt(@intFromPtr(ptr) | @as(usize, 1));
        }
    };
}
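Here is how I intend it to be used; a hypothetical sketch where Task, workerMain, and example are illustrative names of mine, not part of the queue itself. The main browser thread only ever pushes (which never blocks), while the blocking wait happens exclusively on a worker thread:

const Task = struct { id: u32 };

fn workerMain(queue: *Queue(Task)) void {
    while (!queue.stopped) {
        while (queue.pull()) |task| {
            _ = task; // ... process the task ...
        }
        // Blocking is fine here: this runs in a web worker, not the main thread.
        queue.wait();
    }
}

pub fn example(allocator: std.mem.Allocator) !void {
    var queue = Queue(Task){ .allocator = allocator };
    const worker = try std.Thread.spawn(.{}, workerMain, .{&queue});
    // The main thread only pushes: an allocation plus atomics, never a wait.
    try queue.push(.{ .id = 1 });
    try queue.push(.{ .id = 2 });
    queue.deinit(); // drains the queue, sets stopped, and wakes the worker
    worker.join(); // caveat: join is itself a problem on the main thread (below)
}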

Another problem is with std.Thread.join(), which relies on a synchronous wait as well:

    fn join(self: Impl) void {
        defer {
            // Create a copy of the allocator so we do not free the reference to the
            // original allocator while freeing the memory.
            var allocator = self.thread.allocator;
            allocator.free(self.thread.memory);
        }

        var spin: u8 = 10;
        while (true) {
            const tid = self.thread.tid.load(.seq_cst);
            if (tid == 0) {
                break;
            }

            if (spin > 0) {
                spin -= 1;
                std.atomic.spinLoopHint();
                continue;
            }

            const result = asm (
                \\ local.get %[ptr]
                \\ local.get %[expected]
                \\ i64.const -1 # infinite
                \\ memory.atomic.wait32 0
                \\ local.set %[ret]
                : [ret] "=r" (-> u32),
                : [ptr] "r" (&self.thread.tid.raw),
                  [expected] "r" (tid),
            );
            switch (result) {
                0 => continue, // ok
                1 => continue, // expected != loaded
                2 => unreachable, // timeout (infinite)
                else => unreachable,
            }
        }
    }
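One workaround I can think of (just a sketch, not a vetted pattern): keep join() off the main thread entirely by delegating it to a helper thread, echoing the “make the call in another thread” idea above. The names here are hypothetical:

const std = @import("std");

// Hypothetical workaround: have a dedicated worker perform the joins, so
// the main thread never reaches the memory.atomic.wait32 inside join().
fn reaperMain(threads: []const std.Thread) void {
    for (threads) |t| t.join(); // blocking is allowed on a worker thread
}

pub fn joinAllOffMain(threads: []const std.Thread) !void {
    const reaper = try std.Thread.spawn(.{}, reaperMain, .{threads});
    // Detach so the main thread never has to join the reaper either.
    reaper.detach();
}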