std: start moving fs.File to Io

This commit is contained in:
Andrew Kelley 2025-09-05 17:30:07 -07:00
parent bd3c65f752
commit e7729a7b89
8 changed files with 924 additions and 313 deletions

View file

@ -6,7 +6,6 @@ const windows = std.os.windows;
const posix = std.posix; const posix = std.posix;
const math = std.math; const math = std.math;
const assert = std.debug.assert; const assert = std.debug.assert;
const fs = std.fs;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment; const Alignment = std.mem.Alignment;
@ -650,10 +649,15 @@ pub const VTable = struct {
conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void, conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void,
createFile: *const fn (?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File, createFile: *const fn (?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File,
openFile: *const fn (?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File, fileOpen: *const fn (?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File,
closeFile: *const fn (?*anyopaque, File) void, fileClose: *const fn (?*anyopaque, File) void,
pread: *const fn (?*anyopaque, file: File, buffer: []u8, offset: std.posix.off_t) File.PReadError!usize,
pwrite: *const fn (?*anyopaque, file: File, buffer: []const u8, offset: std.posix.off_t) File.PWriteError!usize, pwrite: *const fn (?*anyopaque, file: File, buffer: []const u8, offset: std.posix.off_t) File.PWriteError!usize,
/// Returns 0 on end of stream.
fileReadStreaming: *const fn (?*anyopaque, file: File, data: [][]u8) File.ReadStreamingError!usize,
/// Returns 0 on end of stream.
fileReadPositional: *const fn (?*anyopaque, file: File, data: [][]u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, file: File, offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, file: File, offset: u64) File.SeekError!void,
now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp, now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp,
sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void, sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
@ -670,6 +674,18 @@ pub const Cancelable = error{
Canceled, Canceled,
}; };
pub const UnexpectedError = error{
/// The Operating System returned an undocumented error code.
///
/// This error is in theory not possible, but it would be better
/// to handle this error than to invoke undefined behavior.
///
/// When this error code is observed, it usually means the Zig Standard
/// Library needs a small patch to add the error code to the error set for
/// the respective function.
Unexpected,
};
pub const Dir = struct { pub const Dir = struct {
handle: Handle, handle: Handle,
@ -680,7 +696,7 @@ pub const Dir = struct {
pub const Handle = std.posix.fd_t; pub const Handle = std.posix.fd_t;
pub fn openFile(dir: Dir, io: Io, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File { pub fn openFile(dir: Dir, io: Io, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File {
return io.vtable.openFile(io.userdata, dir, sub_path, flags); return io.vtable.fileOpen(io.userdata, dir, sub_path, flags);
} }
pub fn createFile(dir: Dir, io: Io, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File { pub fn createFile(dir: Dir, io: Io, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File {
@ -706,66 +722,7 @@ pub const Dir = struct {
} }
}; };
pub const File = struct { pub const File = @import("Io/File.zig");
handle: Handle,
pub const Handle = std.posix.fd_t;
pub const OpenFlags = fs.File.OpenFlags;
pub const CreateFlags = fs.File.CreateFlags;
pub const OpenError = fs.File.OpenError || Cancelable;
pub fn close(file: File, io: Io) void {
return io.vtable.closeFile(io.userdata, file);
}
pub const ReadError = fs.File.ReadError || Cancelable;
pub fn read(file: File, io: Io, buffer: []u8) ReadError!usize {
return @errorCast(file.pread(io, buffer, -1));
}
pub const PReadError = fs.File.PReadError || Cancelable;
pub fn pread(file: File, io: Io, buffer: []u8, offset: std.posix.off_t) PReadError!usize {
return io.vtable.pread(io.userdata, file, buffer, offset);
}
pub const WriteError = fs.File.WriteError || Cancelable;
pub fn write(file: File, io: Io, buffer: []const u8) WriteError!usize {
return @errorCast(file.pwrite(io, buffer, -1));
}
pub const PWriteError = fs.File.PWriteError || Cancelable;
pub fn pwrite(file: File, io: Io, buffer: []const u8, offset: std.posix.off_t) PWriteError!usize {
return io.vtable.pwrite(io.userdata, file, buffer, offset);
}
pub fn writeAll(file: File, io: Io, bytes: []const u8) WriteError!void {
var index: usize = 0;
while (index < bytes.len) {
index += try file.write(io, bytes[index..]);
}
}
pub fn readAll(file: File, io: Io, buffer: []u8) ReadError!usize {
var index: usize = 0;
while (index != buffer.len) {
const amt = try file.read(io, buffer[index..]);
if (amt == 0) break;
index += amt;
}
return index;
}
pub fn openAbsolute(io: Io, absolute_path: []const u8, flags: OpenFlags) OpenError {
assert(std.fs.path.isAbsolute(absolute_path));
return Dir.cwd().openFile(io, absolute_path, flags);
}
};
pub const Timestamp = enum(i96) { pub const Timestamp = enum(i96) {
_, _,

View file

@ -93,7 +93,7 @@ const Fiber = struct {
} }
fn resultPointer(f: *Fiber, comptime Result: type) *Result { fn resultPointer(f: *Fiber, comptime Result: type) *Result {
return @alignCast(@ptrCast(f.resultBytes(.of(Result)))); return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
} }
fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 { fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
@ -153,8 +153,8 @@ pub fn io(el: *EventLoop) Io {
.conditionWake = conditionWake, .conditionWake = conditionWake,
.createFile = createFile, .createFile = createFile,
.openFile = openFile, .fileOpen = fileOpen,
.closeFile = closeFile, .fileClose = fileClose,
.pread = pread, .pread = pread,
.pwrite = pwrite, .pwrite = pwrite,
@ -193,7 +193,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
}; };
const main_thread = &el.threads.allocated[0]; const main_thread = &el.threads.allocated[0];
Thread.self = main_thread; Thread.self = main_thread;
const idle_stack_end: [*]align(16) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
main_thread.* = .{ main_thread.* = .{
.thread = undefined, .thread = undefined,
@ -244,7 +244,7 @@ pub fn deinit(el: *EventLoop) void {
assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
} }
el.yield(null, .exit); el.yield(null, .exit);
const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr)); const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(el.threads.allocated.ptr));
const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max); const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join(); for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
el.gpa.free(allocated_ptr[0..idle_stack_end_offset]); el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
@ -530,7 +530,7 @@ const SwitchMessage = struct {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null); assert(prev_fiber.queue_next == null);
for (futures) |any_future| { for (futures) |any_future| {
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) { if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
const closure: *AsyncClosure = .fromFiber(future_fiber); const closure: *AsyncClosure = .fromFiber(future_fiber);
if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) { if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
@ -897,12 +897,12 @@ fn asyncConcurrent(
assert(result_len <= Fiber.max_result_size); // TODO assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const fiber = try Fiber.allocate(event_loop); const fiber = try Fiber.allocate(event_loop);
std.log.debug("allocated {*}", .{fiber}); std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber); const closure: *AsyncClosure = .fromFiber(fiber);
const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure)); const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
(stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)}; (stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)};
fiber.* = .{ fiber.* = .{
.required_align = {}, .required_align = {},
@ -974,7 +974,7 @@ fn asyncDetached(
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(context.len <= Fiber.max_context_size); // TODO assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const fiber = Fiber.allocate(event_loop) catch { const fiber = Fiber.allocate(event_loop) catch {
start(context.ptr); start(context.ptr);
return; return;
@ -985,7 +985,7 @@ fn asyncDetached(
const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward( const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
) - @sizeOf(DetachedClosure)); ) - @sizeOf(DetachedClosure));
const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure)); const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
(stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)}; (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)};
fiber.* = .{ fiber.* = .{
.required_align = {}, .required_align = {},
@ -1035,8 +1035,8 @@ fn await(
result: []u8, result: []u8,
result_alignment: Alignment, result_alignment: Alignment,
) void { ) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter }); event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
@memcpy(result, future_fiber.resultBytes(result_alignment)); @memcpy(result, future_fiber.resultBytes(result_alignment));
@ -1044,11 +1044,11 @@ fn await(
} }
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize { fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
// Optimization to avoid the yield below. // Optimization to avoid the yield below.
for (futures, 0..) |any_future, i| { for (futures, 0..) |any_future, i| {
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) == Fiber.finished) if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) == Fiber.finished)
return i; return i;
} }
@ -1062,7 +1062,7 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
var result: ?usize = null; var result: ?usize = null;
for (futures, 0..) |any_future, i| { for (futures, 0..) |any_future, i| {
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@cmpxchgStrong(?*Fiber, &future_fiber.awaiter, my_fiber, null, .seq_cst, .seq_cst)) |awaiter| { if (@cmpxchgStrong(?*Fiber, &future_fiber.awaiter, my_fiber, null, .seq_cst, .seq_cst)) |awaiter| {
if (awaiter == Fiber.finished) { if (awaiter == Fiber.finished) {
if (result == null) result = i; if (result == null) result = i;
@ -1085,7 +1085,7 @@ fn cancel(
result: []u8, result: []u8,
result_alignment: Alignment, result_alignment: Alignment,
) void { ) void {
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicRmw( if (@atomicRmw(
?*Thread, ?*Thread,
&future_fiber.cancel_thread, &future_fiber.cancel_thread,
@ -1124,7 +1124,7 @@ fn createFile(
sub_path: []const u8, sub_path: []const u8,
flags: Io.File.CreateFlags, flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File { ) Io.File.OpenError!Io.File {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1220,13 +1220,13 @@ fn createFile(
} }
} }
fn openFile( fn fileOpen(
userdata: ?*anyopaque, userdata: ?*anyopaque,
dir: Io.Dir, dir: Io.Dir,
sub_path: []const u8, sub_path: []const u8,
flags: Io.File.OpenFlags, flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File { ) Io.File.OpenError!Io.File {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1328,8 +1328,8 @@ fn openFile(
} }
} }
fn closeFile(userdata: ?*anyopaque, file: Io.File) void { fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1365,7 +1365,7 @@ fn closeFile(userdata: ?*anyopaque, file: Io.File) void {
} }
fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize { fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1417,7 +1417,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o
} }
fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize { fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1479,7 +1479,7 @@ fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError
} }
fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void { fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current(); const thread: *Thread = .current();
const iou = &thread.io_uring; const iou = &thread.io_uring;
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1532,7 +1532,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl
} }
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void { fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } }); el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } });
} }
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
@ -1553,7 +1553,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
.acquire, .acquire,
) orelse return) |next_state| maybe_waiting_fiber = @ptrFromInt(@intFromEnum(next_state)); ) orelse return) |next_state| maybe_waiting_fiber = @ptrFromInt(@intFromEnum(next_state));
maybe_waiting_fiber.?.queue_next = null; maybe_waiting_fiber.?.queue_next = null;
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(maybe_waiting_fiber.?, .reschedule); el.yield(maybe_waiting_fiber.?, .reschedule);
} }
@ -1566,7 +1566,7 @@ const ConditionImpl = struct {
}; };
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } }); el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
const thread = Thread.current(); const thread = Thread.current();
const fiber = thread.currentFiber(); const fiber = thread.currentFiber();
@ -1595,7 +1595,7 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
} }
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
const el: *EventLoop = @alignCast(@ptrCast(userdata)); const el: *EventLoop = @ptrCast(@alignCast(userdata));
const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return; const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake }; waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake };
el.yield(waiting_fiber, .reschedule); el.yield(waiting_fiber, .reschedule);

