Need help designing a concurrent data structure

Hi, I’m working on some code that traverses an octree. For each box I need to aggregate some values (a few KB of data for each near box).
Since these values are static, I’m computing them all once (in parallel) and then aggregating them in a different array.
However, this is very wasteful in terms of memory usage, because each value is needed at most 26 times, and after it has been used its memory is wasted.

My idea is to have a data structure that acts as a cache.
The basic design would have 2 functions: get, free

get:

//request the value of a box
fn get(cache, id) T {
    if already_computed 
        return a pointer to the value
    else
        initialize the counter for that box equal to the number of near boxes
        allocate, compute and return the value
}

free:

//release the value after the aggregation
fn free(cache, id) {
    decrease the counter of the value used
    if the counter reaches zero 
        release the memory
}

To minimize the number of values that need to be kept alive at any time, I’m already traversing the tree using z-ordering. Furthermore, instead of actually freeing the memory I was thinking of implementing a free-list, both to reduce syscalls and to improve cache utilization.

How would you go about it?
Many thanks in advance


A free-list works best when the values you are allocating are all of the same size, or if their size has an upper bound that you are okay with rounding up to. I’m not quite sure if that is the case for you (you say values have “some KB of data for each near box”), but working under the assumption that “some KB” can be bounded to a reasonable fixed size, I have two approaches you could try:

  • You could represent a box’s value as a linked list containing a fixed-size chunk of data for each near box, and store those chunks in a std.heap.MemoryPool (which implements a free-list internally); there’s a small sketch of this after the list.

  • You could have 26 memory pools, each storing values for boxes with (1, 2, 3, …, 26) neighbors. This would allow the values to be stored contiguously instead of being in possibly far-apart chunks, and may require less refactoring of your other code as you can simply use your previous representation of a box’s value instead of a linked list.
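In case it helps, here is a minimal sketch of the first (linked-list-of-chunks) idea; Chunk, the 4096-byte size and example are made up for illustration, not taken from your code:

const std = @import("std");

// Hypothetical fixed-size chunk: the data contributed by one near box,
// plus a link to the next chunk of the same box's value.
const Chunk = struct {
    data: [4096]u8, // "some KB" rounded up to a fixed size (assumption)
    next: ?*Chunk = null,
};

fn example(allocator: std.mem.Allocator) !void {
    // MemoryPool hands out fixed-size Chunk slots and keeps destroyed ones
    // on an internal free-list for reuse.
    var pool = std.heap.MemoryPool(Chunk).init(allocator);
    defer pool.deinit();

    // Build a box value out of two chunks.
    const first = try pool.create();
    first.* = .{ .data = undefined };
    const second = try pool.create();
    second.* = .{ .data = undefined };
    first.next = second;

    // Destroying the chunks puts them back on the pool's free-list instead
    // of returning the memory to the underlying allocator.
    pool.destroy(second);
    pool.destroy(first);
}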

If the box values have wildly varying sizes that can’t reasonably be rounded up to some upper bound, you might just want to use a regular one-size-fits-all allocator like std.heap.SmpAllocator. Smart allocators also retain freed memory for re-use when it makes sense to, and allocations of unknown sizes made and freed at unknown intervals are exactly what they are designed for. Also, after briefly reading the source code of std.heap.SmpAllocator, it looks like it actually uses a freeing strategy similar to the 26-memory-pool idea I mentioned, keeping free lists for multiple chunk sizes (the sizes being increasing powers of 2).

Thanks for the reply. I’ll give you some more info: each value has exactly the same size, so a free list should be best.
My current problem is how to use the synchronisation primitives to avoid races on the data structure.

Suggestion way out of left field …

It’s worth having a read through the sections on “capability restrictions” in the Pony language spec.

It’s a reasonably radical way of encoding guarantees at the language-syntax level that enables safe concurrent reads/writes without mutexes and guarantees no data races.

That’s not going to be of direct use in Zig, but it might give some good ideas about how to emulate what the Pony compiler does to provide similar guarantees.

Both Zig and Pony are C API / FFI friendly, so a hybrid approach is possible too.


Assuming get and free have to be safely callable from any thread on any box, I have two approaches you could try.

Simple approach (doesn’t compute multiple box values in parallel):

Make the cache hold 2 mutexes (std.Thread.Mutex), which I will refer to as get_mutex and memory_pool_mutex.

Wrap the entire contents of get with get_mutex as follows:

fn get(cache, id) T {
    cache.get_mutex.lock();
    defer cache.get_mutex.unlock();
    // rest of function
}

This will handle data races on the data structure that checks for a box’s cached value (calculating already_computed). It will also eliminate certain race conditions such as when two threads both see that a value isn’t in the cache at the same time, and both try to compute it from scratch.

Then, wrap all calls to functions on the memory pool (or whatever data structure manages your free-list and memory allocations) with memory_pool_mutex, or just make a wrapper struct for your memory pool that adds these locks to every function.

Finally, you need to ensure there are no data races on the counters, which you can do by simply storing each one as a std.atomic.Value.
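Concretely, the locked pool wrapper and the counter could look roughly like this; Value, SharedPool and UseCounter are made-up names, and a fixed-size Value is assumed:

const std = @import("std");

const Value = [1024]f64; // placeholder for "some KB of data" (assumption)

// Mutex-guarded wrapper so the memory pool can be used from any thread.
const SharedPool = struct {
    mutex: std.Thread.Mutex = .{},
    pool: std.heap.MemoryPool(Value),

    fn create(self: *SharedPool) !*Value {
        self.mutex.lock();
        defer self.mutex.unlock();
        return self.pool.create();
    }

    fn destroy(self: *SharedPool, ptr: *Value) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.pool.destroy(ptr);
    }
};

// Per-box use counter; get() would initialize it to the number of near boxes.
const UseCounter = struct {
    remaining: std.atomic.Value(u32),

    fn init(near_boxes: u32) UseCounter {
        return .{ .remaining = std.atomic.Value(u32).init(near_boxes) };
    }

    // fetchSub returns the previous value, so exactly one caller observes 1
    // and knows it was the last user of the value.
    fn release(self: *UseCounter) bool {
        return self.remaining.fetchSub(1, .acq_rel) == 1;
    }
};

In free, the caller that gets true back from release is the one that returns the value’s memory to the pool.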

Expanded approach (computes multiple box values in parallel):

The first approach locks the entire get function, meaning you can only calculate a single box’s value at once across all threads. I’m assuming that the computation of a box’s value is by far the most intensive part of get, so this isn’t ideal.

To get around this while still keeping things relatively simple, you could make retrieving a cached value not just give a pointer, but a pointer and a ‘latch’ that must be waited on until the pointed-to value is initialized. This way the get function could do the allocation of the box’s value and the marking of the box’s value as ‘computed’ in the locked section, then actually compute the value in an unlocked section:

fn get(cache, id) T {                          
   *lock mutex*                                 
   if already_computed                             (LOCKED)
       get latch and pointer to value              (LOCKED)
       *unlock mutex*                             
       wait on latch                               (UNLOCKED)
       return pointer to value                     (UNLOCKED)
   else                                           
       initialize the counter                      (LOCKED)
       allocate value                              (LOCKED)
       make latch for value                        (LOCKED)
       mark value as computed                      (LOCKED)
       *unlock mutex*                            
       compute value, storing in allocated spot    (UNLOCKED)
       open latch                                  (UNLOCKED)
       return pointer to value                     (UNLOCKED)
}                                             

Unfortunately the Zig stdlib doesn’t have a premade latch that I know of, but a simple latch like this should work in your case:

const std = @import("std");

/// A one-shot latch: starts closed, `open` releases all current and future waiters.
const Latch = struct {
    is_open: std.atomic.Value(u32),

    const init: Latch = .{
        .is_open = .init(0),
    };

    /// Mark the value as ready and wake every thread blocked in `wait`.
    pub fn open(self: *Latch) void {
        self.is_open.store(1, .release);
        std.Thread.Futex.wake(&self.is_open, std.math.maxInt(u32));
    }

    /// Block until `open` has been called; returns immediately if it already has.
    pub fn wait(self: *Latch) void {
        while (self.is_open.load(.acquire) == 0) {
            std.Thread.Futex.wait(&self.is_open, 0);
        }
    }
};
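Tying it together, a cache entry could then hold roughly this (still a sketch with made-up names, reusing the Value, UseCounter and Latch types from the snippets above):

const Entry = struct {
    value: *Value, // allocated from the memory pool while get_mutex is held
    latch: Latch = .init, // opened once `value` has actually been computed
    counter: UseCounter, // initialized to the number of near boxes
};

A thread that finds an existing entry in get waits on entry.latch and then returns entry.value; the thread that created the entry computes the value outside the lock and then calls entry.latch.open().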

You might be able to reduce blocking further and allow some of the smaller things, like the already_computed checks, to be done in parallel. Generally, though, I think it’s best to keep the locking logic as simple as possible and focus on making the ‘big chunks’ of computation parallel/non-blocking, even if the heavy-handed/naive locking on the smaller computations is ‘uglier’.


Is your accumulation pattern completely tree-like?

If it is, then I wonder whether you could implement it similarly to an iterative bottom-up merge sort (a parallelized version of that): start from the leaves, calculating the function on the leaves, then accumulate up the tree, interleaving the work on the leaves with accumulating inner nodes once their children are completed.

I think that would organize it in a way to basically always complete the dependencies right before reaching the dependent.

I assume what you describe as a box is the leaf holding those KB of data?
What are the dependencies between boxes, do they only need synchronization once you arrive at the parent that merges them?
If they need more synchronization, why?

I am not sure whether what I am imagining resembles what you are trying to do.
I guess in a way I am asking what part adds gnarly dependencies you haven’t mentioned yet that make a super simple parallelization impossible.

Thank you very much. Yes, the computation of the value of a single box takes around 100 µs, so I would like to do them in parallel.
I think I now get the overall structure; I’m going to experiment right away.

So for more context, I’m implementing the fast multipole method to compute potentials.

The algorithm breaks down in these phases:

  1. sort particles in an octree
  2. compute box value of leaf
  3. aggregate into parent box
  4. aggregate info of near boxes, of the same level ← this is the step I’m trying to optimize
  5. spread info to children boxes
  6. compute potential

For steps 2, 3, 5 and 6 I’m using spherical harmonics, but for step 4 I convert them to exponential harmonics, because they are much easier to move around (in physical space).

So the 4th step is further subdivided into:
4.1. convert value of a box to exponential
4.2. aggregate values of near boxes
4.3. convert back to spherical

Those steps can be parallelized trivially by doing each step sequentially (each one as a batch over all boxes); however, since each exponential value is used only 26 times, it is wasteful to keep them all around for the entire duration.

To give more info about memory consumption: with 1_000_000 particles the current implementation uses 1.4 GB, 1 GB of which is due to the exponential harmonics.
Also, the algorithm’s bottleneck is memory bandwidth during the 4th step, so improving cache utilization in this step is going to greatly accelerate it.


With this added context I have another idea for how you could effectively get the advantages of the cache without the tricky locking logic. Before I assumed that the boxes needed to be processable in any order, but from what you are saying it seems like you have full control over which boxes have their aggregation computed first. If this is the case, I recommend computing the aggregations of the boxes ‘slice by slice’ (one 2d plane of boxes at a time).

If you do your computation slice by slice, you will only ever need to store the exponential harmonics for 3 slices of boxes at a time. You can choose an axis (x, y, z) and move along it, keeping a 3-slice-thick sliding window of exponential harmonics in memory. Each time you advance a slice, you shed the oldest slice of exponential harmonics (which no longer neighbors any uncomputed boxes) and compute the next slice of exponential harmonics in a batch (which can be trivially parallelized, just like when you pre-computed all of them).
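A minimal sketch of that 3-slice sliding window, assuming a fixed harmonic size (Harmonic and SlidingWindow are made-up names):

const Harmonic = [512]f64; // placeholder for one box's exponential harmonics

// Ring buffer of 3 slices: slice i lives in slot i % 3, so computing
// slice i + 2 reuses the storage of slice i - 1, which by then no longer
// neighbors any uncomputed box.
const SlidingWindow = struct {
    slices: [3][]Harmonic, // one preallocated, contiguous buffer per slice

    fn slot(self: *SlidingWindow, slice_index: usize) []Harmonic {
        return self.slices[slice_index % 3];
    }
};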

The axis you choose to move along may be important depending on the distribution of ‘empty’ boxes (those that don’t need to be considered in the aggregations). The most memory-efficient axis will be the one for which the maximum number of non-empty boxes that can be covered by the 3-slice window is the lowest.

The advantages of this approach over the cache:

  • The memory required to store the harmonics can be known in advance (by checking the maximum number of non-empty boxes that can be covered by the 3-slice window). This eliminates the need to do dynamic allocations during the aggregation process.
  • The parallel logic is much simpler: the only actions requiring synchronization are sending work chunks to the threads and receiving confirmation that the work is complete.
  • The in-memory distribution of the exponential harmonics is a lot simpler, with each computed slice of harmonics occupying 1 or 2 contiguous regions of memory (assuming you are using a circular buffer to store them). This gets rid of the overhead of traversing the free list to find open slots, and may possibly lead to more performant memory access patterns, with each thread writing to its own contiguous chunk for several harmonics in a row rather than disparate locations for each one.

Some implementation notes:

  • There are two main approaches you could take to storing the exponential harmonics: use a single circular buffer for all of them, or use one circular buffer per thread. Using a single one is simpler, but there might be some performance benefits to having each thread write to the same memory each time (I’m honestly not sure, it’s worth testing/researching).
  • For aggregation, you’ll need to be able to map a given box to a given exponential harmonic. You can, of course, just use a hash map, but if that is a bottleneck you could also just use an array with a slot for each box in the 3 slices, empty or not, and in the indices corresponding to non-empty boxes store a pointer or index to their exponential harmonic (see the sketch after this list).
  • I cover the computation of the exponential harmonics in parallel, but gloss over the computation of the aggregations. I’m assuming that you can use a similar batch processing strategy for them.
  • I recommend keeping all of the orchestration logic in one thread, and having the parallel threads only compute the big, trivially parallel stuff like the exponential harmonics and the aggregation.
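For the array-based lookup in the second note above, a small sketch (assuming a cubic level with `side` boxes per axis; the names are made up):

// One slot per box in the 3-slice window, empty or not. The lookup table
// would be a []?u32 (or []?*Harmonic) of length 3 * side * side, holding
// null for empty boxes and the harmonic's index for non-empty ones.
fn windowSlot(side: usize, slice: usize, row: usize, col: usize) usize {
    return (slice % 3) * side * side + row * side + col;
}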

That is a very promising approach, quite close to optimal.
I haven’t given every detail before, but there are a couple of things that can be exploited further.

Exponential harmonics are only valid for near interactions in one direction, so for each box there are 6 exponential harmonics: +z, -z, +y, -y, +x, -x. Since everything is linear, I can aggregate the values in whichever order I want, so first I aggregate along x, then y, and last z. For each box I need to aggregate the 9 boxes above and the 9 boxes below.

Incorporating your idea with the slices, the aggregation becomes something like this.

for (x, y, z) |direction| {
  for (slices(direction)) |slice_ind| {
    up_slice = computeExponential(slice_ind + 1, -direction);
    down_slice = computeExponential(slice_ind - 1, +direction);
    aggregateExponential(up_slice, down_slice, slice_ind);
  }
}

You can see that with this approach I don’t share any exponentials between the slices.
However, your idea of using 3 buffers is still valid, but instead of going slice by slice, I go row by row, further reducing the memory consumption.

for (x, y, z) |direction| {
  for (slices(direction)) |slice_ind| {
    prev_row_up.zeros();
    prev_row_down.zeros();
    current_row_up = computeExponential(slice_ind + 1, start_row, -direction);
    current_row_down = computeExponential(slice_ind - 1, start_row, +direction);
    for(rows(slice_ind)) |row_ind| {
      //compute next row
      next_row_up = computeExponential(slice_ind + 1, row_ind + 1, -direction);
      next_row_down = computeExponential(slice_ind - 1, row_ind + 1, +direction);
      // accumulate
      aggregateExponential(row_ind, prev_row_up, prev_row_down);
      aggregateExponential(row_ind, current_row_up, current_row_down);
      aggregateExponential(row_ind, next_row_up, next_row_down);
      // rotate buffers
      prev_row_up = current_row_up;
      prev_row_down = current_row_down;
      current_row_up = next_row_up;
      current_row_down = next_row_down;
    }
  }
}

This requires 6 rows of data, which can be preallocated and do not require complex orchestration.
Of course computeExponential and aggregateExponential are executed in parallel.

Yo, I got around to implementing your proposal.

I would like to thank you for the help.
As a result, my implementation uses 15x less memory than the original Fortran implementation (coupled with other memory optimizations).
It is also about 4 to 6 times faster.

In the profile below, I call the two implementations sequentially.
The difference is remarkable.
