Algorithm Translation Challenge 2: Partition

Welcome to challenge two (and happy new years everyone)!

Today, our target is partition - an algorithm that has a surprising number of uses. Partitions are useful concepts in algorithms such as quicksort but also have a place in databases and divide and conquer processing strategies.

Given some predicate function, the partition algorithm will place all elements that satisfy the predicate to the front of the list and return the index to the first element that doesn’t satisfy the predicate (we’ll call that index last). That way, slice[0..last] will return a sub range where each element satisfies the predicate.

It will be up to you to decide how to report any errors. For instance, if the range is empty, you could return a value (as you’ll see in the example code) or perhaps you prefer to return null and use optionals instead.

Note:

Takes an input range as an argument
Takes a predicate function as an argument
Returns a handle to the first element where Predicate(x) -> false
Output range does not need to be sorted.

Example:

input = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }

Predicate: p(x) -> (x % 2) == 0

last = partition(input, p); // modifies "input" range

input[0..last] -> { 0, 2, 4, 6, 8 } // all even numbers.

Possible Implementation from: https://en.cppreference.com/w/cpp/algorithm/partition

template<class ForwardIt, class UnaryPredicate>
ForwardIt partition(ForwardIt first, ForwardIt last, UnaryPredicate p)
{
    first = std::find_if_not(first, last, p);
    if (first == last)
        return first;
 
    for (auto i = std::next(first); i != last; ++i)
        if (p(*i))
        {
            std::iter_swap(i, first);
            ++first;
        }
 
    return first;
}

Bonus:

Write an iterator that mimics the partition functionality. The iterator has a
"next" function that returns the next value that satisfies the predicate or `null`
if it reaches the end of the input range.
3 Likes

Here’'s both versions of partition. Both versions are safe for zero-length arrays as array[0..N] where N == 0 returns an empty slice.

Function version:

inline fn swap(x: anytype, y: anytype) void {
    const u = x.*;
    x.* = y.*;
    y.* = u;
}

pub fn partition(
        comptime T: type,
        slice: []T,
        comptime predicate: fn(T) bool
    ) usize {

    // find the head of the second partition
    var head: usize = 0;

    while (head < slice.len and predicate(slice[head])) : (head += 1) {
        continue;    
    }

    // advance the tail of the second partition to the end
    var tail: usize = head + 1;
    
    // swap qualifying elements to first partition
    while (tail < slice.len) : (tail += 1) {
        if (predicate(slice[tail])) {
            swap(&slice[head], &slice[tail]);
            head += 1;
        }
    }
    
    return head;
}

fn isEven(x: usize) bool {
    return (x & 1) == 0;
}

pub fn main() !void {

    var array: [10]usize = undefined;
    
    for (&array, 0..) |*elem, i| {
        elem.* = i;
    }

    const N: usize = partition(usize, &array, isEven);
    
    std.debug.print("\n{any}\n", .{ array[0..N] });
}

Iterator Version:

pub fn PartitionIterator(
    comptime T: type,
    comptime predicate: fn(T) bool
) type {

    return struct {

        slice: []const T,
        index: usize,

        pub fn init(slice: []const T) @This() {
            return .{
                .slice = slice,
                .index = 0,
            };
        }

        pub fn next(self: *@This()) ?T {

            // advance to find the next qualifying element
            while (self.index < self.slice.len and !predicate(self.slice[self.index])) {
                self.index += 1;
            }

            // return qualifying element if in range
            if (self.index < self.slice.len) {
                const tmp = self.slice[self.index];
                self.index += 1;
                return tmp;
            }

            return null;
        }
    };
}

fn isEven(x: usize) bool {
    return (x & 1) == 0;
}

pub fn main() !void {

    var array: [10]usize = undefined;
    
    for (&array, 0..) |*elem, i| {
        elem.* = i;
    }

    var iter = PartitionIterator(usize, isEven).init(&array);

    while (iter.next()) |elem| {
        std.debug.print("\n{}\n", .{ elem });
    }
}
1 Like

I like this variant:

inline fn swap(x: anytype, y: anytype) void {
    const u = x.*;
    x.* = y.*;
    y.* = u;
}

