std.Io.Reader: add rebase to the vtable

This eliminates a footgun and special-case handling for fixed buffers,
and allows decompression streams to keep a window in the output
buffer.
Andrew Kelley 2025-07-26 13:43:17 -07:00
parent de39c5f67f
commit 04614d6ea1
3 changed files with 63 additions and 42 deletions
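
For context on the footgun: `fixed` wraps a `[]const u8` behind an internal `@constCast`, so any code path that rebased such a reader would write through that pointer. With `rebase` in the vtable, a fixed reader refuses instead of mutating. A minimal sketch of the new behavior, using the `fixed`, `takeByte`, and `rebase` signatures from the diff below:

const std = @import("std");

test "fixed reader refuses to rebase" {
    var r: std.Io.Reader = .fixed("abc");
    _ = try r.takeByte();
    // No room can be made beyond the end of a fixed stream, so the
    // vtable hook reports error.EndOfStream instead of memmoving
    // through the const-casted buffer.
    try std.testing.expectError(error.EndOfStream, r.rebase(r.buffer.len));
}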

@@ -757,7 +757,7 @@ pub fn Poller(comptime StreamEnum: type) type {
                 const unused = r.buffer[r.end..];
                 if (unused.len >= min_len) return unused;
             }
-            if (r.seek > 0) r.rebase();
+            if (r.seek > 0) r.rebase(r.buffer.len) catch unreachable;
             {
                 var list: std.ArrayListUnmanaged(u8) = .{
                     .items = r.buffer[0..r.end],
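
The `catch unreachable` is sound here: the poller owns a mutable, heap-allocated buffer served by the default vtable, whose `rebase` cannot fail (see the Reader changes below).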

@@ -67,6 +67,18 @@ pub const VTable = struct {
     ///
     /// This function is only called when `buffer` is empty.
     discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard,
+
+    /// Ensures `capacity` more data can be buffered without rebasing.
+    ///
+    /// Asserts `capacity` is within buffer capacity, or that the stream ends
+    /// within `capacity` bytes.
+    ///
+    /// Only called when `capacity` cannot fit into the unused capacity of
+    /// `buffer`.
+    ///
+    /// The default implementation moves buffered data to the start of
+    /// `buffer`, setting `seek` to zero, and cannot fail.
+    rebase: *const fn (r: *Reader, capacity: usize) RebaseError!void = defaultRebase,
 };
 
 pub const StreamError = error{
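
The dispatch only fires when room is actually needed; the default implementation is a plain left-align. A sketch exercising `defaultRebase` (defined later in this diff) with illustrative numbers, assuming Reader's four fields as initialized elsewhere in this commit and borrowing `failing`'s vtable just to populate the field:

const std = @import("std");

test "defaultRebase left-aligns buffered data" {
    var storage: [8]u8 = undefined;
    @memcpy(storage[0..6], "abcdef");
    var r: std.Io.Reader = .{
        .vtable = std.Io.Reader.failing.vtable,
        .buffer = &storage,
        .seek = 5, // 5 bytes already consumed
        .end = 6, // 1 byte still buffered, 2 bytes unused
    };
    // 6 + 4 > 8, so the buffered byte must move to the front.
    try std.Io.Reader.defaultRebase(&r, 4);
    try std.testing.expectEqual(0, r.seek);
    try std.testing.expectEqual(1, r.end);
    try std.testing.expectEqual('f', r.buffer[0]);
}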
@@ -97,6 +109,10 @@ pub const ShortError = error{
     ReadFailed,
 };
 
+pub const RebaseError = error{
+    EndOfStream,
+};
+
 pub const failing: Reader = .{
     .vtable = &.{
         .stream = failingStream,
@@ -122,6 +138,7 @@ pub fn fixed(buffer: []const u8) Reader {
         .vtable = &.{
             .stream = endingStream,
             .discard = endingDiscard,
+            .rebase = endingRebase,
         },
         // This cast is safe because all potential writes to it will instead
         // return `error.EndOfStream`.
@@ -780,11 +797,8 @@ pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
         @branchHint(.likely);
         return buffer[seek .. end + 1];
     }
-    if (r.vtable.stream == &endingStream) {
-        // Protect the `@constCast` of `fixed`.
-        return error.EndOfStream;
-    }
-    r.rebase();
+    // TODO take a parameter for max search length rather than relying on buffer capacity
+    try rebase(r, r.buffer.len);
     while (r.buffer.len - r.end != 0) {
         const end_cap = r.buffer[r.end..];
         var writer: Writer = .fixed(end_cap);
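
With the hook in place, the pointer-identity check against `endingStream` is no longer needed: when the delimiter is absent and no more room can be made, the failure surfaces through the reader's own vtable. A sketch of the resulting behavior for a fixed reader:

const std = @import("std");

test "peekDelimiterInclusive cannot grow a fixed reader" {
    var r: std.Io.Reader = .fixed("no newline here");
    // The delimiter never appears, so the rebase call inside
    // peekDelimiterInclusive fails with error.EndOfStream.
    try std.testing.expectError(error.EndOfStream, r.peekDelimiterInclusive('\n'));
}
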
@@ -1050,11 +1064,7 @@ fn fillUnbuffered(r: *Reader, n: usize) Error!void {
         };
         if (r.seek + n <= r.end) return;
     };
-    if (r.vtable.stream == &endingStream) {
-        // Protect the `@constCast` of `fixed`.
-        return error.EndOfStream;
-    }
-    rebaseCapacity(r, n);
+    try rebase(r, n);
     var writer: Writer = .{
         .buffer = r.buffer,
         .vtable = &.{ .drain = Writer.fixedDrain },
@@ -1074,7 +1084,7 @@ fn fillUnbuffered(r: *Reader, n: usize) Error!void {
 ///
 /// Asserts buffer capacity is at least 1.
 pub fn fillMore(r: *Reader) Error!void {
-    rebaseCapacity(r, 1);
+    try rebase(r, 1);
     var writer: Writer = .{
         .buffer = r.buffer,
         .end = r.end,
@@ -1251,7 +1261,7 @@ pub fn takeLeb128(r: *Reader, comptime Result: type) TakeLeb128Error!Result {
 
 pub fn expandTotalCapacity(r: *Reader, allocator: Allocator, n: usize) Allocator.Error!void {
     if (n <= r.buffer.len) return;
-    if (r.seek > 0) rebase(r);
+    if (r.seek > 0) rebase(r, r.buffer.len);
     var list: ArrayList(u8) = .{
         .items = r.buffer[0..r.end],
         .capacity = r.buffer.len,
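
Left-aligning before growing means the subsequent list resize copies only the live bytes in `buffer[0..end]`, never the already-consumed prefix below `seek`.
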
@@ -1297,37 +1307,20 @@ fn takeMultipleOf7Leb128(r: *Reader, comptime Result: type) TakeLeb128Error!Resu
     }
 }
 
-/// Left-aligns data such that `r.seek` becomes zero.
-///
-/// If `r.seek` is not already zero then `buffer` is mutated, making it illegal
-/// to call this function with a const-casted `buffer`, such as in the case of
-/// `fixed`. This issue can be avoided:
-/// * in implementations, by attempting a read before a rebase, in which
-///   case the read will return `error.EndOfStream`, preventing the rebase.
-/// * in usage, by copying into a mutable buffer before initializing `fixed`.
-pub fn rebase(r: *Reader) void {
-    if (r.seek == 0) return;
+/// Ensures `capacity` more data can be buffered without rebasing.
+pub fn rebase(r: *Reader, capacity: usize) RebaseError!void {
+    if (r.end + capacity <= r.buffer.len) return;
+    return r.vtable.rebase(r, capacity);
+}
+
+pub fn defaultRebase(r: *Reader, capacity: usize) RebaseError!void {
+    if (r.end <= r.buffer.len - capacity) return;
     const data = r.buffer[r.seek..r.end];
     @memmove(r.buffer[0..data.len], data);
     r.seek = 0;
     r.end = data.len;
 }
 
-/// Ensures `capacity` more data can be buffered without rebasing, by rebasing
-/// if necessary.
-///
-/// Asserts `capacity` is within the buffer capacity.
-///
-/// If the rebase occurs then `buffer` is mutated, making it illegal to call
-/// this function with a const-casted `buffer`, such as in the case of `fixed`.
-/// This issue can be avoided:
-/// * in implementations, by attempting a read before a rebase, in which
-///   case the read will return `error.EndOfStream`, preventing the rebase.
-/// * in usage, by copying into a mutable buffer before initializing `fixed`.
-pub fn rebaseCapacity(r: *Reader, capacity: usize) void {
-    if (r.end > r.buffer.len - capacity) rebase(r);
-}
-
 /// Advances the stream and decreases the size of the storage buffer by `n`,
 /// returning the range of bytes no longer accessible by `r`.
 ///
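
The call-site migration implied by this hunk, shown as an illustrative sketch (it matches the call sites updated earlier in this commit):

// before: r.rebaseCapacity(n);        // void; asserted n fit the buffer
// after:  try r.rebase(n);            // may now fail with error.EndOfStream
//
// before: r.rebase();                 // unconditional left-align
// after:  try r.rebase(r.buffer.len); // requesting maximum room forces the left-align
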
@@ -1683,6 +1676,12 @@ fn endingDiscard(r: *Reader, limit: Limit) Error!usize {
     return error.EndOfStream;
 }
 
+fn endingRebase(r: *Reader, capacity: usize) RebaseError!void {
+    _ = r;
+    _ = capacity;
+    return error.EndOfStream;
+}
+
 fn failingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
     _ = r;
     _ = w;

@@ -31,7 +31,12 @@ pub const Options = struct {
 
     /// Verifying checksums is not implemented yet and will cause a panic if
     /// you set this to true.
    verify_checksum: bool = false,
+
+    /// The output buffer is asserted to have capacity for `window_len` plus
+    /// `zstd.block_size_max`.
+    ///
+    /// If `window_len` is too small, then some streams will fail to decompress
+    /// with `error.OutputBufferUndersize`.
     window_len: u32 = zstd.default_window_len,
 };
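
Sizing under the new rule, as a sketch (assumes an allocator `gpa` and an input reader are in scope, and that `Decompress` lives in `std.compress.zstd`, as the `zstd` references in this file suggest):

const std = @import("std");
const zstd = std.compress.zstd;

fn example(gpa: std.mem.Allocator, input: *std.Io.Reader) !void {
    const options: zstd.Decompress.Options = .{}; // window_len = zstd.default_window_len
    // Satisfy the assertion: room for the window plus one full block.
    const buffer = try gpa.alloc(u8, options.window_len + zstd.block_size_max);
    defer gpa.free(buffer);
    var decompress: zstd.Decompress = .init(input, buffer, options);
    const reader = &decompress.reader;
    _ = reader; // stream from `reader` as usual
}
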
@@ -69,8 +74,10 @@ pub const Error = error{
     WindowSizeUnknown,
 };
 
-/// If buffer that is written to is not big enough, some streams will fail with
-/// `error.OutputBufferUndersize`. A safe value is `zstd.default_window_len * 2`.
+/// When connecting `reader` to a `Writer`, `buffer` should be empty, and
+/// `Writer.buffer` capacity has requirements based on `Options.window_len`.
+///
+/// Otherwise, `buffer` has those requirements.
 pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
     return .{
         .input = input,
@@ -78,7 +85,10 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
         .verify_checksum = options.verify_checksum,
         .window_len = options.window_len,
         .reader = .{
-            .vtable = &.{ .stream = stream },
+            .vtable = &.{
+                .stream = stream,
+                .rebase = rebase,
+            },
             .buffer = buffer,
             .seek = 0,
             .end = 0,
@@ -86,6 +96,18 @@ pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
     };
 }
 
+fn rebase(r: *Reader, capacity: usize) Reader.RebaseError!void {
+    const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
+    assert(capacity <= r.buffer.len - d.window_len);
+    assert(r.end + capacity > r.buffer.len);
+    const buffered = r.buffer[0..r.end];
+    const discard = buffered.len - d.window_len;
+    const keep = buffered[discard..];
+    @memmove(r.buffer[0..keep.len], keep);
+    r.end = keep.len;
+    r.seek -= discard;
+}
+
 fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {
     const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
     const in = d.input;
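
A worked pass through the rebase above, with small illustrative numbers:

// buffer.len = 12, window_len = 8, seek = 10, end = 11
// rebase(r, 4): 11 + 4 > 12 and 4 <= 12 - 8, so the hook runs.
// discard = 11 - 8 = 3: the oldest 3 bytes fall off the front.
// keep = buffer[3..11]: the newest 8 bytes move to the start; this is
// the match window that later zstd back-references may copy from, which
// is why bytes below `seek` are deliberately retained, unlike in
// defaultRebase. Afterwards: seek = 7, end = 8, 4 bytes of headroom.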