Implementing a zero-copy block cache reader with current `rebase` semantics

Hi everyone,

I’m currently developing an append-only, structured stream database. It supports multiple streams, each with one append-only writer and a practically unlimited number of concurrent, seekable readers.

Internally I have a BufferManager that keeps a freely accessible pool of 1MiB slots. The writer appends directly to one of these slots; when the slot is full, it is compressed and flushed to disk, and the writer rotates to a new one. Any data a writer has written is final and cannot be changed. Concurrent readers are strictly coordinated in a pipeline behind the writer, so they can read both flushed historical blocks and the active, uncompressed block of their stream's writer.

To maximize throughput, my goal is to point Io.Reader.buffer directly at the pinned 1MiB slots in the cache pool, so that users can read without redundant memory copies.

I’ve already written the writer. The only roadblock is the reader due to the semantics of rebase and RebaseError.

The problem with rebase for fixed chunks

The interface expects that if a caller wants a contiguous view (peek and friends), the implementation can expand the available contiguous view via rebase:

    /// Ensures `capacity` data can be buffered without rebasing.
    ///
    /// Asserts `capacity` is within buffer capacity, or that the stream ends
    /// within `capacity` bytes.
    ///
    /// Only called when `capacity` cannot be satisfied by unused capacity of
    /// `buffer`.
    ///
    /// The default implementation moves buffered data to the start of
    /// `buffer`, setting `seek` to zero, and cannot fail.
    rebase: *const fn (r: *Reader, capacity: usize) RebaseError!void = defaultRebase,

In particular, the second sentence is problematic. The trouble arises at the block boundary: I can’t shift data inside the cache because it is semantically immutable, and the only errors rebase is allowed to return are error{ ReadFailed, EndOfStream }. If I return EndOfStream, I falsely tell the caller the stream is finished; if I return ReadFailed, I crash the pipeline with a generic read failure.
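To make the conflict concrete, here is a minimal model of what the doc comment says the default implementation does. `MiniReader` and `shiftToStart` are made-up names for illustration, not the std source. The point is only that the shift writes into `buffer`, which is exactly what a cache slot shared with other readers can’t tolerate.

```zig
const std = @import("std");

/// Illustrative stand-in for the three Reader fields involved; not std code.
const MiniReader = struct {
    buffer: []u8,
    seek: usize,
    end: usize,

    /// Moves the unread bytes `buffer[seek..end]` to the start of `buffer`
    /// and resets `seek`, as the doc comment above describes.
    fn shiftToStart(r: *MiniReader) void {
        const unread = r.buffer[r.seek..r.end];
        // The regions may overlap with the destination first, so copy forwards.
        std.mem.copyForwards(u8, r.buffer[0..unread.len], unread);
        r.end = unread.len;
        r.seek = 0;
    }
};

test "the shift overwrites bytes at the start of the buffer" {
    var storage: [8]u8 = "abcdefgh".*;
    var r: MiniReader = .{ .buffer = &storage, .seek = 5, .end = 8 };
    r.shiftToStart();
    try std.testing.expectEqualStrings("fgh", r.buffer[r.seek..r.end]);
    // The first three bytes of the "block" have been clobbered.
    try std.testing.expectEqualStrings("fghdefgh", &storage);
}
```

If `storage` were a pinned cache slot, any other reader still positioned at offset 0 would now see different bytes.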

The core problem is that StreamTooLong isn’t in the set, as this comment in peekDelimiterInclusive also acknowledges:

    // It might or might not be end of stream. There is no more buffer space
    // left to disambiguate. If `StreamTooLong` was added to `RebaseError` then
    // this logic could be replaced by removing the exit condition from the
    // above while loop. That error code would represent when `buffer` capacity
    // is too small for an operation, replacing the current use of asserts.

Rebasing just doesn’t really make sense in this case, which is okay, and maybe I’m trying too hard to press this into the interface. Still, I would like to be able to provide a better error than ReadFailed.

Possible Solutions

I see a few ways to solve this for my use case:

  1. Add a buffer for all intermediate reads. Obviously bad because of the unnecessary double copies and the additional virtual function calls needed to fill the buffer.
  2. Add a buffer only for reads crossing chunk boundaries. This avoids double copying in most cases but increases the implementation complexity considerably, and it also requires a 1MiB[1] buffer for each Reader because of the semantics of the capacity parameter and its interplay with the buffer size for rebase.
  3. Just throw a @compileError to disallow everything that uses rebase. As far as I can see, this would mean I can’t use any of the peek functions, even though they are fine to call in the vast majority of cases. Fortunately, it would still allow me to use nearly everything that copies out of the buffer, like readStruct, readSlice, stream, and most delimiter functions, but this goes somewhat against the zero-copy goal.
  4. Wait for std to add something like StreamTooLong to rebase[2], then use the peek functions with the copying ones as a fallback.

I think the best option would obviously be 4.
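Options 2 and 4 both boil down to the same split: hand out a direct slice when the requested range sits inside one block, and only copy when it crosses a boundary. Here is a minimal sketch of that split, independent of Io.Reader. `Blocks`, `view`, and the 8-byte block size are made up for illustration (the real slots are 1MiB), and the caller supplies the scratch space.

```zig
const std = @import("std");

// Tiny block size so the example is easy to follow; the real slots are 1 MiB.
const block_size: usize = 8;

/// Hypothetical stand-in for a stream stored in fixed-size, immutable blocks.
const Blocks = struct {
    blocks: []const [block_size]u8,

    /// Returns `len` bytes starting at stream offset `pos`.
    /// Inside one block the result aliases the block (zero-copy); across a
    /// boundary the bytes are gathered into `scratch` instead.
    /// Assumes the range is in bounds and `scratch.len >= len`.
    fn view(self: Blocks, pos: usize, len: usize, scratch: []u8) []const u8 {
        const index = pos / block_size;
        const offset = pos % block_size;
        if (len <= block_size - offset) {
            return self.blocks[index][offset..][0..len];
        }
        var copied: usize = 0;
        var p = pos;
        while (copied < len) {
            const b = p / block_size;
            const o = p % block_size;
            const n = @min(block_size - o, len - copied);
            @memcpy(scratch[copied..][0..n], self.blocks[b][o..][0..n]);
            copied += n;
            p += n;
        }
        return scratch[0..len];
    }
};

test "direct slice inside a block, copy across the boundary" {
    const data = [_][block_size]u8{ "abcdefgh".*, "ijklmnop".* };
    const s: Blocks = .{ .blocks = &data };
    var scratch: [block_size]u8 = undefined;

    const inside = s.view(2, 4, &scratch);
    try std.testing.expectEqualStrings("cdef", inside);
    try std.testing.expect(@intFromPtr(inside.ptr) == @intFromPtr(&data[0][2]));

    const across = s.view(6, 4, &scratch);
    try std.testing.expectEqualStrings("ghij", across);
    try std.testing.expect(@intFromPtr(across.ptr) == @intFromPtr(&scratch));
}
```

The sketch sidesteps the real question of how the "crosses a boundary" case is reported through the Reader interface; it only shows that the fast path needs no copy.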


Has anybody run into similar problems? Have I missed something I could do to resolve this? Or is Io.Reader just the wrong tool for what I’m trying to achieve?

Thank you


  1. cache block size

  2. or just change it in my local copy

Perhaps it makes sense for std to support the concept of StreamTooLong for rebase. I would open an issue for it if there isn’t one already.

Regardless, ReadFailed indicates an implementation-specific error; you can store additional information in your implementation state that the caller can access. See std.Io.File.Reader for an example.

Try doing that and see how often it is encountered; it may not be worth worrying about.
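A rough sketch of that pattern, kept independent of std so it stands on its own. `RebaseError` here is a local copy of the error set quoted in the original post, and `BlockReader`, `Detail`, `remaining_in_block`, and `err` are hypothetical names; a real implementation would presumably recover its state from the `*Reader` argument rather than take `self` directly.

```zig
const std = @import("std");

/// Local mirror of the error set quoted in the original post.
const RebaseError = error{ ReadFailed, EndOfStream };

/// Hypothetical implementation state; all names are illustrative.
const BlockReader = struct {
    /// Bytes still available in the currently pinned block.
    remaining_in_block: usize,
    /// Set right before `error.ReadFailed` is returned, so the caller can
    /// find out why the read failed.
    err: ?Detail = null,

    const Detail = error{CrossesBlockBoundary};

    fn rebase(self: *BlockReader, capacity: usize) RebaseError!void {
        if (capacity <= self.remaining_in_block) return;
        // The interface only allows the generic error; keep the specific one here.
        self.err = error.CrossesBlockBoundary;
        return error.ReadFailed;
    }
};

test "caller inspects the stored detail after ReadFailed" {
    var r: BlockReader = .{ .remaining_in_block = 16 };
    try r.rebase(8);
    try std.testing.expect(r.err == null);

    try std.testing.expectError(error.ReadFailed, r.rebase(32));
    try std.testing.expect(r.err != null and r.err.? == error.CrossesBlockBoundary);
}
```

A caller that sees ReadFailed can check `err` and fall back to a copying read instead of treating it as a fatal I/O error.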

It may also be worth providing multiple implementations with differing behaviour for different use cases.

I’ve searched a bit but didn’t find any. But I also didn’t want to open one willy-nilly if I was just thinking about it the wrong way.

Yeah, that’s true, and I’ll likely do this for now, though I find it a bit weak in this case.

I’m currently focusing on these types of issues with the Reader/Writer interface.

The relevant issues here sound like:

Just to be sure, you’re talking about using Readers with zero-length buffers?

EDIT: I think I misunderstood. I might need a small pseudo-code example to get on the same page.

If you are willing to use page mapping techniques, this might be another possibility:

2.b Map your read-only chunks to physical pages, then use virtual page mapping to make the pages of two chunks appear contiguous.

Now that I think about it, I had a similar case with rebase in Io.Writer. There I just used failingRebase. It isn’t much of a problem there, though, since rebase isn’t used as much, at least by me, and also because there is no other error there that fits the semantics I’m trying to achieve.

I don’t think the issues capture what I’m trying to do, though the second one is close and may even be a general case of it. This is just how I read it, but I imagine there needs to be some way for the concrete interface implementation to tell the caller that it needs some amount of data to be kept in the buffer to keep its semantics intact. For decompression this would be the window; for me it would be one entire chunk.

I think StreamTooLong or an integer, as suggested in 1 in the issue, might be the right call. The main thing that needs to be relaxed is this all-or-nothingness.

No. I’m instantiating it with something like this:

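    // The parameter is named `source` so it does not shadow the `stream`
    // vtable function referenced below.
    pub fn init(source: *Stream) Reader {
        // Pin a cache slot; `buf` aliases the slot's memory, so nothing is copied.
        const block = source.buffer_manager.reserveBlock();
        const buf = source.buffer_manager.getBlockData(block);
        return .{
            .stream = source,
            .cached_slot = block,
            .logical_pos = 0,
            .interface = .{
                .vtable = &.{
                    .stream = stream,
                    .discard = discard,
                    .readVec = readVec,
                    .rebase = rebase,
                },
                .buffer = buf,
                // Nothing is buffered yet.
                .seek = 0,
                .end = 0,
            },
        };
    }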

Here buf points directly into the slots of the buffer manager used for caching.

That’s a good idea, but it will likely not work because chunks can be read while they’re being written. Chunks of different streams are also written out of order, so the cache is unordered, and ordering the cache by stream is nigh impossible.

There are a few other options in the same vein that I’ve already discarded.
One would be to write each stream to a separate file and concatenate them afterwards. But this is also bad because it increases disk and file descriptor usage by a lot. I expect there to be >1k different streams on a regular basis, and sometimes an order of magnitude more.
Another would be to take advantage of sparse files. But those are very filesystem-dependent and also suffer from there being a lot of streams with vastly different sizes.
Another would be to forgo the shared cache and have stream-local buffers. But this is again bad because of the number of streams and the likely need to then tune the buffer size of each stream. In my case I can just use clock eviction and be done with it.
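For anyone unfamiliar with clock eviction, here is a rough sketch of the idea, not my actual BufferManager. `Slot`, `Clock`, `pins`, and `pickVictim` are hypothetical names; the pin count stands in for readers or a writer currently holding the slot.

```zig
const std = @import("std");

/// Hypothetical per-slot metadata.
const Slot = struct {
    /// Set on access, cleared when the clock hand passes.
    referenced: bool = false,
    /// Non-zero while a reader or writer holds the slot.
    pins: u32 = 0,
};

const Clock = struct {
    slots: []Slot,
    hand: usize = 0,

    /// Returns the index of a slot to evict, or null if every slot is pinned.
    fn pickVictim(self: *Clock) ?usize {
        // Two sweeps are enough: the first clears reference bits, the second
        // finds an unpinned slot if one exists.
        var steps: usize = 0;
        while (steps < self.slots.len * 2) : (steps += 1) {
            const i = self.hand;
            self.hand = (self.hand + 1) % self.slots.len;
            const slot = &self.slots[i];
            if (slot.pins != 0) continue;
            if (slot.referenced) {
                slot.referenced = false; // give it a second chance
                continue;
            }
            return i;
        }
        return null;
    }
};

test "skips pinned slots and gives referenced slots a second chance" {
    var slots = [_]Slot{
        .{ .referenced = true },
        .{ .pins = 1 },
        .{},
    };
    var clock: Clock = .{ .slots = &slots };
    try std.testing.expectEqual(@as(?usize, 2), clock.pickVictim());
    // Slot 0 lost its reference bit during the first pass.
    try std.testing.expectEqual(@as(?usize, 0), clock.pickVictim());
}
```

Because all streams share one pool, hot streams naturally keep more slots without any per-stream tuning.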
