diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index ca42627005..a02e1f3b40 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -9,26 +9,25 @@ const IoUring = std.os.linux.IoUring; gpa: Allocator, mutex: std.Thread.Mutex, -cond: std.Thread.Condition, queue: std.DoublyLinkedList(void), +/// Atomic copy of queue.len +queue_len: usize, free: std.DoublyLinkedList(void), -main_context: Context, -exit_awaiter: ?*Fiber, +main_fiber: Fiber, +idle_count: usize, threads: std.ArrayListUnmanaged(Thread), -/// 1 bit per thread, same order as `thread_index`. -idle_iourings: []usize, +exiting: bool, threadlocal var thread_index: u32 = undefined; /// Empirically saw 10KB being used by the self-hosted backend for logging. -const idle_stack_size = 32 * 1024; +const idle_stack_size = 64 * 1024; const io_uring_entries = 64; const Thread = struct { thread: std.Thread, idle_context: Context, - current_idle_context: *Context, current_context: *Context, io_uring: IoUring, @@ -103,98 +102,92 @@ pub fn io(el: *EventLoop) Io { } pub fn init(el: *EventLoop, gpa: Allocator) !void { - const n_threads: usize = @max((std.Thread.getCpuCount() catch 1), 1); - const threads_bytes = n_threads * @sizeOf(Thread); - const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context)); - const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); - const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset); + const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread); + const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max); + const allocated_slice = try gpa.alignedAlloc(u8, @alignOf(Thread), idle_stack_end_offset); errdefer gpa.free(allocated_slice); - const idle_iourings = try gpa.alloc(usize, (n_threads + @bitSizeOf(usize) - 1) / @bitSizeOf(usize)); - errdefer gpa.free(idle_iourings); - @memset(idle_iourings, 0); el.* = .{ .gpa = gpa, .mutex = .{}, - .cond = .{}, .queue = .{}, + .queue_len = 0, .free = .{}, - .main_context = undefined, - .exit_awaiter = null, - .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])), - .idle_iourings = idle_iourings, + .main_fiber = undefined, + .idle_count = 0, + .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_size])), + .exiting = false, }; + thread_index = 0; const main_thread = el.threads.addOneAssumeCapacity(); main_thread.io_uring = try IoUring.init(io_uring_entries, 0); - const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)])); - const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); + const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; - main_idle_context.* = .{ + main_thread.idle_context = .{ .rsp = @intFromPtr(idle_stack_end - 1), .rbp = 0, .rip = @intFromPtr(&mainIdleEntry), }; - std.log.debug("created main idle {*}", .{main_idle_context}); - main_thread.current_idle_context = main_idle_context; - std.log.debug("created main {*}", .{&el.main_context}); - main_thread.current_context = &el.main_context; + std.log.debug("created main idle {*}", .{&main_thread.idle_context}); + std.log.debug("created main {*}", .{&el.main_fiber}); + main_thread.current_context = &el.main_fiber.context; } pub fn deinit(el: *EventLoop) void { assert(el.queue.len == 0); // pending async - el.yield(null, &el.exit_awaiter); + el.yield(null, .exit); while (el.free.pop()) |free_node| { const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node)); el.gpa.free(free_fiber.allocatedSlice()); } - const idle_context_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread), @alignOf(Context)); - const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); - const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr)); + const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max); + const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr)); for (el.threads.items[1..]) |*thread| thread.thread.join(); - el.gpa.free(allocated_ptr[0..idle_stack_end]); + el.gpa.free(allocated_ptr[0..idle_stack_end_offset]); } -const PendingTask = union(enum) { - none, - register_awaiter: *?*Fiber, - io_uring_submit: *IoUring, -}; - -fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: PendingTask) void { +fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void { const thread: *Thread = &el.threads.items[thread_index]; const ready_context: *Context = ready_context: { const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: { el.mutex.lock(); defer el.mutex.unlock(); - break :ready_node el.queue.pop(); + const ready_node = el.queue.pop(); + @atomicStore(usize, &el.queue_len, el.queue.len, .unordered); + break :ready_node ready_node; }) |ready_node| @alignCast(@fieldParentPtr("queue_node", ready_node)) else - break :ready_context thread.current_idle_context; + break :ready_context &thread.idle_context; break :ready_context &ready_fiber.context; }; const message: SwitchMessage = .{ - .prev_context = thread.current_context, - .ready_context = ready_context, + .contexts = .{ + .prev = thread.current_context, + .ready = ready_context, + }, .pending_task = pending_task, }; - std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context }); + std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready }); contextSwitch(&message).handle(el); } fn schedule(el: *EventLoop, fiber: *Fiber) void { - el.mutex.lock(); - el.queue.append(&fiber.queue_node); - //for (el.idle_iourings) |*int| { - // const idler_subset = @atomicLoad(usize, int, .unordered); - // if (idler_subset == 0) continue; - // - //} - if (el.idle_count > 0) { - el.mutex.unlock(); - el.cond.signal(); + if (idle_count: { + el.mutex.lock(); + defer el.mutex.unlock(); + el.queue.append(&fiber.queue_node); + @atomicStore(usize, &el.queue_len, el.queue.len, .unordered); + break :idle_count el.idle_count; + } > 0) { + _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), 1, switch (@bitSizeOf(usize)) { + 8 => std.os.linux.FUTEX2.SIZE_U8, + 16 => std.os.linux.FUTEX2.SIZE_U16, + 32 => std.os.linux.FUTEX2.SIZE_U32, + 64 => std.os.linux.FUTEX2.SIZE_U64, + else => @compileError("unsupported @sizeOf(usize)"), + } | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring return; } - defer el.mutex.unlock(); if (el.threads.items.len == el.threads.capacity) return; const thread = el.threads.addOneAssumeCapacity(); thread.thread = std.Thread.spawn(.{ @@ -216,64 +209,101 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void { fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { message.handle(el); - el.yield(el.idle(), null); + el.idle(); + el.yield(&el.main_fiber, .nothing); unreachable; // switched to dead fiber } fn threadEntry(el: *EventLoop, index: usize) void { - thread_index = index; + thread_index = @intCast(index); const thread: *Thread = &el.threads.items[index]; std.log.debug("created thread idle {*}", .{&thread.idle_context}); thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| { std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)}); return; }; - thread.current_idle_context = &thread.idle_context; thread.current_context = &thread.idle_context; - _ = el.idle(); + el.idle(); } -fn idle(el: *EventLoop) *Fiber { +const UserData = enum(u64) { + queue_len_futex_wait, + _, +}; + +fn idle(el: *EventLoop) void { const thread: *Thread = &el.threads.items[thread_index]; - // The idle fiber only runs on one thread. const iou = &thread.io_uring; var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined; + var futex_is_scheduled: bool = false; while (true) { - el.yield(null, null); - if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| { - el.cond.broadcast(); - return exit_awaiter; - } - // TODO add uring to bit set - const n = iou.copy_cqes(&cqes_buffer, 1) catch @panic("TODO handle copy_cqes error"); - const cqes = cqes_buffer[0..n]; - for (cqes) |cqe| { - const fiber: *Fiber = @ptrFromInt(cqe.user_data); - const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer())); - res.* = cqe.res; - el.schedule(fiber); + el.yield(null, .nothing); + if (@atomicLoad(bool, &el.exiting, .acquire)) return; + if (!futex_is_scheduled) { + const sqe = getSqe(&thread.io_uring); + sqe.prep_rw(.FUTEX_WAIT, switch (@bitSizeOf(usize)) { + 8 => std.os.linux.FUTEX2.SIZE_U8, + 16 => std.os.linux.FUTEX2.SIZE_U16, + 32 => std.os.linux.FUTEX2.SIZE_U32, + 64 => std.os.linux.FUTEX2.SIZE_U64, + else => @compileError("unsupported @sizeOf(usize)"), + } | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0); + sqe.addr3 = std.math.maxInt(u64); + sqe.user_data = @intFromEnum(UserData.queue_len_futex_wait); + futex_is_scheduled = true; } + _ = iou.submit_and_wait(1) catch |err| switch (err) { + error.SignalInterrupt => 0, + else => @panic(@errorName(err)), + }; + for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) { + error.SignalInterrupt => 0, + else => @panic(@errorName(err)), + }]) |cqe| switch (@as(UserData, @enumFromInt(cqe.user_data))) { + .queue_len_futex_wait => futex_is_scheduled = false, + _ => { + const fiber: *Fiber = @ptrFromInt(cqe.user_data); + const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer())); + res.* = cqe.res; + el.schedule(fiber); + }, + }; } } -const SwitchMessage = extern struct { - prev_context: *Context, - ready_context: *Context, +const SwitchMessage = struct { + contexts: extern struct { + prev: *Context, + ready: *Context, + }, pending_task: PendingTask, + const PendingTask = union(enum) { + nothing, + register_awaiter: *?*Fiber, + exit, + }; + fn handle(message: *const SwitchMessage, el: *EventLoop) void { const thread: *Thread = &el.threads.items[thread_index]; - thread.current_context = message.ready_context; + thread.current_context = message.contexts.ready; switch (message.pending_task) { - .none => {}, + .nothing => {}, .register_awaiter => |awaiter| { - const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context)); + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); }, - .io_uring_submit => |iou| { - _ = iou.flush_sq(); - // TODO: determine whether this return value should be used + .exit => { + @atomicStore(bool, &el.exiting, true, .unordered); + @atomicStore(usize, &el.queue_len, std.math.maxInt(usize), .release); + _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), std.math.maxInt(i32), switch (@bitSizeOf(usize)) { + 8 => std.os.linux.FUTEX2.SIZE_U8, + 16 => std.os.linux.FUTEX2.SIZE_U16, + 32 => std.os.linux.FUTEX2.SIZE_U32, + 64 => std.os.linux.FUTEX2.SIZE_U64, + else => @compileError("unsupported @sizeOf(usize)"), + } | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring }, } } @@ -289,7 +319,7 @@ const Context = switch (builtin.cpu.arch) { }; inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { - return switch (builtin.cpu.arch) { + return @fieldParentPtr("contexts", switch (builtin.cpu.arch) { .x86_64 => asm volatile ( \\ movq 0(%%rsi), %%rax \\ movq 8(%%rsi), %%rcx @@ -301,8 +331,8 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { \\ movq 8(%%rcx), %%rbp \\ jmpq *16(%%rcx) \\0: - : [received_message] "={rsi}" (-> *const SwitchMessage), - : [message_to_send] "{rsi}" (message), + : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")), + : [message_to_send] "{rsi}" (&message.contexts), : "rax", "rcx", "rdx", "rbx", "rdi", // "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", // "mm0", "mm1", "mm2", "mm3", "mm4", "mm5", "mm6", "mm7", // @@ -313,7 +343,7 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { "fpsr", "fpcr", "mxcsr", "rflags", "dirflag", "memory" ), else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), - }; + }); } fn mainIdleEntry() callconv(.naked) void { @@ -401,7 +431,7 @@ const AsyncClosure = struct { std.log.debug("{*} performing async", .{closure.fiber}); closure.start(closure.contextPointer(), closure.fiber.resultPointer()); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); - closure.event_loop.yield(awaiter, null); + closure.event_loop.yield(awaiter, .nothing); unreachable; // switched to dead fiber } }; @@ -409,17 +439,93 @@ const AsyncClosure = struct { pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); - if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter); + if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter }); @memcpy(result, future_fiber.resultPointer()); event_loop.recycle(future_fiber); } pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File { - _ = userdata; - _ = dir; - _ = sub_path; - _ = flags; - @panic("TODO"); + const el: *EventLoop = @ptrCast(@alignCast(userdata)); + + const posix = std.posix; + const sub_path_c = try posix.toPosixPath(sub_path); + + var os_flags: posix.O = .{ + .ACCMODE = if (flags.read) .RDWR else .WRONLY, + .CREAT = true, + .TRUNC = flags.truncate, + .EXCL = flags.exclusive, + }; + if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true; + if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true; + + // Use the O locking flags if the os supports them to acquire the lock + // atomically. Note that the NONBLOCK flag is removed after the openat() + // call is successful. + const has_flock_open_flags = @hasField(posix.O, "EXLOCK"); + if (has_flock_open_flags) switch (flags.lock) { + .none => {}, + .shared => { + os_flags.SHLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + .exclusive => { + os_flags.EXLOCK = true; + os_flags.NONBLOCK = flags.lock_nonblocking; + }, + }; + const have_flock = @TypeOf(posix.system.flock) != void; + + if (have_flock and !has_flock_open_flags and flags.lock != .none) { + @panic("TODO"); + } + + if (has_flock_open_flags and flags.lock_nonblocking) { + @panic("TODO"); + } + + const thread: *Thread = &el.threads.items[thread_index]; + const iou = &thread.io_uring; + const sqe = getSqe(iou); + const fiber = thread.currentFiber(); + + sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode); + sqe.user_data = @intFromPtr(fiber); + + el.yield(null, .nothing); + + const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); + const rc = result.*; + switch (errno(rc)) { + .SUCCESS => return .{ .handle = rc }, + .INTR => @panic("TODO is this reachable?"), + .CANCELED => @panic("TODO figure out how this error code fits into things"), + + .FAULT => unreachable, + .INVAL => return error.BadPathName, + .BADF => unreachable, + .ACCES => return error.AccessDenied, + .FBIG => return error.FileTooBig, + .OVERFLOW => return error.FileTooBig, + .ISDIR => return error.IsDir, + .LOOP => return error.SymLinkLoop, + .MFILE => return error.ProcessFdQuotaExceeded, + .NAMETOOLONG => return error.NameTooLong, + .NFILE => return error.SystemFdQuotaExceeded, + .NODEV => return error.NoDevice, + .NOENT => return error.FileNotFound, + .NOMEM => return error.SystemResources, + .NOSPC => return error.NoSpaceLeft, + .NOTDIR => return error.NotDir, + .PERM => return error.PermissionDenied, + .EXIST => return error.PathAlreadyExists, + .BUSY => return error.DeviceBusy, + .OPNOTSUPP => return error.FileLocksNotSupported, + .AGAIN => return error.WouldBlock, + .TXTBSY => return error.FileBusy, + .NXIO => return error.NoDevice, + else => |err| return posix.unexpectedErrno(err), + } } pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File { @@ -476,7 +582,7 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0); sqe.user_data = @intFromPtr(fiber); - el.yield(null, .{ .io_uring_submit = iou }); + el.yield(null, .nothing); const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); const rc = result.*; @@ -510,8 +616,6 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl .NXIO => return error.NoDevice, else => |err| return posix.unexpectedErrno(err), } - - return .{ .handle = result.* }; } fn errno(signed: i32) std.posix.E { @@ -524,21 +628,109 @@ fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe { } pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { - _ = userdata; - _ = file; - @panic("TODO"); + const el: *EventLoop = @ptrCast(@alignCast(userdata)); + + const posix = std.posix; + + const thread: *Thread = &el.threads.items[thread_index]; + const iou = &thread.io_uring; + const sqe = getSqe(iou); + const fiber = thread.currentFiber(); + + sqe.prep_close(file.handle); + sqe.user_data = @intFromPtr(fiber); + + el.yield(null, .nothing); + + const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); + const rc = result.*; + switch (errno(rc)) { + .SUCCESS => return, + .INTR => @panic("TODO is this reachable?"), + .CANCELED => @panic("TODO figure out how this error code fits into things"), + + .BADF => unreachable, // Always a race condition. + else => return, + } } pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize { - _ = userdata; - _ = file; - _ = buffer; - @panic("TODO"); + const el: *EventLoop = @ptrCast(@alignCast(userdata)); + + const posix = std.posix; + + const thread: *Thread = &el.threads.items[thread_index]; + const iou = &thread.io_uring; + const sqe = getSqe(iou); + const fiber = thread.currentFiber(); + + sqe.prep_read(file.handle, buffer, std.math.maxInt(u64)); + sqe.user_data = @intFromPtr(fiber); + + el.yield(null, .nothing); + + const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); + const rc = result.*; + switch (errno(rc)) { + .SUCCESS => return @as(u32, @bitCast(rc)), + .INTR => @panic("TODO is this reachable?"), + .CANCELED => @panic("TODO figure out how this error code fits into things"), + + .INVAL => unreachable, + .FAULT => unreachable, + .NOENT => return error.ProcessNotFound, + .AGAIN => return error.WouldBlock, + .BADF => return error.NotOpenForReading, // Can be a race condition. + .IO => return error.InputOutput, + .ISDIR => return error.IsDir, + .NOBUFS => return error.SystemResources, + .NOMEM => return error.SystemResources, + .NOTCONN => return error.SocketNotConnected, + .CONNRESET => return error.ConnectionResetByPeer, + .TIMEDOUT => return error.ConnectionTimedOut, + else => |err| return posix.unexpectedErrno(err), + } } pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize { - _ = userdata; - _ = file; - _ = buffer; - @panic("TODO"); + const el: *EventLoop = @ptrCast(@alignCast(userdata)); + + const posix = std.posix; + + const thread: *Thread = &el.threads.items[thread_index]; + const iou = &thread.io_uring; + const sqe = getSqe(iou); + const fiber = thread.currentFiber(); + + sqe.prep_write(file.handle, buffer, std.math.maxInt(u64)); + sqe.user_data = @intFromPtr(fiber); + + el.yield(null, .nothing); + + const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); + const rc = result.*; + switch (errno(rc)) { + .SUCCESS => return @as(u32, @bitCast(rc)), + .INTR => @panic("TODO is this reachable?"), + .CANCELED => @panic("TODO figure out how this error code fits into things"), + + .INVAL => return error.InvalidArgument, + .FAULT => unreachable, + .NOENT => return error.ProcessNotFound, + .AGAIN => return error.WouldBlock, + .BADF => return error.NotOpenForWriting, // can be a race condition. + .DESTADDRREQ => unreachable, // `connect` was never called. + .DQUOT => return error.DiskQuota, + .FBIG => return error.FileTooBig, + .IO => return error.InputOutput, + .NOSPC => return error.NoSpaceLeft, + .ACCES => return error.AccessDenied, + .PERM => return error.PermissionDenied, + .PIPE => return error.BrokenPipe, + .CONNRESET => return error.ConnectionResetByPeer, + .BUSY => return error.DeviceBusy, + .NXIO => return error.NoDevice, + .MSGSIZE => return error.MessageTooBig, + else => |err| return posix.unexpectedErrno(err), + } }