std.http: rework for new std.Io API

This commit is contained in:
Andrew Kelley 2025-07-31 22:36:08 -07:00
parent 8843631f7e
commit 02908a2d8c
7 changed files with 2449 additions and 2834 deletions


@@ -1,14 +1,14 @@
const builtin = @import("builtin");
const std = @import("std.zig");
const assert = std.debug.assert;
const Writer = std.Io.Writer;
const File = std.fs.File;
pub const Client = @import("http/Client.zig");
pub const Server = @import("http/Server.zig");
pub const protocol = @import("http/protocol.zig");
pub const HeadParser = @import("http/HeadParser.zig");
pub const ChunkParser = @import("http/ChunkParser.zig");
pub const HeaderIterator = @import("http/HeaderIterator.zig");
pub const WebSocket = @import("http/WebSocket.zig");
pub const Version = enum {
@"HTTP/1.0",
@@ -42,7 +42,7 @@ pub const Method = enum(u64) {
return x;
}
pub fn format(self: Method, w: *std.io.Writer) std.io.Writer.Error!void {
pub fn format(self: Method, w: *Writer) Writer.Error!void {
const bytes: []const u8 = @ptrCast(&@intFromEnum(self));
const str = std.mem.sliceTo(bytes, 0);
try w.writeAll(str);
@@ -296,13 +296,24 @@ pub const TransferEncoding = enum {
};
pub const ContentEncoding = enum {
identity,
compress,
@"x-compress",
deflate,
gzip,
@"x-gzip",
zstd,
gzip,
deflate,
compress,
identity,
pub fn fromString(s: []const u8) ?ContentEncoding {
const map = std.StaticStringMap(ContentEncoding).initComptime(.{
.{ "zstd", .zstd },
.{ "gzip", .gzip },
.{ "x-gzip", .gzip },
.{ "deflate", .deflate },
.{ "compress", .compress },
.{ "x-compress", .compress },
.{ "identity", .identity },
});
return map.get(s);
}
};
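As a quick orientation for the new helper, here is a usage sketch that is not part of this commit; it shows `ContentEncoding.fromString` folding the legacy `x-` aliases onto their canonical tags. The test name and the rejected "br" value are illustrative only.

const std = @import("std");
const ContentEncoding = std.http.ContentEncoding;

test "ContentEncoding.fromString folds x- aliases" {
    // "x-gzip" and "x-compress" map to the canonical members.
    try std.testing.expectEqual(ContentEncoding.gzip, ContentEncoding.fromString("x-gzip").?);
    try std.testing.expectEqual(ContentEncoding.compress, ContentEncoding.fromString("x-compress").?);
    // Unknown codings yield null so callers can reject them explicitly.
    try std.testing.expect(ContentEncoding.fromString("br") == null);
}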
pub const Connection = enum {
@@ -315,15 +326,755 @@ pub const Header = struct {
value: []const u8,
};
pub const Reader = struct {
in: *std.Io.Reader,
/// This is preallocated memory that might be used by `bodyReader`. That
/// function might return a pointer to this field, or a different
/// `*std.Io.Reader`. It is advisable not to access this field directly.
interface: std.Io.Reader,
/// Keeps track of whether the stream is ready to accept a new request,
/// making invalid API usage cause assertion failures rather than HTTP
/// protocol violations.
state: State,
/// HTTP trailer bytes. These are at the end of a transfer-encoding:
/// chunked message. This data is available only after calling one of the
/// "end" functions and points to data inside the buffer of `in`, and is
/// therefore invalidated on the next call to `receiveHead`, or any other
/// read from `in`.
trailers: []const u8 = &.{},
body_err: ?BodyError = null,
/// Stolen from `in`.
head_buffer: []u8 = &.{},
pub const max_chunk_header_len = 22;
pub const RemainingChunkLen = enum(u64) {
head = 0,
n = 1,
rn = 2,
_,
pub fn init(integer: u64) RemainingChunkLen {
return @enumFromInt(integer);
}
pub fn int(rcl: RemainingChunkLen) u64 {
return @intFromEnum(rcl);
}
};
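A hedged illustration, not from the commit, of the encoding above: the values 0, 1, and 2 are the named sentinel states, while any larger value counts the chunk bytes still to be streamed plus two for the chunk's trailing CRLF.

test RemainingChunkLen {
    // A chunk with 5 data bytes still to stream, plus its trailing "\r\n",
    // is stored as the integer 7.
    try std.testing.expectEqual(@as(u64, 7), RemainingChunkLen.init(5 + 2).int());
    // The small values are the named parser states.
    try std.testing.expect(RemainingChunkLen.init(0) == .head);
    try std.testing.expect(RemainingChunkLen.init(2) == .rn);
}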
pub const State = union(enum) {
/// The stream is available to be used for the first time, or reused.
ready,
received_head,
/// The stream goes until the connection is closed.
body_none,
body_remaining_content_length: u64,
body_remaining_chunk_len: RemainingChunkLen,
/// The stream would be eligible for another HTTP request, however the
/// client and server did not negotiate a persistent connection.
closing,
};
pub const BodyError = error{
HttpChunkInvalid,
HttpChunkTruncated,
HttpHeadersOversize,
};
pub const HeadError = error{
/// Too many bytes of HTTP headers.
///
/// The HTTP specification suggests responding with a 431 status code
/// before closing the connection.
HttpHeadersOversize,
/// Partial HTTP request was received but the connection was closed
/// before fully receiving the headers.
HttpRequestTruncated,
/// The client sent 0 bytes of headers before closing the stream. This
/// happens when a keep-alive connection is finally closed.
HttpConnectionClosing,
/// Transitive error occurred reading from `in`.
ReadFailed,
};
pub fn restituteHeadBuffer(reader: *Reader) void {
reader.in.restitute(reader.head_buffer.len);
reader.head_buffer.len = 0;
}
/// Buffers the entire head into `head_buffer`, invalidating the previous
/// `head_buffer`, if any.
pub fn receiveHead(reader: *Reader) HeadError!void {
reader.trailers = &.{};
const in = reader.in;
in.restitute(reader.head_buffer.len);
reader.head_buffer.len = 0;
in.rebase();
var hp: HeadParser = .{};
var head_end: usize = 0;
while (true) {
if (head_end >= in.buffer.len) return error.HttpHeadersOversize;
in.fillMore() catch |err| switch (err) {
error.EndOfStream => switch (head_end) {
0 => return error.HttpConnectionClosing,
else => return error.HttpRequestTruncated,
},
error.ReadFailed => return error.ReadFailed,
};
head_end += hp.feed(in.buffered()[head_end..]);
if (hp.state == .finished) {
reader.head_buffer = in.steal(head_end);
reader.state = .received_head;
return;
}
}
}
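A minimal caller-side sketch, not part of the diff, of driving `receiveHead` and walking the captured head bytes. It assumes `HeaderIterator` keeps its existing `init`/`next` interface; the helper name is hypothetical and the `reader` is assumed to be wired to a connection by the surrounding Client or Server code.

const std = @import("std");

// Hypothetical helper: logs every header of the next message on `reader`.
fn logHead(reader: *std.http.Reader) std.http.Reader.HeadError!void {
    try reader.receiveHead();
    // head_buffer holds the request/status line plus headers and stays valid
    // until the next receiveHead or any other read from `in`.
    var it = std.http.HeaderIterator.init(reader.head_buffer);
    while (it.next()) |header| {
        std.log.debug("{s}: {s}", .{ header.name, header.value });
    }
}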
/// If a compressed body has been negotiated, this returns compressed bytes.
///
/// Asserts this is called at most once, and only after `receiveHead`.
///
/// See also:
/// * `bodyReaderDecompressing`
pub fn bodyReader(
reader: *Reader,
buffer: []u8,
transfer_encoding: TransferEncoding,
content_length: ?u64,
) *std.Io.Reader {
assert(reader.state == .received_head);
switch (transfer_encoding) {
.chunked => {
reader.state = .{ .body_remaining_chunk_len = .head };
reader.interface = .{
.buffer = buffer,
.seek = 0,
.end = 0,
.vtable = &.{
.stream = chunkedStream,
.discard = chunkedDiscard,
},
};
return &reader.interface;
},
.none => {
if (content_length) |len| {
reader.state = .{ .body_remaining_content_length = len };
reader.interface = .{
.buffer = buffer,
.seek = 0,
.end = 0,
.vtable = &.{
.stream = contentLengthStream,
.discard = contentLengthDiscard,
},
};
return &reader.interface;
} else {
reader.state = .body_none;
return reader.in;
}
},
}
}
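For orientation, a hedged sketch, not from the commit, that drains a body obtained from `bodyReader`, using only `std.Io.Reader` calls that already appear in this file. The helper name, buffer size, and parameters are placeholders.

const std = @import("std");

// Hypothetical helper: discards the message body and reports how it ended.
fn skipBody(
    reader: *std.http.Reader,
    transfer_encoding: std.http.TransferEncoding,
    content_length: ?u64,
) !void {
    var transfer_buffer: [64]u8 = undefined;
    const body = reader.bodyReader(&transfer_buffer, transfer_encoding, content_length);
    while (true) {
        _ = body.discard(.unlimited) catch |err| {
            if (err == error.EndOfStream) break; // cleanly finished message
            return err; // ReadFailed; reader.body_err may hold the specific cause
        };
    }
    // For chunked messages the raw trailer bytes are now available.
    if (reader.trailers.len > 0) std.log.debug("trailers: {s}", .{reader.trailers});
}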
/// If a compressed body has been negotiated, this returns decompressed bytes.
///
/// Asserts this is called at most once, and only after `receiveHead`.
///
/// See also:
/// * `bodyReader`
pub fn bodyReaderDecompressing(
reader: *Reader,
transfer_encoding: TransferEncoding,
content_length: ?u64,
content_encoding: ContentEncoding,
decompressor: *Decompressor,
decompression_buffer: []u8,
) *std.Io.Reader {
if (transfer_encoding == .none and content_length == null) {
assert(reader.state == .received_head);
reader.state = .body_none;
switch (content_encoding) {
.identity => {
return reader.in;
},
.deflate => {
decompressor.* = .{ .flate = .init(reader.in, .raw, decompression_buffer) };
return &decompressor.flate.reader;
},
.gzip => {
decompressor.* = .{ .flate = .init(reader.in, .gzip, decompression_buffer) };
return &decompressor.flate.reader;
},
.zstd => {
decompressor.* = .{ .zstd = .init(reader.in, decompression_buffer, .{ .verify_checksum = false }) };
return &decompressor.zstd.reader;
},
.compress => unreachable,
}
}
const transfer_reader = bodyReader(reader, &.{}, transfer_encoding, content_length);
return decompressor.init(transfer_reader, decompression_buffer, content_encoding);
}
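And a hedged sketch of the decompressing variant, not part of the commit: the caller owns the `Decompressor` and its scratch buffer, and the decoded bytes are copied into an output writer with the `stream` call already used elsewhere in this file. The helper name and parameters are placeholders.

const std = @import("std");

// Hypothetical helper: wires up decompression and copies the decoded body
// into `out`.
fn copyDecodedBody(
    reader: *std.http.Reader,
    transfer_encoding: std.http.TransferEncoding,
    content_length: ?u64,
    content_encoding: std.http.ContentEncoding,
    decompressor: *std.http.Decompressor,
    decompression_buffer: []u8,
    out: *std.Io.Writer,
) !void {
    const body = reader.bodyReaderDecompressing(
        transfer_encoding,
        content_length,
        content_encoding,
        decompressor,
        decompression_buffer,
    );
    while (true) {
        _ = body.stream(out, .unlimited) catch |err| {
            if (err == error.EndOfStream) break;
            return err;
        };
    }
}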
fn contentLengthStream(
io_r: *std.Io.Reader,
w: *Writer,
limit: std.Io.Limit,
) std.Io.Reader.StreamError!usize {
const reader: *Reader = @fieldParentPtr("interface", io_r);
const remaining_content_length = &reader.state.body_remaining_content_length;
const remaining = remaining_content_length.*;
if (remaining == 0) {
reader.state = .ready;
return error.EndOfStream;
}
const n = try reader.in.stream(w, limit.min(.limited(remaining)));
remaining_content_length.* = remaining - n;
return n;
}
fn contentLengthDiscard(io_r: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize {
const reader: *Reader = @fieldParentPtr("interface", io_r);
const remaining_content_length = &reader.state.body_remaining_content_length;
const remaining = remaining_content_length.*;
if (remaining == 0) {
reader.state = .ready;
return error.EndOfStream;
}
const n = try reader.in.discard(limit.min(.limited(remaining)));
remaining_content_length.* = remaining - n;
return n;
}
fn chunkedStream(io_r: *std.Io.Reader, w: *Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
const reader: *Reader = @fieldParentPtr("interface", io_r);
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
return chunkedReadEndless(reader, w, limit, chunk_len_ptr) catch |err| switch (err) {
error.ReadFailed => return error.ReadFailed,
error.WriteFailed => return error.WriteFailed,
error.EndOfStream => {
reader.body_err = error.HttpChunkTruncated;
return error.ReadFailed;
},
else => |e| {
reader.body_err = e;
return error.ReadFailed;
},
};
}
fn chunkedReadEndless(
reader: *Reader,
w: *Writer,
limit: std.Io.Limit,
chunk_len_ptr: *RemainingChunkLen,
) (BodyError || std.Io.Reader.StreamError)!usize {
const in = reader.in;
len: switch (chunk_len_ptr.*) {
.head => {
var cp: ChunkParser = .init;
while (true) {
const i = cp.feed(in.buffered());
switch (cp.state) {
.invalid => return error.HttpChunkInvalid,
.data => {
in.toss(i);
break;
},
else => {
in.toss(i);
try in.fillMore();
continue;
},
}
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.stream(w, limit.min(.limited(cp.chunk_len)));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
.n => {
if ((try in.peekByte()) != '\n') return error.HttpChunkInvalid;
in.toss(1);
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
if (rn[0] != '\r' or rn[1] != '\n') return error.HttpChunkInvalid;
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.stream(w, limit.min(.limited(@intFromEnum(remaining_chunk_len) - 2)));
chunk_len_ptr.* = .init(@intFromEnum(remaining_chunk_len) - n);
return n;
},
}
}
fn chunkedDiscard(io_r: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize {
const reader: *Reader = @fieldParentPtr("interface", io_r);
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
return chunkedDiscardEndless(reader, limit, chunk_len_ptr) catch |err| switch (err) {
error.ReadFailed => return error.ReadFailed,
error.EndOfStream => {
reader.body_err = error.HttpChunkTruncated;
return error.ReadFailed;
},
else => |e| {
reader.body_err = e;
return error.ReadFailed;
},
};
}
fn chunkedDiscardEndless(
reader: *Reader,
limit: std.Io.Limit,
chunk_len_ptr: *RemainingChunkLen,
) (BodyError || std.Io.Reader.Error)!usize {
const in = reader.in;
len: switch (chunk_len_ptr.*) {
.head => {
var cp: ChunkParser = .init;
while (true) {
const i = cp.feed(in.buffered());
switch (cp.state) {
.invalid => return error.HttpChunkInvalid,
.data => {
in.toss(i);
break;
},
else => {
in.toss(i);
try in.fillMore();
continue;
},
}
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.discard(limit.min(.limited(cp.chunk_len)));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
.n => {
if ((try in.peekByte()) != '\n') return error.HttpChunkInvalid;
in.toss(1);
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
if (rn[0] != '\r' or rn[1] != '\n') return error.HttpChunkInvalid;
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.discard(limit.min(.limited(remaining_chunk_len.int() - 2)));
chunk_len_ptr.* = .init(remaining_chunk_len.int() - n);
return n;
},
}
}
/// Called when the next bytes in the stream are trailers, or "\r\n" to
/// indicate the end of the chunked body.
fn parseTrailers(reader: *Reader, amt_read: usize) (BodyError || std.Io.Reader.Error)!usize {
const in = reader.in;
const rn = try in.peekArray(2);
if (rn[0] == '\r' and rn[1] == '\n') {
in.toss(2);
reader.state = .ready;
assert(reader.trailers.len == 0);
return amt_read;
}
var hp: HeadParser = .{ .state = .seen_rn };
var trailers_len: usize = 2;
while (true) {
if (in.buffer.len - trailers_len == 0) return error.HttpHeadersOversize;
const remaining = in.buffered()[trailers_len..];
if (remaining.len == 0) {
try in.fillMore();
continue;
}
trailers_len += hp.feed(remaining);
if (hp.state == .finished) {
reader.state = .ready;
reader.trailers = in.buffered()[0..trailers_len];
in.toss(trailers_len);
return amt_read;
}
}
}
};
pub const Decompressor = union(enum) {
flate: std.compress.flate.Decompress,
zstd: std.compress.zstd.Decompress,
none: *std.Io.Reader,
pub fn init(
decompressor: *Decompressor,
transfer_reader: *std.Io.Reader,
buffer: []u8,
content_encoding: ContentEncoding,
) *std.Io.Reader {
switch (content_encoding) {
.identity => {
decompressor.* = .{ .none = transfer_reader };
return transfer_reader;
},
.deflate => {
decompressor.* = .{ .flate = .init(transfer_reader, .raw, buffer) };
return &decompressor.flate.reader;
},
.gzip => {
decompressor.* = .{ .flate = .init(transfer_reader, .gzip, buffer) };
return &decompressor.flate.reader;
},
.zstd => {
decompressor.* = .{ .zstd = .init(transfer_reader, buffer, .{ .verify_checksum = false }) };
return &decompressor.zstd.reader;
},
.compress => unreachable,
}
}
};
/// Request or response body.
pub const BodyWriter = struct {
/// Until the lifetime of `BodyWriter` ends, it is illegal to modify the
/// state of this other than via methods of `BodyWriter`.
http_protocol_output: *Writer,
state: State,
writer: Writer,
pub const Error = Writer.Error;
/// How many zeroes to reserve for hex-encoded chunk length.
const chunk_len_digits = 8;
const max_chunk_len: usize = std.math.pow(usize, 16, chunk_len_digits) - 1;
const chunk_header_template = ("0" ** chunk_len_digits) ++ "\r\n";
comptime {
assert(max_chunk_len == std.math.maxInt(u32));
}
pub const State = union(enum) {
/// End of connection signals the end of the stream.
none,
/// As a debugging utility, counts down to zero as bytes are written.
content_length: u64,
/// Each chunk is wrapped in a header and trailer.
chunked: Chunked,
/// Cleanly finished stream; connection can be reused.
end,
pub const Chunked = union(enum) {
/// Index to the start of the hex-encoded chunk length in the chunk
/// header within the buffer of `BodyWriter.http_protocol_output`.
/// Buffered chunk data starts here plus length of `chunk_header_template`.
offset: usize,
/// We are in the middle of a chunk and this is how many bytes are
/// left until the next header. This includes +2 for "\r\n", and
/// is zero for the beginning of the stream.
chunk_len: usize,
pub const init: Chunked = .{ .chunk_len = 0 };
};
};
pub fn isEliding(w: *const BodyWriter) bool {
return w.writer.vtable.drain == Writer.discardingDrain;
}
/// Sends all buffered data across `BodyWriter.http_protocol_output`.
pub fn flush(w: *BodyWriter) Error!void {
const out = w.http_protocol_output;
switch (w.state) {
.end, .none, .content_length => return out.flush(),
.chunked => |*chunked| switch (chunked.*) {
.offset => |offset| {
const chunk_len = out.end - offset - chunk_header_template.len;
if (chunk_len > 0) {
writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len);
chunked.* = .{ .chunk_len = 2 };
} else {
out.end = offset;
chunked.* = .{ .chunk_len = 0 };
}
try out.flush();
},
.chunk_len => return out.flush(),
},
}
}
/// When using content-length, asserts that the amount of data sent matches
/// the value sent in the header, then flushes.
///
/// When using transfer-encoding: chunked, writes the end-of-stream message
/// with empty trailers, then flushes the stream to the system. Asserts any
/// started chunk has been completely finished.
///
/// Respects the value of `isEliding` to omit all data after the headers.
///
/// See also:
/// * `endUnflushed`
/// * `endChunked`
pub fn end(w: *BodyWriter) Error!void {
try endUnflushed(w);
try w.http_protocol_output.flush();
}
/// When using content-length, asserts that the amount of data sent matches
/// the value sent in the header.
///
/// Otherwise, transfer-encoding: chunked is being used, and it writes the
/// end-of-stream message with empty trailers.
///
/// Respects the value of `isEliding` to omit all data after the headers.
///
/// See also:
/// * `end`
/// * `endChunked`
pub fn endUnflushed(w: *BodyWriter) Error!void {
switch (w.state) {
.end => unreachable,
.content_length => |len| {
assert(len == 0); // Trips when end() called before all bytes written.
w.state = .end;
},
.none => {},
.chunked => return endChunkedUnflushed(w, .{}),
}
}
pub const EndChunkedOptions = struct {
trailers: []const Header = &.{},
};
/// Writes the end-of-stream message and any optional trailers, flushing
/// the underlying stream.
///
/// Asserts that the BodyWriter is using transfer-encoding: chunked.
///
/// Respects the value of `isEliding` to omit all data after the headers.
///
/// See also:
/// * `endChunkedUnflushed`
/// * `end`
pub fn endChunked(w: *BodyWriter, options: EndChunkedOptions) Error!void {
try endChunkedUnflushed(w, options);
try w.http_protocol_output.flush();
}
/// Writes the end-of-stream message and any optional trailers.
///
/// Does not flush.
///
/// Asserts that the BodyWriter is using transfer-encoding: chunked.
///
/// Respects the value of `isEliding` to omit all data after the headers.
///
/// See also:
/// * `endChunked`
/// * `endUnflushed`
/// * `end`
pub fn endChunkedUnflushed(w: *BodyWriter, options: EndChunkedOptions) Error!void {
const chunked = &w.state.chunked;
if (w.isEliding()) {
w.state = .end;
return;
}
const bw = w.http_protocol_output;
switch (chunked.*) {
.offset => |offset| {
const chunk_len = bw.end - offset - chunk_header_template.len;
writeHex(bw.buffer[offset..][0..chunk_len_digits], chunk_len);
try bw.writeAll("\r\n");
},
.chunk_len => |chunk_len| switch (chunk_len) {
0 => {},
1 => try bw.writeByte('\n'),
2 => try bw.writeAll("\r\n"),
else => unreachable, // An earlier write call indicated more data would follow.
},
}
try bw.writeAll("0\r\n");
for (options.trailers) |trailer| {
try bw.writeAll(trailer.name);
try bw.writeAll(": ");
try bw.writeAll(trailer.value);
try bw.writeAll("\r\n");
}
try bw.writeAll("\r\n");
w.state = .end;
}
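To make the framing concrete, a hedged illustration, not in the commit, of the bytes the chunked state machine emits for a single 5-byte chunk followed by `endChunkedUnflushed` with one trailer; the trailer name and value are invented for the example.

// "00000005\r\n" is the reserved chunk header, backpatched by `writeHex`;
// "0\r\n" is the zero-length chunk that ends the body; the trailer section
// ends with an empty line.
const example_chunked_wire =
    "00000005\r\n" ++ "Hello" ++ "\r\n" ++
    "0\r\n" ++
    "x-example-trailer: 1\r\n" ++
    "\r\n";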
pub fn contentLengthDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const n = try out.writeSplatHeader(w.buffered(), data, splat);
bw.state.content_length -= n;
return w.consume(n);
}
pub fn noneDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const n = try out.writeSplatHeader(w.buffered(), data, splat);
return w.consume(n);
}
/// Returns `null` if size cannot be computed without making any syscalls.
pub fn noneSendFile(w: *Writer, file_reader: *File.Reader, limit: std.Io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
return w.consume(n);
}
pub fn contentLengthSendFile(w: *Writer, file_reader: *File.Reader, limit: std.Io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
bw.state.content_length -= n;
return w.consume(n);
}
pub fn chunkedSendFile(w: *Writer, file_reader: *File.Reader, limit: std.Io.Limit) Writer.FileError!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const data_len = Writer.countSendFileLowerBound(w.end, file_reader, limit) orelse {
// If the file size is unknown, we cannot lower to a `sendFile` since we would
// have to flush the chunk header before knowing the chunk length.
return error.Unimplemented;
};
const out = bw.http_protocol_output;
const chunked = &bw.state.chunked;
state: switch (chunked.*) {
.offset => |off| {
// TODO: is it better perf to read small files into the buffer?
const buffered_len = out.end - off - chunk_header_template.len;
const chunk_len = data_len + buffered_len;
writeHex(out.buffer[off..][0..chunk_len_digits], chunk_len);
const n = try out.sendFileHeader(w.buffered(), file_reader, limit);
chunked.* = .{ .chunk_len = data_len + 2 - n };
return w.consume(n);
},
.chunk_len => |chunk_len| l: switch (chunk_len) {
0 => {
const off = out.end;
const header_buf = try out.writableArray(chunk_header_template.len);
@memcpy(header_buf, chunk_header_template);
chunked.* = .{ .offset = off };
continue :state .{ .offset = off };
},
1 => {
try out.writeByte('\n');
chunked.chunk_len = 0;
continue :l 0;
},
2 => {
try out.writeByte('\r');
chunked.chunk_len = 1;
continue :l 1;
},
else => {
const new_limit = limit.min(.limited(chunk_len - 2));
const n = try out.sendFileHeader(w.buffered(), file_reader, new_limit);
chunked.chunk_len = chunk_len - n;
return w.consume(n);
},
},
}
}
pub fn chunkedDrain(w: *Writer, data: []const []const u8, splat: usize) Error!usize {
const bw: *BodyWriter = @fieldParentPtr("writer", w);
assert(!bw.isEliding());
const out = bw.http_protocol_output;
const data_len = w.end + Writer.countSplat(data, splat);
const chunked = &bw.state.chunked;
state: switch (chunked.*) {
.offset => |offset| {
if (out.unusedCapacityLen() >= data_len) {
return w.consume(out.writeSplatHeader(w.buffered(), data, splat) catch unreachable);
}
const buffered_len = out.end - offset - chunk_header_template.len;
const chunk_len = data_len + buffered_len;
writeHex(out.buffer[offset..][0..chunk_len_digits], chunk_len);
const n = try out.writeSplatHeader(w.buffered(), data, splat);
chunked.* = .{ .chunk_len = data_len + 2 - n };
return w.consume(n);
},
.chunk_len => |chunk_len| l: switch (chunk_len) {
0 => {
const offset = out.end;
const header_buf = try out.writableArray(chunk_header_template.len);
@memcpy(header_buf, chunk_header_template);
chunked.* = .{ .offset = offset };
continue :state .{ .offset = offset };
},
1 => {
try out.writeByte('\n');
chunked.chunk_len = 0;
continue :l 0;
},
2 => {
try out.writeByte('\r');
chunked.chunk_len = 1;
continue :l 1;
},
else => {
const n = try out.writeSplatHeaderLimit(w.buffered(), data, splat, .limited(chunk_len - 2));
chunked.chunk_len = chunk_len - n;
return w.consume(n);
},
},
}
}
/// Writes an integer as base 16 to `buf`, right-aligned, assuming the
/// buffer has already been filled with ASCII '0' bytes.
fn writeHex(buf: []u8, x: usize) void {
assert(std.mem.allEqual(u8, buf, '0'));
const base = 16;
var index: usize = buf.len;
var a = x;
while (a > 0) {
const digit = a % base;
index -= 1;
buf[index] = std.fmt.digitToChar(@intCast(digit), .lower);
a /= base;
}
}
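A small hedged check, not part of the commit, of what `writeHex` produces: the buffer is pre-filled with ASCII '0' and the value lands right-aligned in lowercase hex.

test writeHex {
    var buf = [1]u8{'0'} ** chunk_len_digits;
    writeHex(&buf, 0x1a3);
    try std.testing.expectEqualStrings("000001a3", &buf);
}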
};
test {
_ = Server;
_ = Status;
_ = Method;
_ = ChunkParser;
_ = HeadParser;
if (builtin.os.tag != .wasi) {
_ = Client;
_ = Method;
_ = Server;
_ = Status;
_ = HeadParser;
_ = ChunkParser;
_ = WebSocket;
_ = @import("http/test.zig");
}
}


@@ -1,5 +1,8 @@
//! Parser for transfer-encoding: chunked.
const ChunkParser = @This();
const std = @import("std");
state: State,
chunk_len: u64,
@@ -97,9 +100,6 @@ pub fn feed(p: *ChunkParser, bytes: []const u8) usize {
return bytes.len;
}
const ChunkParser = @This();
const std = @import("std");
test feed {
const testing = std.testing;

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@@ -1,246 +0,0 @@
//! See https://tools.ietf.org/html/rfc6455
const builtin = @import("builtin");
const std = @import("std");
const WebSocket = @This();
const assert = std.debug.assert;
const native_endian = builtin.cpu.arch.endian();
key: []const u8,
request: *std.http.Server.Request,
recv_fifo: std.fifo.LinearFifo(u8, .Slice),
reader: std.io.AnyReader,
response: std.http.Server.Response,
/// Number of bytes that have been peeked but not discarded yet.
outstanding_len: usize,
pub const InitError = error{WebSocketUpgradeMissingKey} ||
std.http.Server.Request.ReaderError;
pub fn init(
request: *std.http.Server.Request,
send_buffer: []u8,
recv_buffer: []align(4) u8,
) InitError!?WebSocket {
switch (request.head.version) {
.@"HTTP/1.0" => return null,
.@"HTTP/1.1" => if (request.head.method != .GET) return null,
}
var sec_websocket_key: ?[]const u8 = null;
var upgrade_websocket: bool = false;
var it = request.iterateHeaders();
while (it.next()) |header| {
if (std.ascii.eqlIgnoreCase(header.name, "sec-websocket-key")) {
sec_websocket_key = header.value;
} else if (std.ascii.eqlIgnoreCase(header.name, "upgrade")) {
if (!std.ascii.eqlIgnoreCase(header.value, "websocket"))
return null;
upgrade_websocket = true;
}
}
if (!upgrade_websocket)
return null;
const key = sec_websocket_key orelse return error.WebSocketUpgradeMissingKey;
var sha1 = std.crypto.hash.Sha1.init(.{});
sha1.update(key);
sha1.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
var digest: [std.crypto.hash.Sha1.digest_length]u8 = undefined;
sha1.final(&digest);
var base64_digest: [28]u8 = undefined;
assert(std.base64.standard.Encoder.encode(&base64_digest, &digest).len == base64_digest.len);
request.head.content_length = std.math.maxInt(u64);
return .{
.key = key,
.recv_fifo = std.fifo.LinearFifo(u8, .Slice).init(recv_buffer),
.reader = try request.reader(),
.response = request.respondStreaming(.{
.send_buffer = send_buffer,
.respond_options = .{
.status = .switching_protocols,
.extra_headers = &.{
.{ .name = "upgrade", .value = "websocket" },
.{ .name = "connection", .value = "upgrade" },
.{ .name = "sec-websocket-accept", .value = &base64_digest },
},
.transfer_encoding = .none,
},
}),
.request = request,
.outstanding_len = 0,
};
}
pub const Header0 = packed struct(u8) {
opcode: Opcode,
rsv3: u1 = 0,
rsv2: u1 = 0,
rsv1: u1 = 0,
fin: bool,
};
pub const Header1 = packed struct(u8) {
payload_len: enum(u7) {
len16 = 126,
len64 = 127,
_,
},
mask: bool,
};
pub const Opcode = enum(u4) {
continuation = 0,
text = 1,
binary = 2,
connection_close = 8,
ping = 9,
/// "A Pong frame MAY be sent unsolicited. This serves as a unidirectional
/// heartbeat. A response to an unsolicited Pong frame is not expected."
pong = 10,
_,
};
pub const ReadSmallTextMessageError = error{
ConnectionClose,
UnexpectedOpCode,
MessageTooBig,
MissingMaskBit,
} || RecvError;
pub const SmallMessage = struct {
/// Can be text, binary, or ping.
opcode: Opcode,
data: []u8,
};
/// Reads the next message from the WebSocket stream, failing if the message does not fit
/// into `recv_buffer`.
pub fn readSmallMessage(ws: *WebSocket) ReadSmallTextMessageError!SmallMessage {
while (true) {
const header_bytes = (try recv(ws, 2))[0..2];
const h0: Header0 = @bitCast(header_bytes[0]);
const h1: Header1 = @bitCast(header_bytes[1]);
switch (h0.opcode) {
.text, .binary, .pong, .ping => {},
.connection_close => return error.ConnectionClose,
.continuation => return error.UnexpectedOpCode,
_ => return error.UnexpectedOpCode,
}
if (!h0.fin) return error.MessageTooBig;
if (!h1.mask) return error.MissingMaskBit;
const len: usize = switch (h1.payload_len) {
.len16 => try recvReadInt(ws, u16),
.len64 => std.math.cast(usize, try recvReadInt(ws, u64)) orelse return error.MessageTooBig,
else => @intFromEnum(h1.payload_len),
};
if (len > ws.recv_fifo.buf.len) return error.MessageTooBig;
const mask: u32 = @bitCast((try recv(ws, 4))[0..4].*);
const payload = try recv(ws, len);
// Skip pongs.
if (h0.opcode == .pong) continue;
// The last item may contain a partial word of unused data.
const floored_len = (payload.len / 4) * 4;
const u32_payload: []align(1) u32 = @alignCast(std.mem.bytesAsSlice(u32, payload[0..floored_len]));
for (u32_payload) |*elem| elem.* ^= mask;
const mask_bytes = std.mem.asBytes(&mask)[0 .. payload.len - floored_len];
for (payload[floored_len..], mask_bytes) |*leftover, m| leftover.* ^= m;
return .{
.opcode = h0.opcode,
.data = payload,
};
}
}
const RecvError = std.http.Server.Request.ReadError || error{EndOfStream};
fn recv(ws: *WebSocket, len: usize) RecvError![]u8 {
ws.recv_fifo.discard(ws.outstanding_len);
assert(len <= ws.recv_fifo.buf.len);
if (len > ws.recv_fifo.count) {
const small_buf = ws.recv_fifo.writableSlice(0);
const needed = len - ws.recv_fifo.count;
const buf = if (small_buf.len >= needed) small_buf else b: {
ws.recv_fifo.realign();
break :b ws.recv_fifo.writableSlice(0);
};
const n = try @as(RecvError!usize, @errorCast(ws.reader.readAtLeast(buf, needed)));
if (n < needed) return error.EndOfStream;
ws.recv_fifo.update(n);
}
ws.outstanding_len = len;
// TODO: improve the std lib API so this cast isn't necessary.
return @constCast(ws.recv_fifo.readableSliceOfLen(len));
}
fn recvReadInt(ws: *WebSocket, comptime I: type) !I {
const unswapped: I = @bitCast((try recv(ws, @sizeOf(I)))[0..@sizeOf(I)].*);
return switch (native_endian) {
.little => @byteSwap(unswapped),
.big => unswapped,
};
}
pub const WriteError = std.http.Server.Response.WriteError;
pub fn writeMessage(ws: *WebSocket, message: []const u8, opcode: Opcode) WriteError!void {
const iovecs: [1]std.posix.iovec_const = .{
.{ .base = message.ptr, .len = message.len },
};
return writeMessagev(ws, &iovecs, opcode);
}
pub fn writeMessagev(ws: *WebSocket, message: []const std.posix.iovec_const, opcode: Opcode) WriteError!void {
const total_len = l: {
var total_len: u64 = 0;
for (message) |iovec| total_len += iovec.len;
break :l total_len;
};
var header_buf: [2 + 8]u8 = undefined;
header_buf[0] = @bitCast(@as(Header0, .{
.opcode = opcode,
.fin = true,
}));
const header = switch (total_len) {
0...125 => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = @enumFromInt(total_len),
.mask = false,
}));
break :blk header_buf[0..2];
},
126...0xffff => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = .len16,
.mask = false,
}));
std.mem.writeInt(u16, header_buf[2..4], @intCast(total_len), .big);
break :blk header_buf[0..4];
},
else => blk: {
header_buf[1] = @bitCast(@as(Header1, .{
.payload_len = .len64,
.mask = false,
}));
std.mem.writeInt(u64, header_buf[2..10], total_len, .big);
break :blk header_buf[0..10];
},
};
const response = &ws.response;
try response.writeAll(header);
for (message) |iovec|
try response.writeAll(iovec.base[0..iovec.len]);
try response.flush();
}


@@ -1,464 +0,0 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const testing = std.testing;
const mem = std.mem;
const assert = std.debug.assert;
pub const State = enum {
invalid,
// Begin header and trailer parsing states.
start,
seen_n,
seen_r,
seen_rn,
seen_rnr,
finished,
// Begin transfer-encoding: chunked parsing states.
chunk_head_size,
chunk_head_ext,
chunk_head_r,
chunk_data,
chunk_data_suffix,
chunk_data_suffix_r,
/// Returns true if the parser is in a content state (i.e. not waiting for more headers).
pub fn isContent(self: State) bool {
return switch (self) {
.invalid, .start, .seen_n, .seen_r, .seen_rn, .seen_rnr => false,
.finished, .chunk_head_size, .chunk_head_ext, .chunk_head_r, .chunk_data, .chunk_data_suffix, .chunk_data_suffix_r => true,
};
}
};
pub const HeadersParser = struct {
state: State = .start,
/// A fixed buffer of len `max_header_bytes`.
/// Pointers into this buffer are not stable until after a message is complete.
header_bytes_buffer: []u8,
header_bytes_len: u32,
next_chunk_length: u64,
/// `false`: headers. `true`: trailers.
done: bool,
/// Initializes the parser with a provided buffer `buf`.
pub fn init(buf: []u8) HeadersParser {
return .{
.header_bytes_buffer = buf,
.header_bytes_len = 0,
.done = false,
.next_chunk_length = 0,
};
}
/// Reinitialize the parser.
/// Asserts the parser is in the "done" state.
pub fn reset(hp: *HeadersParser) void {
assert(hp.done);
hp.* = .{
.state = .start,
.header_bytes_buffer = hp.header_bytes_buffer,
.header_bytes_len = 0,
.done = false,
.next_chunk_length = 0,
};
}
pub fn get(hp: HeadersParser) []u8 {
return hp.header_bytes_buffer[0..hp.header_bytes_len];
}
pub fn findHeadersEnd(r: *HeadersParser, bytes: []const u8) u32 {
var hp: std.http.HeadParser = .{
.state = switch (r.state) {
.start => .start,
.seen_n => .seen_n,
.seen_r => .seen_r,
.seen_rn => .seen_rn,
.seen_rnr => .seen_rnr,
.finished => .finished,
else => unreachable,
},
};
const result = hp.feed(bytes);
r.state = switch (hp.state) {
.start => .start,
.seen_n => .seen_n,
.seen_r => .seen_r,
.seen_rn => .seen_rn,
.seen_rnr => .seen_rnr,
.finished => .finished,
};
return @intCast(result);
}
pub fn findChunkedLen(r: *HeadersParser, bytes: []const u8) u32 {
var cp: std.http.ChunkParser = .{
.state = switch (r.state) {
.chunk_head_size => .head_size,
.chunk_head_ext => .head_ext,
.chunk_head_r => .head_r,
.chunk_data => .data,
.chunk_data_suffix => .data_suffix,
.chunk_data_suffix_r => .data_suffix_r,
.invalid => .invalid,
else => unreachable,
},
.chunk_len = r.next_chunk_length,
};
const result = cp.feed(bytes);
r.state = switch (cp.state) {
.head_size => .chunk_head_size,
.head_ext => .chunk_head_ext,
.head_r => .chunk_head_r,
.data => .chunk_data,
.data_suffix => .chunk_data_suffix,
.data_suffix_r => .chunk_data_suffix_r,
.invalid => .invalid,
};
r.next_chunk_length = cp.chunk_len;
return @intCast(result);
}
/// Returns whether or not the parser has finished parsing a complete
/// message. A message is only complete after the entire body has been read
/// and any trailing headers have been parsed.
pub fn isComplete(r: *HeadersParser) bool {
return r.done and r.state == .finished;
}
pub const CheckCompleteHeadError = error{HttpHeadersOversize};
/// Pushes `in` into the parser. Returns the number of bytes consumed by
/// the header. Any header bytes are appended to `header_bytes_buffer`.
pub fn checkCompleteHead(hp: *HeadersParser, in: []const u8) CheckCompleteHeadError!u32 {
if (hp.state.isContent()) return 0;
const i = hp.findHeadersEnd(in);
const data = in[0..i];
if (hp.header_bytes_len + data.len > hp.header_bytes_buffer.len)
return error.HttpHeadersOversize;
@memcpy(hp.header_bytes_buffer[hp.header_bytes_len..][0..data.len], data);
hp.header_bytes_len += @intCast(data.len);
return i;
}
pub const ReadError = error{
HttpChunkInvalid,
};
/// Reads the body of the message into `buffer`. Returns the number of
/// bytes placed in the buffer.
///
/// If `skip` is true, the buffer will be unused and the body will be skipped.
///
/// See `std.http.Client.Connection` for an example of `conn`.
pub fn read(r: *HeadersParser, conn: anytype, buffer: []u8, skip: bool) !usize {
assert(r.state.isContent());
if (r.done) return 0;
var out_index: usize = 0;
while (true) {
switch (r.state) {
.invalid, .start, .seen_n, .seen_r, .seen_rn, .seen_rnr => unreachable,
.finished => {
const data_avail = r.next_chunk_length;
if (skip) {
conn.fill() catch |err| switch (err) {
error.EndOfStream => {
r.done = true;
return 0;
},
else => |e| return e,
};
const nread = @min(conn.peek().len, data_avail);
conn.drop(@intCast(nread));
r.next_chunk_length -= nread;
if (r.next_chunk_length == 0 or nread == 0) r.done = true;
return out_index;
} else if (out_index < buffer.len) {
const out_avail = buffer.len - out_index;
const can_read = @as(usize, @intCast(@min(data_avail, out_avail)));
const nread = try conn.read(buffer[0..can_read]);
r.next_chunk_length -= nread;
if (r.next_chunk_length == 0 or nread == 0) r.done = true;
return nread;
} else {
return out_index;
}
},
.chunk_data_suffix, .chunk_data_suffix_r, .chunk_head_size, .chunk_head_ext, .chunk_head_r => {
conn.fill() catch |err| switch (err) {
error.EndOfStream => {
r.done = true;
return 0;
},
else => |e| return e,
};
const i = r.findChunkedLen(conn.peek());
conn.drop(@intCast(i));
switch (r.state) {
.invalid => return error.HttpChunkInvalid,
.chunk_data => if (r.next_chunk_length == 0) {
if (std.mem.eql(u8, conn.peek(), "\r\n")) {
r.state = .finished;
conn.drop(2);
} else {
// The trailer section is formatted identically
// to the header section.
r.state = .seen_rn;
}
r.done = true;
return out_index;
},
else => return out_index,
}
continue;
},
.chunk_data => {
const data_avail = r.next_chunk_length;
const out_avail = buffer.len - out_index;
if (skip) {
conn.fill() catch |err| switch (err) {
error.EndOfStream => {
r.done = true;
return 0;
},
else => |e| return e,
};
const nread = @min(conn.peek().len, data_avail);
conn.drop(@intCast(nread));
r.next_chunk_length -= nread;
} else if (out_avail > 0) {
const can_read: usize = @intCast(@min(data_avail, out_avail));
const nread = try conn.read(buffer[out_index..][0..can_read]);
r.next_chunk_length -= nread;
out_index += nread;
}
if (r.next_chunk_length == 0) {
r.state = .chunk_data_suffix;
continue;
}
return out_index;
},
}
}
}
};
inline fn int16(array: *const [2]u8) u16 {
return @as(u16, @bitCast(array.*));
}
inline fn int24(array: *const [3]u8) u24 {
return @as(u24, @bitCast(array.*));
}
inline fn int32(array: *const [4]u8) u32 {
return @as(u32, @bitCast(array.*));
}
inline fn intShift(comptime T: type, x: anytype) T {
switch (@import("builtin").cpu.arch.endian()) {
.little => return @as(T, @truncate(x >> (@bitSizeOf(@TypeOf(x)) - @bitSizeOf(T)))),
.big => return @as(T, @truncate(x)),
}
}
/// A buffered (and peekable) Connection.
const MockBufferedConnection = struct {
pub const buffer_size = 0x2000;
conn: std.io.FixedBufferStream([]const u8),
buf: [buffer_size]u8 = undefined,
start: u16 = 0,
end: u16 = 0,
pub fn fill(conn: *MockBufferedConnection) ReadError!void {
if (conn.end != conn.start) return;
const nread = try conn.conn.read(conn.buf[0..]);
if (nread == 0) return error.EndOfStream;
conn.start = 0;
conn.end = @as(u16, @truncate(nread));
}
pub fn peek(conn: *MockBufferedConnection) []const u8 {
return conn.buf[conn.start..conn.end];
}
pub fn drop(conn: *MockBufferedConnection, num: u16) void {
conn.start += num;
}
pub fn readAtLeast(conn: *MockBufferedConnection, buffer: []u8, len: usize) ReadError!usize {
var out_index: u16 = 0;
while (out_index < len) {
const available = conn.end - conn.start;
const left = buffer.len - out_index;
if (available > 0) {
const can_read = @as(u16, @truncate(@min(available, left)));
@memcpy(buffer[out_index..][0..can_read], conn.buf[conn.start..][0..can_read]);
out_index += can_read;
conn.start += can_read;
continue;
}
if (left > conn.buf.len) {
// skip the buffer if the output is large enough
return conn.conn.read(buffer[out_index..]);
}
try conn.fill();
}
return out_index;
}
pub fn read(conn: *MockBufferedConnection, buffer: []u8) ReadError!usize {
return conn.readAtLeast(buffer, 1);
}
pub const ReadError = std.io.FixedBufferStream([]const u8).ReadError || error{EndOfStream};
pub const Reader = std.io.GenericReader(*MockBufferedConnection, ReadError, read);
pub fn reader(conn: *MockBufferedConnection) Reader {
return Reader{ .context = conn };
}
pub fn writeAll(conn: *MockBufferedConnection, buffer: []const u8) WriteError!void {
return conn.conn.writeAll(buffer);
}
pub fn write(conn: *MockBufferedConnection, buffer: []const u8) WriteError!usize {
return conn.conn.write(buffer);
}
pub const WriteError = std.io.FixedBufferStream([]const u8).WriteError;
pub const Writer = std.io.GenericWriter(*MockBufferedConnection, WriteError, write);
pub fn writer(conn: *MockBufferedConnection) Writer {
return Writer{ .context = conn };
}
};
test "HeadersParser.read length" {
// mock BufferedConnection for read
var headers_buf: [256]u8 = undefined;
var r = HeadersParser.init(&headers_buf);
const data = "GET / HTTP/1.1\r\nHost: localhost\r\nContent-Length: 5\r\n\r\nHello";
var conn: MockBufferedConnection = .{
.conn = std.io.fixedBufferStream(data),
};
while (true) { // read headers
try conn.fill();
const nchecked = try r.checkCompleteHead(conn.peek());
conn.drop(@intCast(nchecked));
if (r.state.isContent()) break;
}
var buf: [8]u8 = undefined;
r.next_chunk_length = 5;
const len = try r.read(&conn, &buf, false);
try std.testing.expectEqual(@as(usize, 5), len);
try std.testing.expectEqualStrings("Hello", buf[0..len]);
try std.testing.expectEqualStrings("GET / HTTP/1.1\r\nHost: localhost\r\nContent-Length: 5\r\n\r\n", r.get());
}
test "HeadersParser.read chunked" {
// mock BufferedConnection for read
var headers_buf: [256]u8 = undefined;
var r = HeadersParser.init(&headers_buf);
const data = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n2\r\nHe\r\n2\r\nll\r\n1\r\no\r\n0\r\n\r\n";
var conn: MockBufferedConnection = .{
.conn = std.io.fixedBufferStream(data),
};
while (true) { // read headers
try conn.fill();
const nchecked = try r.checkCompleteHead(conn.peek());
conn.drop(@intCast(nchecked));
if (r.state.isContent()) break;
}
var buf: [8]u8 = undefined;
r.state = .chunk_head_size;
const len = try r.read(&conn, &buf, false);
try std.testing.expectEqual(@as(usize, 5), len);
try std.testing.expectEqualStrings("Hello", buf[0..len]);
try std.testing.expectEqualStrings("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n", r.get());
}
test "HeadersParser.read chunked trailer" {
// mock BufferedConnection for read
var headers_buf: [256]u8 = undefined;
var r = HeadersParser.init(&headers_buf);
const data = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n2\r\nHe\r\n2\r\nll\r\n1\r\no\r\n0\r\nContent-Type: text/plain\r\n\r\n";
var conn: MockBufferedConnection = .{
.conn = std.io.fixedBufferStream(data),
};
while (true) { // read headers
try conn.fill();
const nchecked = try r.checkCompleteHead(conn.peek());
conn.drop(@intCast(nchecked));
if (r.state.isContent()) break;
}
var buf: [8]u8 = undefined;
r.state = .chunk_head_size;
const len = try r.read(&conn, &buf, false);
try std.testing.expectEqual(@as(usize, 5), len);
try std.testing.expectEqualStrings("Hello", buf[0..len]);
while (true) { // read headers
try conn.fill();
const nchecked = try r.checkCompleteHead(conn.peek());
conn.drop(@intCast(nchecked));
if (r.state.isContent()) break;
}
try std.testing.expectEqualStrings("GET / HTTP/1.1\r\nHost: localhost\r\n\r\nContent-Type: text/plain\r\n\r\n", r.get());
}

File diff suppressed because it is too large.