550
lib/std/Io/File.zig Normal file
View file

@ -0,0 +1,550 @@
const builtin = @import("builtin");
const std = @import("../std.zig");
const Io = std.Io;
const File = @This();
const assert = std.debug.assert;
handle: Handle,
pub const Handle = std.posix.fd_t;
pub const Mode = std.posix.mode_t;
pub const INode = std.posix.ino_t;
pub const Kind = enum {
block_device,
character_device,
directory,
named_pipe,
sym_link,
file,
unix_domain_socket,
whiteout,
door,
event_port,
unknown,
};
pub const Stat = struct {
/// A number that the system uses to point to the file metadata. This
/// number is not guaranteed to be unique across time, as some file
/// systems may reuse an inode after its file has been deleted. Some
/// systems may change the inode of a file over time.
///
/// On Linux, the inode is a structure that stores the metadata, and
/// the inode _number_ is what you see here: the index number of the
/// inode.
///
/// The FileIndex on Windows is similar. It is a number for a file that
/// is unique to each filesystem.
inode: INode,
size: u64,
/// This is available on POSIX systems and is always 0 otherwise.
mode: Mode,
kind: Kind,
/// Last access time in nanoseconds, relative to UTC 1970-01-01.
atime: i128,
/// Last modification time in nanoseconds, relative to UTC 1970-01-01.
mtime: i128,
/// Last status/metadata change time in nanoseconds, relative to UTC 1970-01-01.
ctime: i128,
pub fn fromPosix(st: std.posix.Stat) Stat {
const atime = st.atime();
const mtime = st.mtime();
const ctime = st.ctime();
return .{
.inode = st.ino,
.size = @bitCast(st.size),
.mode = st.mode,
.kind = k: {
const m = st.mode & std.posix.S.IFMT;
switch (m) {
std.posix.S.IFBLK => break :k .block_device,
std.posix.S.IFCHR => break :k .character_device,
std.posix.S.IFDIR => break :k .directory,
std.posix.S.IFIFO => break :k .named_pipe,
std.posix.S.IFLNK => break :k .sym_link,
std.posix.S.IFREG => break :k .file,
std.posix.S.IFSOCK => break :k .unix_domain_socket,
else => {},
}
if (builtin.os.tag == .illumos) switch (m) {
std.posix.S.IFDOOR => break :k .door,
std.posix.S.IFPORT => break :k .event_port,
else => {},
};
break :k .unknown;
},
.atime = @as(i128, atime.sec) * std.time.ns_per_s + atime.nsec,
.mtime = @as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec,
.ctime = @as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec,
};
}
pub fn fromLinux(stx: std.os.linux.Statx) Stat {
const atime = stx.atime;
const mtime = stx.mtime;
const ctime = stx.ctime;
return .{
.inode = stx.ino,
.size = stx.size,
.mode = stx.mode,
.kind = switch (stx.mode & std.os.linux.S.IFMT) {
std.os.linux.S.IFDIR => .directory,
std.os.linux.S.IFCHR => .character_device,
std.os.linux.S.IFBLK => .block_device,
std.os.linux.S.IFREG => .file,
std.os.linux.S.IFIFO => .named_pipe,
std.os.linux.S.IFLNK => .sym_link,
std.os.linux.S.IFSOCK => .unix_domain_socket,
else => .unknown,
},
.atime = @as(i128, atime.sec) * std.time.ns_per_s + atime.nsec,
.mtime = @as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec,
.ctime = @as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec,
};
}
pub fn fromWasi(st: std.os.wasi.filestat_t) Stat {
return .{
.inode = st.ino,
.size = @bitCast(st.size),
.mode = 0,
.kind = switch (st.filetype) {
.BLOCK_DEVICE => .block_device,
.CHARACTER_DEVICE => .character_device,
.DIRECTORY => .directory,
.SYMBOLIC_LINK => .sym_link,
.REGULAR_FILE => .file,
.SOCKET_STREAM, .SOCKET_DGRAM => .unix_domain_socket,
else => .unknown,
},
.atime = st.atim,
.mtime = st.mtim,
.ctime = st.ctim,
};
}
};
pub const StatError = std.posix.FStatError || Io.Cancelable;
/// Returns `Stat` containing basic information about the `File`.
pub fn stat(file: File, io: Io) StatError!Stat {
_ = file;
_ = io;
@panic("TODO");
}
pub const OpenFlags = std.fs.File.OpenFlags;
pub const CreateFlags = std.fs.File.CreateFlags;
pub const OpenError = std.fs.File.OpenError || Io.Cancelable;
pub fn close(file: File, io: Io) void {
return io.vtable.fileClose(io.userdata, file);
}
pub const ReadStreamingError = error{
InputOutput,
SystemResources,
IsDir,
BrokenPipe,
ConnectionResetByPeer,
ConnectionTimedOut,
NotOpenForReading,
SocketNotConnected,
/// This error occurs when no global event loop is configured,
/// and reading from the file descriptor would block.
WouldBlock,
/// In WASI, this error occurs when the file descriptor does
/// not hold the required rights to read from it.
AccessDenied,
/// This error occurs in Linux if the process to be read from
/// no longer exists.
ProcessNotFound,
/// Unable to read file due to lock.
LockViolation,
} || Io.Cancelable || Io.UnexpectedError;
pub const ReadPositionalError = ReadStreamingError || error{Unseekable};
pub fn readPositional(file: File, io: Io, buffer: []u8, offset: u64) ReadPositionalError!usize {
return io.vtable.pread(io.userdata, file, buffer, offset);
}
pub const WriteError = std.fs.File.WriteError || Io.Cancelable;
pub fn write(file: File, io: Io, buffer: []const u8) WriteError!usize {
return @errorCast(file.pwrite(io, buffer, -1));
}
pub const PWriteError = std.fs.File.PWriteError || Io.Cancelable;
pub fn pwrite(file: File, io: Io, buffer: []const u8, offset: std.posix.off_t) PWriteError!usize {
return io.vtable.pwrite(io.userdata, file, buffer, offset);
}
pub fn openAbsolute(io: Io, absolute_path: []const u8, flags: OpenFlags) OpenError!File {
assert(std.fs.path.isAbsolute(absolute_path));
return Io.Dir.cwd().openFile(io, absolute_path, flags);
}
/// Defaults to positional reading; falls back to streaming.
///
/// Positional is more threadsafe, since the global seek position is not
/// affected.
pub fn reader(file: File, io: Io, buffer: []u8) Reader {
return .init(file, io, buffer);
}
/// Positional is more threadsafe, since the global seek position is not
/// affected, but when such syscalls are not available, preemptively
/// initializing in streaming mode skips a failed syscall.
pub fn readerStreaming(file: File, io: Io, buffer: []u8) Reader {
return .initStreaming(file, io, buffer);
}
pub const SeekError = error{
Unseekable,
/// The file descriptor does not hold the required rights to seek on it.
AccessDenied,
} || Io.Cancelable || Io.UnexpectedError;
/// Memoizes key information about a file handle such as:
/// * The size from calling stat, or the error that occurred therein.
/// * The current seek position.
/// * The error that occurred when trying to seek.
/// * Whether reading should be done positionally or streaming.
/// * Whether reading should be done via fd-to-fd syscalls (e.g. `sendfile`)
/// versus plain variants (e.g. `read`).
///
/// Fulfills the `Io.Reader` interface.
pub const Reader = struct {
io: Io,
file: File,
err: ?Error = null,
mode: Reader.Mode = .positional,
/// Tracks the true seek position in the file. To obtain the logical
/// position, use `logicalPos`.
pos: u64 = 0,
size: ?u64 = null,
size_err: ?SizeError = null,
seek_err: ?Reader.SeekError = null,
interface: Io.Reader,
pub const Error = std.posix.ReadError || Io.Cancelable;
pub const SizeError = std.os.windows.GetFileSizeError || StatError || error{
/// Occurs if, for example, the file handle is a network socket and therefore does not have a size.
Streaming,
};
pub const SeekError = File.SeekError || error{
/// Seeking fell back to reading, and reached the end before the requested seek position.
/// `pos` remains at the end of the file.
EndOfStream,
/// Seeking fell back to reading, which failed.
ReadFailed,
};
pub const Mode = enum {
streaming,
positional,
/// Avoid syscalls other than `read` and `readv`.
streaming_reading,
/// Avoid syscalls other than `pread` and `preadv`.
positional_reading,
/// Indicates reading cannot continue because of a seek failure.
failure,
pub fn toStreaming(m: @This()) @This() {
return switch (m) {
.positional, .streaming => .streaming,
.positional_reading, .streaming_reading => .streaming_reading,
.failure => .failure,
};
}
pub fn toReading(m: @This()) @This() {
return switch (m) {
.positional, .positional_reading => .positional_reading,
.streaming, .streaming_reading => .streaming_reading,
.failure => .failure,
};
}
};
pub fn initInterface(buffer: []u8) Io.Reader {
return .{
.vtable = &.{
.stream = Reader.stream,
.discard = Reader.discard,
.readVec = Reader.readVec,
},
.buffer = buffer,
.seek = 0,
.end = 0,
};
}
pub fn init(file: File, io: Io, buffer: []u8) Reader {
return .{
.io = io,
.file = file,
.interface = initInterface(buffer),
};
}
pub fn initSize(file: File, io: Io, buffer: []u8, size: ?u64) Reader {
return .{
.io = io,
.file = file,
.interface = initInterface(buffer),
.size = size,
};
}
/// Positional is more threadsafe, since the global seek position is not
/// affected, but when such syscalls are not available, preemptively
/// initializing in streaming mode skips a failed syscall.
pub fn initStreaming(file: File, io: Io, buffer: []u8) Reader {
return .{
.io = io,
.file = file,
.interface = Reader.initInterface(buffer),
.mode = .streaming,
.seek_err = error.Unseekable,
.size_err = error.Streaming,
};
}
pub fn getSize(r: *Reader) SizeError!u64 {
return r.size orelse {
if (r.size_err) |err| return err;
if (std.posix.Stat == void) {
r.size_err = error.Streaming;
return error.Streaming;
}
if (stat(r.file, r.io)) |st| {
if (st.kind == .file) {
r.size = st.size;
return st.size;
} else {
r.mode = r.mode.toStreaming();
r.size_err = error.Streaming;
return error.Streaming;
}
} else |err| {
r.size_err = err;
return err;
}
};
}
pub fn seekBy(r: *Reader, offset: i64) Reader.SeekError!void {
const io = r.io;
switch (r.mode) {
.positional, .positional_reading => {
setPosAdjustingBuffer(r, @intCast(@as(i64, @intCast(r.pos)) + offset));
},
.streaming, .streaming_reading => {
if (std.posix.SEEK == void) {
r.seek_err = error.Unseekable;
return error.Unseekable;
}
const seek_err = r.seek_err orelse e: {
if (io.vtable.fileSeekBy(io.userdata, r.file, offset)) |_| {
setPosAdjustingBuffer(r, @intCast(@as(i64, @intCast(r.pos)) + offset));
return;
} else |err| {
r.seek_err = err;
break :e err;
}
};
var remaining = std.math.cast(u64, offset) orelse return seek_err;
while (remaining > 0) {
remaining -= discard(&r.interface, .limited64(remaining)) catch |err| {
r.seek_err = err;
return err;
};
}
r.interface.seek = 0;
r.interface.end = 0;
},
.failure => return r.seek_err.?,
}
}
pub fn seekTo(r: *Reader, offset: u64) Reader.SeekError!void {
const io = r.io;
switch (r.mode) {
.positional, .positional_reading => {
setPosAdjustingBuffer(r, offset);
},
.streaming, .streaming_reading => {
if (offset >= r.pos) return Reader.seekBy(r, @intCast(offset - r.pos));
if (r.seek_err) |err| return err;
io.vtable.fileSeekTo(io.userdata, r.file, offset) catch |err| {
r.seek_err = err;
return err;
};
setPosAdjustingBuffer(r, offset);
},
.failure => return r.seek_err.?,
}
}
pub fn logicalPos(r: *const Reader) u64 {
return r.pos - r.interface.bufferedLen();
}
fn setPosAdjustingBuffer(r: *Reader, offset: u64) void {
const logical_pos = logicalPos(r);
if (offset < logical_pos or offset >= r.pos) {
r.interface.seek = 0;
r.interface.end = 0;
r.pos = offset;
} else {
const logical_delta: usize = @intCast(offset - logical_pos);
r.interface.seek += logical_delta;
}
}
/// Number of slices to store on the stack, when trying to send as many byte
/// vectors through the underlying read calls as possible.
const max_buffers_len = 16;
fn stream(io_reader: *Io.Reader, w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
switch (r.mode) {
.positional, .streaming => return w.sendFile(r, limit) catch |write_err| switch (write_err) {
error.Unimplemented => {
r.mode = r.mode.toReading();
return 0;
},
else => |e| return e,
},
.positional_reading => {
const dest = limit.slice(try w.writableSliceGreedy(1));
var data: [1][]u8 = .{dest};
const n = try readVecPositional(r, &data);
w.advance(n);
return n;
},
.streaming_reading => {
const dest = limit.slice(try w.writableSliceGreedy(1));
var data: [1][]u8 = .{dest};
const n = try readVecStreaming(r, &data);
w.advance(n);
return n;
},
.failure => return error.ReadFailed,
}
}
fn readVec(io_reader: *Io.Reader, data: [][]u8) Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
switch (r.mode) {
.positional, .positional_reading => return readVecPositional(r, data),
.streaming, .streaming_reading => return readVecStreaming(r, data),
.failure => return error.ReadFailed,
}
}
fn readVecPositional(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const io = r.io;
assert(r.interface.bufferedLen() == 0);
var iovecs_buffer: [max_buffers_len][]u8 = undefined;
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadPositional(io.userdata, r.file, dest, r.pos) catch |err| switch (err) {
error.Unseekable => {
r.mode = r.mode.toStreaming();
const pos = r.pos;
if (pos != 0) {
r.pos = 0;
r.seekBy(@intCast(pos)) catch {
r.mode = .failure;
return error.ReadFailed;
};
}
return 0;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
return data_size;
}
return n;
}
fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const io = r.io;
var iovecs_buffer: [max_buffers_len][]u8 = undefined;
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| {
r.err = err;
return error.ReadFailed;
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
return data_size;
}
return n;
}
fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface", io_reader));
const io = r.io;
const file = r.file;
const pos = r.pos;
switch (r.mode) {
.positional, .positional_reading => {
const size = r.getSize() catch {
r.mode = r.mode.toStreaming();
return 0;
};
const delta = @min(@intFromEnum(limit), size - pos);
r.pos = pos + delta;
return delta;
},
.streaming, .streaming_reading => {
const size = r.getSize() catch return 0;
const n = @min(size - pos, std.math.maxInt(i64), @intFromEnum(limit));
io.vtable.fileSeekBy(io.userdata, file, n) catch |err| {
r.seek_err = err;
return 0;
};
r.pos = pos + n;
return n;
},
.failure => return error.ReadFailed,
}
}
pub fn atEnd(r: *Reader) bool {
// Even if stat fails, size is set when end is encountered.
const size = r.size orelse return false;
return size - r.pos == 0;
}
};

