From 04614d6ea17fff69ead42223c35a257da25462de Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Sat, 26 Jul 2025 13:43:17 -0700
Subject: [PATCH] std.Io.Reader: add rebase to the vtable

This eliminates a footgun and special-case handling for fixed buffers,
and allows decompression streams to keep a window in the output buffer.
---
 lib/std/Io.zig                       |  2 +-
 lib/std/Io/Reader.zig                | 73 ++++++++++++++--------------
 lib/std/compress/zstd/Decompress.zig | 30 ++++++++++--
 3 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 1ab5d13cab..1511f0dcad 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -757,7 +757,7 @@ pub fn Poller(comptime StreamEnum: type) type {
                 const unused = r.buffer[r.end..];
                 if (unused.len >= min_len) return unused;
             }
-            if (r.seek > 0) r.rebase();
+            if (r.seek > 0) r.rebase(r.buffer.len) catch unreachable;
             {
                 var list: std.ArrayListUnmanaged(u8) = .{
                     .items = r.buffer[0..r.end],
diff --git a/lib/std/Io/Reader.zig b/lib/std/Io/Reader.zig
index fa05f0275b..da9e01dd2c 100644
--- a/lib/std/Io/Reader.zig
+++ b/lib/std/Io/Reader.zig
@@ -67,6 +67,18 @@
     ///
     /// This function is only called when `buffer` is empty.
     discard: *const fn (r: *Reader, limit: Limit) Error!usize = defaultDiscard,
+
+    /// Ensures `capacity` more data can be buffered without rebasing.
+    ///
+    /// Asserts `capacity` is within buffer capacity, or that the stream ends
+    /// within `capacity` bytes.
+    ///
+    /// Only called when `capacity` cannot fit into the unused capacity of
+    /// `buffer`.
+    ///
+    /// The default implementation moves buffered data to the start of
+    /// `buffer`, setting `seek` to zero, and cannot fail.
+    rebase: *const fn (r: *Reader, capacity: usize) RebaseError!void = defaultRebase,
 };
 
 pub const StreamError = error{
@@ -97,6 +109,10 @@ pub const ShortError = error{
     ReadFailed,
 };
 
+pub const RebaseError = error{
+    EndOfStream,
+};
+
 pub const failing: Reader = .{
     .vtable = &.{
         .stream = failingStream,
@@ -122,6 +138,7 @@ pub fn fixed(buffer: []const u8) Reader {
         .vtable = &.{
             .stream = endingStream,
             .discard = endingDiscard,
+            .rebase = endingRebase,
         },
         // This cast is safe because all potential writes to it will instead
         // return `error.EndOfStream`.
@@ -780,11 +797,8 @@ pub fn peekDelimiterInclusive(r: *Reader, delimiter: u8) DelimiterError![]u8 {
         @branchHint(.likely);
         return buffer[seek .. end + 1];
     }
-    if (r.vtable.stream == &endingStream) {
-        // Protect the `@constCast` of `fixed`.
-        return error.EndOfStream;
-    }
-    r.rebase();
+    // TODO take a parameter for max search length rather than relying on buffer capacity
+    try rebase(r, r.buffer.len);
     while (r.buffer.len - r.end != 0) {
         const end_cap = r.buffer[r.end..];
         var writer: Writer = .fixed(end_cap);
@@ -1050,11 +1064,7 @@ fn fillUnbuffered(r: *Reader, n: usize) Error!void {
         };
         if (r.seek + n <= r.end) return;
     };
-    if (r.vtable.stream == &endingStream) {
-        // Protect the `@constCast` of `fixed`.
-        return error.EndOfStream;
-    }
-    rebaseCapacity(r, n);
+    try rebase(r, n);
     var writer: Writer = .{
         .buffer = r.buffer,
         .vtable = &.{ .drain = Writer.fixedDrain },
@@ -1074,7 +1084,7 @@
 ///
 /// Asserts buffer capacity is at least 1.
 pub fn fillMore(r: *Reader) Error!void {
-    rebaseCapacity(r, 1);
+    try rebase(r, 1);
     var writer: Writer = .{
         .buffer = r.buffer,
         .end = r.end,
@@ -1251,7 +1261,7 @@ pub fn takeLeb128(r: *Reader, comptime Result: type) TakeLeb128Error!Result {
 
 pub fn expandTotalCapacity(r: *Reader, allocator: Allocator, n: usize) Allocator.Error!void {
     if (n <= r.buffer.len) return;
-    if (r.seek > 0) rebase(r);
+    if (r.seek > 0) rebase(r, r.buffer.len) catch unreachable;
     var list: ArrayList(u8) = .{
         .items = r.buffer[0..r.end],
         .capacity = r.buffer.len,
@@ -1297,37 +1307,20 @@ fn takeMultipleOf7Leb128(r: *Reader, comptime Result: type) TakeLeb128Error!Resu
     }
 }
 
-/// Left-aligns data such that `r.seek` becomes zero.
-///
-/// If `r.seek` is not already zero then `buffer` is mutated, making it illegal
-/// to call this function with a const-casted `buffer`, such as in the case of
-/// `fixed`. This issue can be avoided:
-/// * in implementations, by attempting a read before a rebase, in which
-///   case the read will return `error.EndOfStream`, preventing the rebase.
-/// * in usage, by copying into a mutable buffer before initializing `fixed`.
-pub fn rebase(r: *Reader) void {
-    if (r.seek == 0) return;
+/// Ensures `capacity` more data can be buffered without rebasing.
+pub fn rebase(r: *Reader, capacity: usize) RebaseError!void {
+    if (r.end + capacity <= r.buffer.len) return;
+    return r.vtable.rebase(r, capacity);
+}
+
+pub fn defaultRebase(r: *Reader, capacity: usize) RebaseError!void {
+    if (r.end <= r.buffer.len - capacity) return;
     const data = r.buffer[r.seek..r.end];
     @memmove(r.buffer[0..data.len], data);
     r.seek = 0;
     r.end = data.len;
 }
 
-/// Ensures `capacity` more data can be buffered without rebasing, by rebasing
-/// if necessary.
-///
-/// Asserts `capacity` is within the buffer capacity.
-///
-/// If the rebase occurs then `buffer` is mutated, making it illegal to call
-/// this function with a const-casted `buffer`, such as in the case of `fixed`.
-/// This issue can be avoided:
-/// * in implementations, by attempting a read before a rebase, in which
-///   case the read will return `error.EndOfStream`, preventing the rebase.
-/// * in usage, by copying into a mutable buffer before initializing `fixed`.
-pub fn rebaseCapacity(r: *Reader, capacity: usize) void {
-    if (r.end > r.buffer.len - capacity) rebase(r);
-}
-
 /// Advances the stream and decreases the size of the storage buffer by `n`,
 /// returning the range of bytes no longer accessible by `r`.
 ///
@@ -1683,6 +1676,12 @@ fn endingDiscard(r: *Reader, limit: Limit) Error!usize {
     return error.EndOfStream;
 }
 
+fn endingRebase(r: *Reader, capacity: usize) RebaseError!void {
+    _ = r;
+    _ = capacity;
+    return error.EndOfStream;
+}
+
 fn failingStream(r: *Reader, w: *Writer, limit: Limit) StreamError!usize {
     _ = r;
     _ = w;
diff --git a/lib/std/compress/zstd/Decompress.zig b/lib/std/compress/zstd/Decompress.zig
index b831fe7fb4..b13a2dcf7a 100644
--- a/lib/std/compress/zstd/Decompress.zig
+++ b/lib/std/compress/zstd/Decompress.zig
@@ -31,7 +31,12 @@ pub const Options = struct {
     /// Verifying checksums is not implemented yet and will cause a panic if
     /// you set this to true.
     verify_checksum: bool = false,
-    /// Affects the minimum capacity of the provided buffer.
+
+    /// The output buffer is asserted to have capacity for `window_len` plus
+    /// `zstd.block_size_max`.
+    ///
+    /// If `window_len` is too small, then some streams will fail to decompress
+    /// with `error.OutputBufferUndersize`.
     window_len: u32 = zstd.default_window_len,
 };
 
@@ -69,8 +74,10 @@ pub const Error = error{
     WindowSizeUnknown,
 };
 
-/// If buffer that is written to is not big enough, some streams will fail with
-/// `error.OutputBufferUndersize`. A safe value is `zstd.default_window_len * 2`.
+/// When connecting `reader` to a `Writer`, `buffer` should be empty, and
+/// `Writer.buffer` capacity has requirements based on `Options.window_len`.
+///
+/// Otherwise, `buffer` has those requirements.
 pub fn init(input: *Reader, buffer: []u8, options: Options) Decompress {
     return .{
         .input = input,
@@ -78,7 +85,10 @@
         .verify_checksum = options.verify_checksum,
         .window_len = options.window_len,
         .reader = .{
-            .vtable = &.{ .stream = stream },
+            .vtable = &.{
+                .stream = stream,
+                .rebase = rebase,
+            },
             .buffer = buffer,
             .seek = 0,
             .end = 0,
@@ -86,6 +96,18 @@
     };
 }
 
+fn rebase(r: *Reader, capacity: usize) Reader.RebaseError!void {
+    const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
+    assert(capacity <= r.buffer.len - d.window_len);
+    assert(r.end + capacity > r.buffer.len);
+    const buffered = r.buffer[0..r.end];
+    const discard = buffered.len - d.window_len;
+    const keep = buffered[discard..];
+    @memmove(r.buffer[0..keep.len], keep);
+    r.end = keep.len;
+    r.seek -= discard;
+}
+
 fn stream(r: *Reader, w: *Writer, limit: Limit) Reader.StreamError!usize {
     const d: *Decompress = @alignCast(@fieldParentPtr("reader", r));
     const in = d.input;
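
A minimal sketch of what the new vtable contract means at a call site, written
as a std-style test. It assumes only behavior visible in the patch above:
`fixed` pre-buffers its entire slice, `Reader.rebase` consults the vtable only
when `end + capacity` exceeds the buffer capacity, and the new `endingRebase`
reports end-of-stream. The test name and the 5-byte literal are illustrative.

    const std = @import("std");
    const Reader = std.Io.Reader;

    test "rebase on a fixed reader fails instead of mutating const memory" {
        // `fixed` wraps a read-only slice via @constCast; the whole slice is
        // already buffered (seek == 0, end == buffer.len), so left-aligning
        // can never create unused capacity.
        var r: Reader = .fixed("hello");

        // Zero additional capacity is satisfied without touching the vtable:
        // r.end + 0 <= r.buffer.len, so this is a no-op.
        try r.rebase(0);

        // Any real capacity request dispatches through the vtable, and
        // `endingRebase` reports end-of-stream rather than @memmove-ing
        // through the const-casted pointer, which is the footgun this
        // patch removes.
        try std.testing.expectError(error.EndOfStream, r.rebase(1));
    }

For implementations that need to retain history across a rebase (the
decompression case from the commit message), the zstd `rebase` above is the
pattern: keep the trailing `window_len` bytes, left-align them, and subtract
the number of discarded bytes from `seek`.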