pub fn partition(
    comptime T: type,
    slice: []T,
    comptime predicate: fn (T) bool,
) usize {
    // Invariants:
    //
    //  - predicate(x) == true for all x in slice[0..i]
    //  - predicate(x) == false for all x in slice[j..]
    //
    // slice[i..j] contains unclassified elements; the goal is to shrink the
    // range until i == j.
    var i: usize = 0;
    var j: usize = slice.len;
    while (true) {
        while (i < j and predicate(slice[i])) i += 1;
        while (i < j and !predicate(slice[j - 1])) j -= 1;
        if (i >= j) return i;
        swap(&slice[i], &slice[j - 1]);
        i += 1;
        j -= 1;
    }
}

This algorithm has the benefit that it minimizes the number of swaps. Your version does around twice as many swaps as needed in the average case, though the worst case is the same for both algorithms: slice.len / 2. The difference comes from the fact that you swap once for each tail-element that is out of place, but I only swap when elements on both sides are out of place, so I always fix 2 errors per 1 swap, which is optimal.

This version makes a call to partition() which returns a PartitionIterator. The iterator has a sort() member function which rearranges the input. I tried to match the C++ implementation in a broad sense, while also taking full advantage of the Zig way of doing things.

const std = @import("std");

const Error = error {
   EmptyInput,
};

pub fn validate(comptime T: type, buffer: []T) !void {
   if (buffer.len == 0) {
      return Error.EmptyInput;
   }
}

fn even(comptime T: type, val: T) bool {
   return (val & 1) == 0;
}

pub fn PartitionIterator(comptime T: type, comptime p: fn(type, T) bool) type {
   return struct {
      input: []T,
      first: usize,

      const Self = @This();

      pub fn sort(self: *Self) void {
         for (self.input[self.first + 1..], self.first + 1..) |val, i| {
            if (p(T, val)) {
               std.mem.swap(T, &self.input[self.first], &self.input[i]);
               self.first += 1;
            }
         }
         return;
      }
   };
}

pub fn partition(comptime T: type, comptime p: fn(type, T) bool, input: []T) PartitionIterator(T, p) {
   var first: usize = undefined;
   for (input, 0..) |val, i| {
      if (!p(T, val)) {
         first = i;
         break;
      }
      else if (i == input.len - 1) {
         first = i;
      }
   }

   return .{
      .input = input,
      .first = first,
   };
}

pub fn main() !void {
   const writer = std.io.getStdOut().writer();

   const T = u8;

   var input = [_]T{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
   try validate(T, &input);

   var it = partition(T, even, &input);
   it.sort();

   try writer.print("> {d}\n", .{ input });
   try writer.print("> first = {d}\n", .{ it.first });
}

test {
   const T = u8;

   var input = [_]T{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
   try std.testing.expect(input.len > 0);

   var it = partition(T, even, &input);
   it.sort();

   try std.testing.expectEqual(input, [_]T{ 0, 2, 4, 6, 8, 5, 3, 7, 1, 9 });
   try std.testing.expectEqual(it.first, 5);
}

My strategy for putting this together was to to attempt everything within the context of the quicksort algorithm. However I ran into some challenges along the way. In the case of the C++ implementation, you have a partition algorithm which can separate evens and odds. Then you also have the quicksort algorithm, which calls the same partition function that you use for the simple remainder division. I was not able to translate this directly to Zig. It seems the main issue is that the declaration of PartitionIterator accepts a comptime-known predicate function which must specify the types of the parameters for the predicate function. In the case of a simple remainder predicate, only one value T needs to be compared to a constant (T & 1) == 0. When using the quicksort algorithm you will have to compare that T to another T which is the pivot value, so the predicate function will become a comptime p: fn(type, T, T) bool with the extra T for the pivot. Possibly you could have the remainder function accept a second T as well so you would have (T & T) == 0. However there are also some subtle nuances within the initialization and sorting functions, which depend on the pivot value. Ultimately I did not find a practical way to call a quicksort function which makes use of the PartitionIterator. Instead it seems that you would have to have an entirely separate SortingIterator with it’s own initialization and sorting functions.

1 Like