View file

@ -1,11 +1,16 @@
const Pool = @This();
const builtin = @import("builtin"); const builtin = @import("builtin");
const native_os = builtin.os.tag;
const is_windows = native_os == .windows;
const windows = std.os.windows;
const std = @import("../std.zig"); const std = @import("../std.zig");
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const assert = std.debug.assert; const assert = std.debug.assert;
const WaitGroup = std.Thread.WaitGroup; const WaitGroup = std.Thread.WaitGroup;
const posix = std.posix; const posix = std.posix;
const Io = std.Io; const Io = std.Io;
const Pool = @This();
/// Thread-safe. /// Thread-safe.
allocator: Allocator, allocator: Allocator,
@ -23,6 +28,10 @@ threadlocal var current_closure: ?*AsyncClosure = null;
const max_iovecs_len = 8; const max_iovecs_len = 8;
const splat_buffer_size = 64; const splat_buffer_size = 64;
comptime {
assert(max_iovecs_len <= posix.IOV_MAX);
}
pub const Runnable = struct { pub const Runnable = struct {
start: Start, start: Start,
node: std.SinglyLinkedList.Node = .{}, node: std.SinglyLinkedList.Node = .{},
@ -104,10 +113,13 @@ pub fn io(pool: *Pool) Io {
.conditionWake = conditionWake, .conditionWake = conditionWake,
.createFile = createFile, .createFile = createFile,
.openFile = openFile, .fileOpen = fileOpen,
.closeFile = closeFile, .fileClose = fileClose,
.pread = pread,
.pwrite = pwrite, .pwrite = pwrite,
.fileReadStreaming = fileReadStreaming,
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
.now = now, .now = now,
.sleep = sleep, .sleep = sleep,
@ -631,7 +643,7 @@ fn createFile(
return .{ .handle = fs_file.handle }; return .{ .handle = fs_file.handle };
} }
fn openFile( fn fileOpen(
userdata: ?*anyopaque, userdata: ?*anyopaque,
dir: Io.Dir, dir: Io.Dir,
sub_path: []const u8, sub_path: []const u8,
@ -644,21 +656,256 @@ fn openFile(
return .{ .handle = fs_file.handle }; return .{ .handle = fs_file.handle };
} }
fn closeFile(userdata: ?*anyopaque, file: Io.File) void { fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool; _ = pool;
const fs_file: std.fs.File = .{ .handle = file.handle }; const fs_file: std.fs.File = .{ .handle = file.handle };
return fs_file.close(); return fs_file.close();
} }
fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: posix.off_t) Io.File.PReadError!usize { fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize {
const pool: *Pool = @ptrCast(@alignCast(userdata));
if (is_windows) {
const DWORD = windows.DWORD;
var index: usize = 0;
var truncate: usize = 0;
var total: usize = 0;
while (index < data.len) {
try pool.checkCancel();
{
const untruncated = data[index];
data[index] = untruncated[truncate..];
defer data[index] = untruncated;
const buffer = data[index..];
const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
var n: DWORD = undefined;
if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) == 0) {
switch (windows.GetLastError()) {
.IO_PENDING => unreachable,
.OPERATION_ABORTED => continue,
.BROKEN_PIPE => return 0,
.HANDLE_EOF => return 0,
.NETNAME_DELETED => return error.ConnectionResetByPeer,
.LOCK_VIOLATION => return error.LockViolation,
.ACCESS_DENIED => return error.AccessDenied,
.INVALID_HANDLE => return error.NotOpenForReading,
else => |err| return windows.unexpectedError(err),
}
}
total += n;
truncate += n;
}
while (index < data.len and truncate >= data[index].len) {
truncate -= data[index].len;
index += 1;
}
}
return total;
}
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
if (iovecs_buffer.len - i == 0) break;
if (buf.len != 0) {
iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
i += 1;
}
}
const dest = iovecs_buffer[0..i];
assert(dest[0].len > 0);
if (native_os == .wasi and !builtin.link_libc) {
try pool.checkCancel();
var nread: usize = undefined;
switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
.SUCCESS => return nread,
.INTR => unreachable,
.INVAL => unreachable,
.FAULT => unreachable,
.AGAIN => unreachable, // currently not support in WASI
.BADF => return error.NotOpenForReading, // can be a race condition
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
}
while (true) {
try pool.checkCancel();
const rc = posix.system.readv(file.handle, dest.ptr, dest.len);
switch (posix.errno(rc)) {
.SUCCESS => return @intCast(rc),
.INTR => continue,
.INVAL => unreachable,
.FAULT => unreachable,
.SRCH => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForReading, // can be a race condition
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
else => |err| return posix.unexpectedErrno(err),
}
}
}
fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
const pool: *Pool = @ptrCast(@alignCast(userdata));
const have_pread_but_not_preadv = switch (native_os) {
.windows, .macos, .ios, .watchos, .tvos, .visionos, .haiku, .serenity => true,
else => false,
};
if (have_pread_but_not_preadv) {
@compileError("TODO");
}
if (is_windows) {
const DWORD = windows.DWORD;
const OVERLAPPED = windows.OVERLAPPED;
var index: usize = 0;
var truncate: usize = 0;
var total: usize = 0;
while (true) {
try pool.checkCancel();
{
const untruncated = data[index];
data[index] = untruncated[truncate..];
defer data[index] = untruncated;
const buffer = data[index..];
const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
var n: DWORD = undefined;
var overlapped_data: OVERLAPPED = undefined;
const overlapped: ?*OVERLAPPED = if (offset) |off| blk: {
overlapped_data = .{
.Internal = 0,
.InternalHigh = 0,
.DUMMYUNIONNAME = .{
.DUMMYSTRUCTNAME = .{
.Offset = @as(u32, @truncate(off)),
.OffsetHigh = @as(u32, @truncate(off >> 32)),
},
},
.hEvent = null,
};
break :blk &overlapped_data;
} else null;
if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, overlapped) == 0) {
switch (windows.GetLastError()) {
.IO_PENDING => unreachable,
.OPERATION_ABORTED => continue,
.BROKEN_PIPE => return 0,
.HANDLE_EOF => return 0,
.NETNAME_DELETED => return error.ConnectionResetByPeer,
.LOCK_VIOLATION => return error.LockViolation,
.ACCESS_DENIED => return error.AccessDenied,
.INVALID_HANDLE => return error.NotOpenForReading,
else => |err| return windows.unexpectedError(err),
}
}
total += n;
truncate += n;
}
while (index < data.len and truncate >= data[index].len) {
truncate -= data[index].len;
index += 1;
}
}
return total;
}
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
if (iovecs_buffer.len - i == 0) break;
if (buf.len != 0) {
iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
i += 1;
}
}
const dest = iovecs_buffer[0..i];
assert(dest[0].len > 0);
if (native_os == .wasi and !builtin.link_libc) {
try pool.checkCancel();
var nread: usize = undefined;
switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
.SUCCESS => return nread,
.INTR => unreachable,
.INVAL => unreachable,
.FAULT => unreachable,
.AGAIN => unreachable,
.BADF => return error.NotOpenForReading, // can be a race condition
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
.SPIPE => return error.Unseekable,
.OVERFLOW => return error.Unseekable,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
}
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
while (true) {
try pool.checkCancel();
const rc = preadv_sym(file.handle, dest.ptr, dest.len, @bitCast(offset));
switch (posix.errno(rc)) {
.SUCCESS => return @bitCast(rc),
.INTR => continue,
.INVAL => unreachable,
.FAULT => unreachable,
.SRCH => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForReading, // can be a race condition
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
.SPIPE => return error.Unseekable,
.OVERFLOW => return error.Unseekable,
else => |err| return posix.unexpectedErrno(err),
}
}
}
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
const pool: *Pool = @ptrCast(@alignCast(userdata)); const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel(); try pool.checkCancel();
const fs_file: std.fs.File = .{ .handle = file.handle };
return switch (offset) { _ = file;
-1 => fs_file.read(buffer), _ = offset;
else => fs_file.pread(buffer, @bitCast(offset)), @panic("TODO");
}; }
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel();
_ = file;
_ = offset;
@panic("TODO");
} }
fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize { fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize {

View file

@ -5,7 +5,7 @@ const Writer = @This();
const std = @import("../std.zig"); const std = @import("../std.zig");
const assert = std.debug.assert; const assert = std.debug.assert;
const Limit = std.Io.Limit; const Limit = std.Io.Limit;
const File = std.fs.File; const File = std.Io.File;
const testing = std.testing; const testing = std.testing;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList; const ArrayList = std.ArrayList;

View file

@ -17,9 +17,12 @@ pub const ListenOptions = struct {
force_nonblocking: bool = false, force_nonblocking: bool = false,
}; };
/// An already-validated host name. /// An already-validated host name. A valid host name:
/// * Has length less than or equal to `max_len`.
/// * Is valid UTF-8.
/// * Lacks ASCII characters other than alphanumeric, '-', and '.'.
pub const HostName = struct { pub const HostName = struct {
/// Externally managed memory. Already checked to be within `max_len`. /// Externally managed memory. Already checked to be valid.
bytes: []const u8, bytes: []const u8,
pub const max_len = 255; pub const max_len = 255;
@ -55,13 +58,14 @@ pub const HostName = struct {
family: ?IpAddress.Tag = null, family: ?IpAddress.Tag = null,
}; };
pub const LookupError = Io.Cancelable || error{}; pub const LookupError = Io.Cancelable || Io.File.OpenError || Io.File.Reader.Error || error{
UnknownHostName,
};
pub const LookupResult = struct { pub const LookupResult = struct {
/// How many `LookupOptions.addresses_buffer` elements are populated. /// How many `LookupOptions.addresses_buffer` elements are populated.
addresses_len: usize, addresses_len: usize = 0,
/// Length zero means no canonical name returned. canonical_name: ?HostName = null,
canonical_name_len: usize,
}; };
pub fn lookup(host_name: HostName, io: Io, options: LookupOptions) LookupError!LookupResult { pub fn lookup(host_name: HostName, io: Io, options: LookupOptions) LookupError!LookupResult {
@ -75,17 +79,17 @@ pub const HostName = struct {
if (options.family != .ip6) { if (options.family != .ip6) {
if (IpAddress.parseIp4(name, options.port)) |addr| { if (IpAddress.parseIp4(name, options.port)) |addr| {
options.addresses_buffer[0] = addr; options.addresses_buffer[0] = addr;
return .{ .addresses_len = 1, .canonical_name_len = 0 }; return .{ .addresses_len = 1 };
} else |_| {} } else |_| {}
} }
if (options.family != .ip4) { if (options.family != .ip4) {
if (IpAddress.parseIp6(name, options.port)) |addr| { if (IpAddress.parseIp6(name, options.port)) |addr| {
options.addresses_buffer[0] = addr; options.addresses_buffer[0] = addr;
return .{ .addresses_len = 1, .canonical_name_len = 0 }; return .{ .addresses_len = 1 };
} else |_| {} } else |_| {}
} }
{ {
const result = try lookupHosts(io, options); const result = try lookupHosts(host_name, io, options);
if (result.addresses_len > 0) return sortLookupResults(options, result); if (result.addresses_len > 0) return sortLookupResults(options, result);
} }
{ {
@ -110,8 +114,12 @@ pub const HostName = struct {
i += 1; i += 1;
} }
const canon_name = "localhost"; const canon_name = "localhost";
options.canonical_name_buffer[0..canon_name.len].* = canon_name.*; const canon_name_dest = options.canonical_name_buffer[0..canon_name.len];
return sortLookupResults(options, .{ .addresses_len = i, .canonical_name_len = canon_name.len }); canon_name_dest.* = canon_name.*;
return sortLookupResults(options, .{
.addresses_len = i,
.canonical_name = .{ .bytes = canon_name_dest },
});
} }
} }
{ {
@ -135,27 +143,27 @@ pub const HostName = struct {
@panic("TODO"); @panic("TODO");
} }
fn lookupHosts(io: Io, options: LookupOptions) !LookupResult { fn lookupHosts(host_name: HostName, io: Io, options: LookupOptions) !LookupResult {
const file = Io.File.openFileAbsoluteZ(io, "/etc/hosts", .{}) catch |err| switch (err) { const file = Io.File.openAbsolute(io, "/etc/hosts", .{}) catch |err| switch (err) {
error.FileNotFound, error.FileNotFound,
error.NotDir, error.NotDir,
error.AccessDenied, error.AccessDenied,
=> return, => return .{},
else => |e| return e, else => |e| return e,
}; };
defer file.close(); defer file.close(io);
var line_buf: [512]u8 = undefined; var line_buf: [512]u8 = undefined;
var file_reader = file.reader(io, &line_buf); var file_reader = file.reader(io, &line_buf);
return lookupHostsReader(options, &file_reader.interface) catch |err| switch (err) { return lookupHostsReader(host_name, options, &file_reader.interface) catch |err| switch (err) {
error.OutOfMemory => return error.OutOfMemory,
error.ReadFailed => return file_reader.err.?, error.ReadFailed => return file_reader.err.?,
}; };
} }
fn lookupHostsReader(options: LookupOptions, reader: *Io.Reader) error{ReadFailed}!LookupResult { fn lookupHostsReader(host_name: HostName, options: LookupOptions, reader: *Io.Reader) error{ReadFailed}!LookupResult {
var addresses_len: usize = 0; var addresses_len: usize = 0;
var canonical_name_len: usize = 0; var canonical_name: ?HostName = null;
while (true) { while (true) {
const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) { const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) {
error.StreamTooLong => { error.StreamTooLong => {
@ -176,19 +184,20 @@ pub const HostName = struct {
const ip_text = line_it.next() orelse continue; const ip_text = line_it.next() orelse continue;
var first_name_text: ?[]const u8 = null; var first_name_text: ?[]const u8 = null;
while (line_it.next()) |name_text| { while (line_it.next()) |name_text| {
if (std.mem.eql(u8, name_text, options.name)) { if (std.mem.eql(u8, name_text, host_name.bytes)) {
if (first_name_text == null) first_name_text = name_text; if (first_name_text == null) first_name_text = name_text;
break; break;
} }
} else continue; } else continue;
if (canonical_name_len == 0) { if (canonical_name == null) {
if (HostName.init(first_name_text)) |name_text| { if (HostName.init(first_name_text.?)) |name_text| {
if (name_text.len <= options.canonical_name_buffer.len) { if (name_text.bytes.len <= options.canonical_name_buffer.len) {
@memcpy(options.canonical_name_buffer[0..name_text.len], name_text); const canonical_name_dest = options.canonical_name_buffer[0..name_text.bytes.len];
canonical_name_len = name_text.len; @memcpy(canonical_name_dest, name_text.bytes);
canonical_name = .{ .bytes = canonical_name_dest };
} }
} } else |_| {}
} }
if (options.family != .ip6) { if (options.family != .ip6) {
@ -197,7 +206,7 @@ pub const HostName = struct {
addresses_len += 1; addresses_len += 1;
if (options.addresses_buffer.len - addresses_len == 0) return .{ if (options.addresses_buffer.len - addresses_len == 0) return .{
.addresses_len = addresses_len, .addresses_len = addresses_len,
.canonical_name_len = canonical_name_len, .canonical_name = canonical_name,
}; };
} else |_| {} } else |_| {}
} }
@ -207,11 +216,15 @@ pub const HostName = struct {
addresses_len += 1; addresses_len += 1;
if (options.addresses_buffer.len - addresses_len == 0) return .{ if (options.addresses_buffer.len - addresses_len == 0) return .{
.addresses_len = addresses_len, .addresses_len = addresses_len,
.canonical_name_len = canonical_name_len, .canonical_name = canonical_name,
}; };
} else |_| {} } else |_| {}
} }
} }
return .{
.addresses_len = addresses_len,
.canonical_name = canonical_name,
};
} }
pub const ConnectTcpError = LookupError || IpAddress.ConnectTcpError; pub const ConnectTcpError = LookupError || IpAddress.ConnectTcpError;
@ -289,9 +302,9 @@ pub const IpAddress = union(enum) {
} }
} }
pub fn format(a: IpAddress, w: *std.io.Writer) std.io.Writer.Error!void { pub fn format(a: IpAddress, w: *Io.Writer) Io.Writer.Error!void {
switch (a) { switch (a) {
.ip4, .ip6 => |x| return x.format(w), inline .ip4, .ip6 => |x| return x.format(w),
} }
} }
@ -365,7 +378,7 @@ pub const Ip4Address = struct {
return error.Incomplete; return error.Incomplete;
} }
pub fn format(a: Ip4Address, w: *std.io.Writer) std.io.Writer.Error!void { pub fn format(a: Ip4Address, w: *Io.Writer) Io.Writer.Error!void {
const bytes = &a.bytes; const bytes = &a.bytes;
try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], a.port }); try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], a.port });
} }
@ -393,6 +406,13 @@ pub const Ip6Address = struct {
Incomplete, Incomplete,
}; };
pub fn localhost(port: u16) Ip6Address {
return .{
.bytes = .{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 },
.port = port,
};
}
pub fn parse(buffer: []const u8, port: u16) ParseError!Ip6Address { pub fn parse(buffer: []const u8, port: u16) ParseError!Ip6Address {
var result: Ip6Address = .{ var result: Ip6Address = .{
.port = port, .port = port,
@ -504,7 +524,7 @@ pub const Ip6Address = struct {
} }
} }
pub fn format(a: Ip6Address, w: *std.io.Writer) std.io.Writer.Error!void { pub fn format(a: Ip6Address, w: *Io.Writer) Io.Writer.Error!void {
const bytes = &a.bytes; const bytes = &a.bytes;
if (std.mem.eql(u8, bytes[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) { if (std.mem.eql(u8, bytes[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) {
try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{ try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{

View file

@ -17,25 +17,12 @@ const Alignment = std.mem.Alignment;
/// The OS-specific file descriptor or file handle. /// The OS-specific file descriptor or file handle.
handle: Handle, handle: Handle,
pub const Handle = posix.fd_t; pub const Handle = std.Io.File.Handle;
pub const Mode = posix.mode_t; pub const Mode = std.Io.File.Mode;
pub const INode = posix.ino_t; pub const INode = std.Io.File.INode;
pub const Uid = posix.uid_t; pub const Uid = posix.uid_t;
pub const Gid = posix.gid_t; pub const Gid = posix.gid_t;
pub const Kind = std.Io.File.Kind;
pub const Kind = enum {
block_device,
character_device,
directory,
named_pipe,
sym_link,
file,
unix_domain_socket,
whiteout,
door,
event_port,
unknown,
};
/// This is the default mode given to POSIX operating systems for creating /// This is the default mode given to POSIX operating systems for creating
/// files. `0o666` is "-rw-rw-rw-" which is counter-intuitive at first, /// files. `0o666` is "-rw-rw-rw-" which is counter-intuitive at first,
@ -399,115 +386,11 @@ pub fn mode(self: File) ModeError!Mode {
return (try self.stat()).mode; return (try self.stat()).mode;
} }
pub const Stat = struct { pub const Stat = std.Io.File.Stat;
/// A number that the system uses to point to the file metadata. This
/// number is not guaranteed to be unique across time, as some file
/// systems may reuse an inode after its file has been deleted. Some
/// systems may change the inode of a file over time.
///
/// On Linux, the inode is a structure that stores the metadata, and
/// the inode _number_ is what you see here: the index number of the
/// inode.
///
/// The FileIndex on Windows is similar. It is a number for a file that
/// is unique to each filesystem.
inode: INode,
size: u64,
/// This is available on POSIX systems and is always 0 otherwise.
mode: Mode,
kind: Kind,
/// Last access time in nanoseconds, relative to UTC 1970-01-01.
atime: i128,
/// Last modification time in nanoseconds, relative to UTC 1970-01-01.
mtime: i128,
/// Last status/metadata change time in nanoseconds, relative to UTC 1970-01-01.
ctime: i128,
pub fn fromPosix(st: posix.Stat) Stat {
const atime = st.atime();
const mtime = st.mtime();
const ctime = st.ctime();
return .{
.inode = st.ino,
.size = @bitCast(st.size),
.mode = st.mode,
.kind = k: {
const m = st.mode & posix.S.IFMT;
switch (m) {
posix.S.IFBLK => break :k .block_device,
posix.S.IFCHR => break :k .character_device,
posix.S.IFDIR => break :k .directory,
posix.S.IFIFO => break :k .named_pipe,
posix.S.IFLNK => break :k .sym_link,
posix.S.IFREG => break :k .file,
posix.S.IFSOCK => break :k .unix_domain_socket,
else => {},
}
if (builtin.os.tag == .illumos) switch (m) {
posix.S.IFDOOR => break :k .door,
posix.S.IFPORT => break :k .event_port,
else => {},
};
break :k .unknown;
},
.atime = @as(i128, atime.sec) * std.time.ns_per_s + atime.nsec,
.mtime = @as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec,
.ctime = @as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec,
};
}
pub fn fromLinux(stx: linux.Statx) Stat {
const atime = stx.atime;
const mtime = stx.mtime;
const ctime = stx.ctime;
return .{
.inode = stx.ino,
.size = stx.size,
.mode = stx.mode,
.kind = switch (stx.mode & linux.S.IFMT) {
linux.S.IFDIR => .directory,
linux.S.IFCHR => .character_device,
linux.S.IFBLK => .block_device,
linux.S.IFREG => .file,
linux.S.IFIFO => .named_pipe,
linux.S.IFLNK => .sym_link,
linux.S.IFSOCK => .unix_domain_socket,
else => .unknown,
},
.atime = @as(i128, atime.sec) * std.time.ns_per_s + atime.nsec,
.mtime = @as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec,
.ctime = @as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec,
};
}
pub fn fromWasi(st: std.os.wasi.filestat_t) Stat {
return .{
.inode = st.ino,
.size = @bitCast(st.size),
.mode = 0,
.kind = switch (st.filetype) {
.BLOCK_DEVICE => .block_device,
.CHARACTER_DEVICE => .character_device,
.DIRECTORY => .directory,
.SYMBOLIC_LINK => .sym_link,
.REGULAR_FILE => .file,
.SOCKET_STREAM, .SOCKET_DGRAM => .unix_domain_socket,
else => .unknown,
},
.atime = st.atim,
.mtime = st.mtim,
.ctime = st.ctim,
};
}
};
pub const StatError = posix.FStatError; pub const StatError = posix.FStatError;
/// Returns `Stat` containing basic information about the `File`. /// Returns `Stat` containing basic information about the `File`.
/// TODO: integrate with async I/O
pub fn stat(self: File) StatError!Stat { pub fn stat(self: File) StatError!Stat {
if (builtin.os.tag == .windows) { if (builtin.os.tag == .windows) {
var io_status_block: windows.IO_STATUS_BLOCK = undefined; var io_status_block: windows.IO_STATUS_BLOCK = undefined;
@ -1728,7 +1611,7 @@ pub const Writer = struct {
pub fn sendFile( pub fn sendFile(
io_w: *std.Io.Writer, io_w: *std.Io.Writer,
file_reader: *Reader, file_reader: *std.Io.File.Reader,
limit: std.Io.Limit, limit: std.Io.Limit,
) std.Io.Writer.FileError!usize { ) std.Io.Writer.FileError!usize {
const reader_buffered = file_reader.interface.buffered(); const reader_buffered = file_reader.interface.buffered();
@ -1995,7 +1878,7 @@ pub const Writer = struct {
fn sendFileBuffered( fn sendFileBuffered(
io_w: *std.Io.Writer, io_w: *std.Io.Writer,
file_reader: *Reader, file_reader: *std.Io.File.Reader,
reader_buffered: []const u8, reader_buffered: []const u8,
) std.Io.Writer.FileError!usize { ) std.Io.Writer.FileError!usize {
const n = try drain(io_w, &.{reader_buffered}, 1); const n = try drain(io_w, &.{reader_buffered}, 1);

View file

@ -805,36 +805,7 @@ pub fn exit(status: u8) noreturn {
system.exit(status); system.exit(status);
} }
pub const ReadError = error{ pub const ReadError = std.Io.File.ReadStreamingError;
InputOutput,
SystemResources,
IsDir,
OperationAborted,
BrokenPipe,
ConnectionResetByPeer,
ConnectionTimedOut,
NotOpenForReading,
SocketNotConnected,
/// This error occurs when no global event loop is configured,
/// and reading from the file descriptor would block.
WouldBlock,
/// reading a timerfd with CANCEL_ON_SET will lead to this error
/// when the clock goes through a discontinuous change
Canceled,
/// In WASI, this error occurs when the file descriptor does
/// not hold the required rights to read from it.
AccessDenied,
/// This error occurs in Linux if the process to be read from
/// no longer exists.
ProcessNotFound,
/// Unable to read file due to lock.
LockViolation,
} || UnexpectedError;
/// Returns the number of bytes that were read, which can be less than /// Returns the number of bytes that were read, which can be less than
/// buf.len. If 0 bytes were read, that means EOF. /// buf.len. If 0 bytes were read, that means EOF.
@ -921,7 +892,6 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize {
/// a pointer within the address space of the application. /// a pointer within the address space of the application.
pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize {
if (native_os == .windows) { if (native_os == .windows) {
// TODO improve this to use ReadFileScatter
if (iov.len == 0) return 0; if (iov.len == 0) return 0;
const first = iov[0]; const first = iov[0];
return read(fd, first.base[0..first.len]); return read(fd, first.base[0..first.len]);
@ -969,7 +939,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize {
} }
} }
pub const PReadError = ReadError || error{Unseekable}; pub const PReadError = std.Io.ReadPositionalError;
/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. /// Number of bytes read is returned. Upon reading end-of-file, zero is returned.
/// ///
@ -5393,13 +5363,7 @@ pub fn gettimeofday(tv: ?*timeval, tz: ?*timezone) void {
} }
} }
pub const SeekError = error{ pub const SeekError = std.Io.File.SeekError;
Unseekable,
/// In WASI, this error may occur when the file descriptor does
/// not hold the required rights to seek on it.
AccessDenied,
} || UnexpectedError;
/// Repositions read/write file offset relative to the beginning. /// Repositions read/write file offset relative to the beginning.
pub fn lseek_SET(fd: fd_t, offset: u64) SeekError!void { pub fn lseek_SET(fd: fd_t, offset: u64) SeekError!void {
@ -7572,7 +7536,7 @@ pub fn ioctl_SIOCGIFINDEX(fd: fd_t, ifr: *ifreq) IoCtl_SIOCGIFINDEX_Error!void {
} }
} }
const lfs64_abi = native_os == .linux and builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid()); pub const lfs64_abi = native_os == .linux and builtin.link_libc and (builtin.abi.isGnu() or builtin.abi.isAndroid());
/// Whether or not `error.Unexpected` will print its value and a stack trace. /// Whether or not `error.Unexpected` will print its value and a stack trace.
/// ///
@ -7584,17 +7548,7 @@ pub const unexpected_error_tracing = builtin.mode == .Debug and switch (builtin.
else => false, else => false,
}; };
pub const UnexpectedError = error{ pub const UnexpectedError = std.Io.UnexpectedError;
/// The Operating System returned an undocumented error code.
///
/// This error is in theory not possible, but it would be better
/// to handle this error than to invoke undefined behavior.
///
/// When this error code is observed, it usually means the Zig Standard
/// Library needs a small patch to add the error code to the error set for
/// the respective function.
Unexpected,
};
/// Call this when you made a syscall or something that sets errno /// Call this when you made a syscall or something that sets errno
/// and you get an unexpected error. /// and you get an unexpected error.