Hi everyone,
I’m currently developing an append-only, structured stream storage engine. It supports multiple streams, each with exactly one append-only writer and a practically unlimited number of concurrent, seekable readers.
Internally, a `BufferManager` keeps a freely accessible pool of 1 MiB slots. The writer appends directly into one of these slots; when the slot is full, it is compressed and flushed to disk, and the writer rotates to a new one. Anything a writer has written is final and cannot be changed. Concurrent readers are strictly coordinated in a pipeline behind the writer, so they can read both flushed historical blocks and the active, uncompressed block of their stream’s writer.
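For anyone unfamiliar with this kind of layout, here is a minimal sketch of the slot rotation. It is a simplified model, not the real `BufferManager`: `SlotPool`, `append`, `view`, and `PoolExhausted` are hypothetical names, the slots are tiny instead of 1 MiB, and compression, flushing, and slot recycling are left out. A full slot is simply sealed and never written again.

```zig
const std = @import("std");

/// Hypothetical, heavily simplified slot pool (not the real BufferManager).
/// Compression, flushing to disk, and slot recycling are omitted.
fn SlotPool(comptime slot_size: usize, comptime slot_count: usize) type {
    return struct {
        const Self = @This();

        slots: [slot_count][slot_size]u8 = undefined,
        /// Number of bytes written into each slot so far.
        lens: [slot_count]usize = [_]usize{0} ** slot_count,
        /// Index of the slot the single writer is currently appending to.
        active: usize = 0,

        /// Appends `bytes`, rotating to the next slot whenever the active one
        /// is full. Bytes that were already written are never modified.
        pub fn append(self: *Self, bytes: []const u8) error{PoolExhausted}!void {
            var rest = bytes;
            while (rest.len > 0) {
                if (self.lens[self.active] == slot_size) {
                    // The real system would compress and flush the full slot here.
                    if (self.active + 1 == slot_count) return error.PoolExhausted;
                    self.active += 1;
                }
                const len = self.lens[self.active];
                const n = @min(slot_size - len, rest.len);
                @memcpy(self.slots[self.active][len..][0..n], rest[0..n]);
                self.lens[self.active] = len + n;
                rest = rest[n..];
            }
        }

        /// Read-only view of the bytes written to a slot (what readers see).
        pub fn view(self: *const Self, index: usize) []const u8 {
            return self.slots[index][0..self.lens[index]];
        }
    };
}
```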
To maximize throughput, my goal is to point `Io.Reader.buffer` directly at the pinned 1 MiB slots in the cache pool, so users can read the data in place without redundant memory copies.
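The zero-copy idea can be sketched as follows, assuming a hypothetical `SlotReader` whose `buffer` borrows a pinned slot and tracks unread data with `seek`/`end` indices (the `toss` name is also illustrative). `peek` returns a slice that points into the slot itself, so nothing is copied. The view is `[]const u8` here only to encode that the slot must never be written through.

```zig
const std = @import("std");

/// Hypothetical zero-copy reader: `buffer` is not owned storage but a
/// borrowed view of a pinned slot, so `peek` hands out slices into it.
const SlotReader = struct {
    /// Borrowed view of a pinned slot; never written through.
    buffer: []const u8,
    /// Start of the unread data.
    seek: usize = 0,
    /// End of the valid data (the writer's current fill level).
    end: usize,

    /// Returns the next `n` bytes without copying, or null if they are not
    /// all available inside this slot.
    pub fn peek(r: *const SlotReader, n: usize) ?[]const u8 {
        if (r.end - r.seek < n) return null;
        return r.buffer[r.seek..][0..n];
    }

    /// Marks `n` peeked bytes as consumed.
    pub fn toss(r: *SlotReader, n: usize) void {
        std.debug.assert(n <= r.end - r.seek);
        r.seek += n;
    }
};
```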
The writer is already done. The only remaining roadblock is the reader, because of the semantics of `rebase` and `RebaseError`.
The problem with rebase for fixed chunks
The interface expects that, whenever a caller needs a contiguous view (`peek` and friends), the implementation can enlarge that view via `rebase`:
```zig
/// Ensures `capacity` data can be buffered without rebasing.
///
/// Asserts `capacity` is within buffer capacity, or that the stream ends
/// within `capacity` bytes.
///
/// Only called when `capacity` cannot be satisfied by unused capacity of
/// `buffer`.
///
/// The default implementation moves buffered data to the start of
/// `buffer`, setting `seek` to zero, and cannot fail.
rebase: *const fn (r: *Reader, capacity: usize) RebaseError!void = defaultRebase,
```
In particular, the second sentence is problematic, and the trouble appears at block boundaries. I can’t shift data inside the cache, because it is semantically immutable, and the only errors `rebase` is allowed to return are `error{ ReadFailed, EndOfStream }`. If I return `EndOfStream`, I falsely tell the caller that the stream is finished; if I return `ReadFailed`, I crash the pipeline with a generic read failure.
The core problem is that `StreamTooLong` isn’t part of `RebaseError`, as this comment in `peekDelimiterInclusive` also acknowledges:
```zig
// It might or might not be end of stream. There is no more buffer space
// left to disambiguate. If `StreamTooLong` was added to `RebaseError` then
// this logic could be replaced by removing the exit condition from the
// above while loop. That error code would represent when `buffer` capacity
// is too small for an operation, replacing the current use of asserts.
```
Rebasing simply doesn’t make sense in this case, which is fine, and maybe I’m trying too hard to force my design into this interface. Still, I would like to be able to return a more precise error than `ReadFailed`.
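A minimal sketch of the boundary case, assuming a hypothetical `SlotReader` that sits on one immutable slot and a `rebase` limited to the two errors described above. The fields `slot` and `stream_ended` are illustrative. Moving the unread bytes is not an option, so a request whose data exists but straddles two slots can only be reported as `ReadFailed`.

```zig
const std = @import("std");

/// Mirrors the two-error `rebase` set discussed in the text.
const RebaseError = error{ ReadFailed, EndOfStream };

/// Hypothetical reader sitting on a single immutable slot.
const SlotReader = struct {
    /// Pinned slot; semantically immutable, so data cannot be moved.
    slot: []const u8,
    seek: usize,
    /// True if no further slot will follow this one.
    stream_ended: bool,

    /// Tries to guarantee `capacity` contiguous bytes starting at `seek`.
    /// The default strategy (move unread bytes to the buffer start) is
    /// unavailable because the slot must not be written to.
    fn rebase(r: *SlotReader, capacity: usize) RebaseError!void {
        if (r.slot.len - r.seek >= capacity) return; // fits in this slot
        if (r.stream_ended) return error.EndOfStream; // honest answer
        // The bytes exist but straddle two slots. Neither error describes
        // that, so the least-bad option is a generic failure.
        return error.ReadFailed;
    }
};
```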
Possible Solutions
I see a few ways to solve this for my use case:
1. Add a buffer for all intermediate reads. Obviously bad, because of the unnecessary double copies and the additional virtual function calls needed to fill that buffer.
2. Add a buffer only for reads that cross chunk boundaries. This avoids double copying in most cases, but it increases implementation complexity considerably and also requires a 1 MiB[1] buffer per `Reader`, because of the semantics of the `capacity` parameter and its interplay with the buffer size for `rebase`.
3. Just throw a `@compileError` to disallow everything that uses `rebase`. As far as I can see, this would mean I can’t use any of the `peek` functions, even though they are fine to call in the vast majority of cases. Fortunately, it would still allow nearly everything that copies data out of the buffer, such as `readStruct`, `readSlice`, `stream`, and most `delimiter` functions, but this somewhat goes against the zero-copy goal.
4. Wait for `std` to add something like `StreamTooLong` to `RebaseError`[2], then use the `peek` functions and fall back to the copying ones when they fail.
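Option 4 from the caller’s side might look roughly like this. It assumes a hypothetical `PeekError` that already contains `StreamTooLong`, plus illustrative `ChunkedReader`, `peek`, `readSliceAll`, and `readRecord` names that only loosely mirror the real `Io.Reader` functions. The zero-copy path is taken whenever the bytes lie inside one slot, and only boundary-straddling reads are copied into caller-provided scratch space.

```zig
const std = @import("std");

/// Hypothetical error set, assuming `StreamTooLong` existed (option 4).
const PeekError = error{ ReadFailed, EndOfStream, StreamTooLong };

/// Hypothetical reader over a sequence of immutable slots.
const ChunkedReader = struct {
    slots: []const []const u8,
    slot: usize = 0, // index of the current slot
    seek: usize = 0, // offset inside the current slot

    /// Zero-copy: succeeds only if all `n` bytes lie inside the current slot.
    fn peek(r: *const ChunkedReader, n: usize) PeekError![]const u8 {
        const cur = r.slots[r.slot];
        if (cur.len - r.seek >= n) return cur[r.seek..][0..n];
        if (r.slot + 1 == r.slots.len) return error.EndOfStream;
        return error.StreamTooLong; // data exists but is not contiguous
    }

    /// Copying path: fills `dest`, crossing slot boundaries as needed.
    fn readSliceAll(r: *ChunkedReader, dest: []u8) PeekError!void {
        var filled: usize = 0;
        while (filled < dest.len) {
            if (r.seek == r.slots[r.slot].len) {
                if (r.slot + 1 == r.slots.len) return error.EndOfStream;
                r.slot += 1;
                r.seek = 0;
            }
            const avail = r.slots[r.slot][r.seek..];
            const n = @min(avail.len, dest.len - filled);
            @memcpy(dest[filled..][0..n], avail[0..n]);
            filled += n;
            r.seek += n;
        }
    }
};

/// Reads `scratch.len` bytes: in place when possible, copied into
/// `scratch` only when the record straddles a slot boundary.
fn readRecord(r: *ChunkedReader, scratch: []u8) PeekError![]const u8 {
    const view = r.peek(scratch.len) catch |err| switch (err) {
        error.StreamTooLong => {
            try r.readSliceAll(scratch);
            return scratch;
        },
        else => return err,
    };
    r.seek += view.len; // consume in place, no copy
    return view;
}
```

With 1 MiB slots and records much smaller than a slot, only the few records that straddle a boundary would take the copying path.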
I think option 4 would obviously be the best.
Has anybody run into similar problems? Have I missed something I could do, or another way to resolve this? Or is `Io.Reader` simply the wrong abstraction for what I’m trying to achieve?
Thank you