const Threaded = @This(); const builtin = @import("builtin"); const native_os = builtin.os.tag; const is_windows = native_os == .windows; const windows = std.os.windows; const std = @import("../std.zig"); const Io = std.Io; const net = std.Io.net; const HostName = std.Io.net.HostName; const IpAddress = std.Io.net.IpAddress; const Allocator = std.mem.Allocator; const assert = std.debug.assert; const posix = std.posix; /// Thread-safe. allocator: Allocator, mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, threads: std.ArrayListUnmanaged(std.Thread), stack_size: usize, cpu_count: std.Thread.CpuCountError!usize, concurrent_count: usize, threadlocal var current_closure: ?*Closure = null; const max_iovecs_len = 8; const splat_buffer_size = 64; comptime { assert(max_iovecs_len <= posix.IOV_MAX); } const Closure = struct { start: Start, node: std.SinglyLinkedList.Node = .{}, cancel_tid: std.Thread.Id, /// Whether this task bumps minimum number of threads in the pool. is_concurrent: bool, const Start = *const fn (*Closure) void; const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) { .int => |int_info| switch (int_info.signedness) { .signed => -1, .unsigned => std.math.maxInt(std.Thread.Id), }, .pointer => @ptrFromInt(std.math.maxInt(usize)), else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), }; fn requestCancel(closure: *Closure) void { switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) { 0, canceling_tid => {}, else => |tid| switch (builtin.os.tag) { .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO), else => {}, }, } } }; pub const InitError = std.Thread.CpuCountError || Allocator.Error; /// Related: /// * `init_single_threaded` pub fn init( /// Must be threadsafe. 
Only used for the following functions: /// * `Io.VTable.async` /// * `Io.VTable.concurrent` /// * `Io.VTable.groupAsync` /// If these functions are avoided, then `Allocator.failing` may be passed /// here. gpa: Allocator, ) Threaded { var t: Threaded = .{ .allocator = gpa, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, .cpu_count = std.Thread.getCpuCount(), .concurrent_count = 0, }; if (t.cpu_count) |n| { t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; } else |_| {} return t; } /// Statically initialize such that any call to the following functions will /// fail with `error.OutOfMemory`: /// * `Io.VTable.async` /// * `Io.VTable.concurrent` /// * `Io.VTable.groupAsync` /// When initialized this way, `deinit` is safe, but unnecessary to call. pub const init_single_threaded: Threaded = .{ .allocator = .failing, .threads = .empty, .stack_size = std.Thread.SpawnConfig.default_stack_size, .cpu_count = 1, .concurrent_count = 0, }; pub fn deinit(t: *Threaded) void { const gpa = t.allocator; t.join(); t.threads.deinit(gpa); t.* = undefined; } fn join(t: *Threaded) void { if (builtin.single_threaded) return; { t.mutex.lock(); defer t.mutex.unlock(); t.join_requested = true; } t.cond.broadcast(); for (t.threads.items) |thread| thread.join(); } fn worker(t: *Threaded) void { t.mutex.lock(); defer t.mutex.unlock(); while (true) { while (t.run_queue.popFirst()) |closure_node| { t.mutex.unlock(); const closure: *Closure = @fieldParentPtr("node", closure_node); const is_concurrent = closure.is_concurrent; closure.start(closure); t.mutex.lock(); if (is_concurrent) { t.concurrent_count -= 1; } } if (t.join_requested) break; t.cond.wait(&t.mutex); } } pub fn io(t: *Threaded) Io { return .{ .userdata = t, .vtable = &.{ .async = async, .concurrent = concurrent, .await = await, .cancel = cancel, .cancelRequested = cancelRequested, .select = select, .groupAsync = groupAsync, .groupWait = groupWait, .groupWaitUncancelable = groupWaitUncancelable, 
.groupCancel = groupCancel, .mutexLock = mutexLock, .mutexLockUncancelable = mutexLockUncancelable, .mutexUnlock = mutexUnlock, .conditionWait = conditionWait, .conditionWaitUncancelable = conditionWaitUncancelable, .conditionWake = conditionWake, .dirMake = switch (builtin.os.tag) { .windows => @panic("TODO"), .wasi => @panic("TODO"), else => dirMakePosix, }, .dirStat = dirStat, .dirStatPath = switch (builtin.os.tag) { .linux => dirStatPathLinux, .windows => @panic("TODO"), .wasi => @panic("TODO"), else => dirStatPathPosix, }, .fileStat = switch (builtin.os.tag) { .linux => fileStatLinux, .windows => fileStatWindows, .wasi => fileStatWasi, else => fileStatPosix, }, .dirCreateFile = switch (builtin.os.tag) { .windows => @panic("TODO"), .wasi => @panic("TODO"), else => dirCreateFilePosix, }, .dirOpenFile = dirOpenFile, .fileClose = fileClose, .fileWriteStreaming = fileWriteStreaming, .fileWritePositional = fileWritePositional, .fileReadStreaming = fileReadStreaming, .fileReadPositional = fileReadPositional, .fileSeekBy = fileSeekBy, .fileSeekTo = fileSeekTo, .now = switch (builtin.os.tag) { .windows => nowWindows, .wasi => nowWasi, else => nowPosix, }, .sleep = switch (builtin.os.tag) { .windows => sleepWindows, .wasi => sleepWasi, .linux => sleepLinux, else => sleepPosix, }, .netListenIp = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netListenIpPosix, }, .netListenUnix = netListenUnix, .netAccept = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netAcceptPosix, }, .netBindIp = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netBindIpPosix, }, .netConnectIp = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netConnectIpPosix, }, .netConnectUnix = netConnectUnix, .netClose = netClose, .netRead = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netReadPosix, }, .netWrite = switch (builtin.os.tag) { .windows => @panic("TODO"), else => netWritePosix, }, .netSend = netSend, .netReceive = netReceive, 
.netInterfaceNameResolve = netInterfaceNameResolve, .netInterfaceName = netInterfaceName, .netLookup = netLookup, }, }; } /// Trailing data: /// 1. context /// 2. result const AsyncClosure = struct { closure: Closure, func: *const fn (context: *anyopaque, result: *anyopaque) void, reset_event: ResetEvent, select_condition: ?*ResetEvent, context_alignment: std.mem.Alignment, result_offset: usize, /// Whether the task has a return type with nonzero bits. has_result: bool, const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent)); fn start(closure: *Closure) void { const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); const tid = std.Thread.getCurrentId(); if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { assert(cancel_tid == Closure.canceling_tid); // Even though we already know the task is canceled, we must still // run the closure in order to make the return value valid - that // is, unless the result is zero bytes! if (!ac.has_result) { ac.reset_event.set(); return; } } current_closure = closure; ac.func(ac.contextPointer(), ac.resultPointer()); current_closure = null; // In case a cancel happens after successful task completion, prevents // signal from being delivered to the thread in `requestCancel`. 
if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { assert(cancel_tid == Closure.canceling_tid); } if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| { assert(select_reset != done_reset_event); select_reset.set(); } ac.reset_event.set(); } fn resultPointer(ac: *AsyncClosure) [*]u8 { const base: [*]u8 = @ptrCast(ac); return base + ac.result_offset; } fn contextPointer(ac: *AsyncClosure) [*]u8 { const base: [*]u8 = @ptrCast(ac); return base + ac.context_alignment.forward(@sizeOf(AsyncClosure)); } fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void { ac.reset_event.waitUncancelable(); @memcpy(result, ac.resultPointer()[0..result.len]); free(ac, gpa, result.len); } fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void { if (!ac.has_result) assert(result_len == 0); const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac); gpa.free(base[0 .. ac.result_offset + result_len]); } }; fn async( userdata: ?*anyopaque, result: []u8, result_alignment: std.mem.Alignment, context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { if (builtin.single_threaded) { start(context.ptr, result.ptr); return null; } const t: *Threaded = @ptrCast(@alignCast(userdata)); const cpu_count = t.cpu_count catch { return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch { start(context.ptr, result.ptr); return null; }; }; const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result.len; const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { start(context.ptr, result.ptr); return null; })); ac.* = .{ .closure = .{ .cancel_tid = 0, .start = 
AsyncClosure.start, .is_concurrent = false, }, .func = start, .context_alignment = context_alignment, .result_offset = result_offset, .has_result = result.len != 0, .reset_event = .unset, .select_condition = null, }; @memcpy(ac.contextPointer()[0..context.len], context); t.mutex.lock(); const thread_capacity = cpu_count - 1 + t.concurrent_count; t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { t.mutex.unlock(); ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; }; t.run_queue.prepend(&ac.closure.node); if (t.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { if (t.threads.items.len == 0) { assert(t.run_queue.popFirst() == &ac.closure.node); t.mutex.unlock(); ac.free(gpa, result.len); start(context.ptr, result.ptr); return null; } // Rely on other workers to do it. t.mutex.unlock(); t.cond.signal(); return @ptrCast(ac); }; t.threads.appendAssumeCapacity(thread); } t.mutex.unlock(); t.cond.signal(); return @ptrCast(ac); } fn concurrent( userdata: ?*anyopaque, result_len: usize, result_alignment: std.mem.Alignment, context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) error{OutOfMemory}!*Io.AnyFuture { if (builtin.single_threaded) unreachable; const t: *Threaded = @ptrCast(@alignCast(userdata)); const cpu_count = t.cpu_count catch 1; const gpa = t.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result_len; const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n))); ac.* = .{ .closure = .{ .cancel_tid = 0, .start = AsyncClosure.start, .is_concurrent = true, }, .func = start, .context_alignment = context_alignment, .result_offset = result_offset, .has_result = result_len != 0, .reset_event = .unset, 
.select_condition = null, }; @memcpy(ac.contextPointer()[0..context.len], context); t.mutex.lock(); t.concurrent_count += 1; const thread_capacity = cpu_count - 1 + t.concurrent_count; t.threads.ensureTotalCapacity(gpa, thread_capacity) catch { t.mutex.unlock(); ac.free(gpa, result_len); return error.OutOfMemory; }; t.run_queue.prepend(&ac.closure.node); if (t.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { assert(t.run_queue.popFirst() == &ac.closure.node); t.mutex.unlock(); ac.free(gpa, result_len); return error.OutOfMemory; }; t.threads.appendAssumeCapacity(thread); } t.mutex.unlock(); t.cond.signal(); return @ptrCast(ac); } const GroupClosure = struct { closure: Closure, t: *Threaded, group: *Io.Group, /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. node: std.SinglyLinkedList.Node, func: *const fn (*Io.Group, context: *anyopaque) void, context_alignment: std.mem.Alignment, context_len: usize, fn start(closure: *Closure) void { const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); const tid = std.Thread.getCurrentId(); const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| { assert(cancel_tid == Closure.canceling_tid); // We already know the task is canceled before running the callback. Since all closures // in a Group have void return type, we can return early. syncFinish(group_state, reset_event); return; } current_closure = closure; gc.func(group, gc.contextPointer()); current_closure = null; // In case a cancel happens after successful task completion, prevents // signal from being delivered to the thread in `requestCancel`. 
if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| { assert(cancel_tid == Closure.canceling_tid); } syncFinish(group_state, reset_event); } fn free(gc: *GroupClosure, gpa: Allocator) void { const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc); gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]); } fn contextOffset(context_alignment: std.mem.Alignment) usize { return context_alignment.forward(@sizeOf(GroupClosure)); } fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { return contextOffset(context_alignment) + context_len; } fn contextPointer(gc: *GroupClosure) [*]u8 { const base: [*]u8 = @ptrCast(gc); return base + contextOffset(gc.context_alignment); } const sync_is_waiting: usize = 1 << 0; const sync_one_pending: usize = 1 << 1; fn syncStart(state: *std.atomic.Value(usize)) void { const prev_state = state.fetchAdd(sync_one_pending, .monotonic); assert((prev_state / sync_one_pending) < (std.math.maxInt(usize) / sync_one_pending)); } fn syncFinish(state: *std.atomic.Value(usize), event: *ResetEvent) void { const prev_state = state.fetchSub(sync_one_pending, .acq_rel); assert((prev_state / sync_one_pending) > 0); if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(); } fn syncWait(t: *Threaded, state: *std.atomic.Value(usize), event: *ResetEvent) Io.Cancelable!void { const prev_state = state.fetchAdd(sync_is_waiting, .acquire); assert(prev_state & sync_is_waiting == 0); if ((prev_state / sync_one_pending) > 0) try event.wait(t); } fn syncWaitUncancelable(state: *std.atomic.Value(usize), event: *ResetEvent) void { const prev_state = state.fetchAdd(sync_is_waiting, .acquire); assert(prev_state & sync_is_waiting == 0); if ((prev_state / sync_one_pending) > 0) event.waitUncancelable(); } }; fn groupAsync( userdata: ?*anyopaque, group: *Io.Group, context: []const u8, context_alignment: std.mem.Alignment, start: *const fn (*Io.Group, context: *const anyopaque) 
void, ) void { if (builtin.single_threaded) return start(context.ptr); const t: *Threaded = @ptrCast(@alignCast(userdata)); const cpu_count = t.cpu_count catch 1; const gpa = t.allocator; const n = GroupClosure.contextEnd(context_alignment, context.len); const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch { return start(group, context.ptr); })); gc.* = .{ .closure = .{ .cancel_tid = 0, .start = GroupClosure.start, .is_concurrent = false, }, .t = t, .group = group, .node = undefined, .func = start, .context_alignment = context_alignment, .context_len = context.len, }; @memcpy(gc.contextPointer()[0..context.len], context); t.mutex.lock(); // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe. gc.node = .{ .next = @ptrCast(@alignCast(group.token)) }; group.token = &gc.node; const thread_capacity = cpu_count - 1 + t.concurrent_count; t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { t.mutex.unlock(); gc.free(gpa); return start(group, context.ptr); }; t.run_queue.prepend(&gc.closure.node); if (t.threads.items.len < thread_capacity) { const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { assert(t.run_queue.popFirst() == &gc.closure.node); t.mutex.unlock(); gc.free(gpa); return start(group, context.ptr); }; t.threads.appendAssumeCapacity(thread); } // This needs to be done before unlocking the mutex to avoid a race with // the associated task finishing. 
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); GroupClosure.syncStart(group_state); t.mutex.unlock(); t.cond.signal(); } fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; if (builtin.single_threaded) return; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); try GroupClosure.syncWait(t, group_state, reset_event); var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); const node_next = node.next; gc.free(gpa); node = node_next orelse break; } } fn groupWaitUncancelable(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; if (builtin.single_threaded) return; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); GroupClosure.syncWaitUncancelable(group_state, reset_event); var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); const node_next = node.next; gc.free(gpa); node = node_next orelse break; } } fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; if (builtin.single_threaded) return; { var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); gc.closure.requestCancel(); node = node.next orelse break; } } const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const reset_event: *ResetEvent = @ptrCast(&group.context); GroupClosure.syncWaitUncancelable(group_state, reset_event); { var node: 
*std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); while (true) { const gc: *GroupClosure = @fieldParentPtr("node", node); const node_next = node.next; gc.free(gpa); node = node_next orelse break; } } } fn await( userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8, result_alignment: std.mem.Alignment, ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); closure.waitAndFree(t.allocator, result); } fn cancel( userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8, result_alignment: std.mem.Alignment, ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); ac.closure.requestCancel(); ac.waitAndFree(t.allocator, result); } fn cancelRequested(userdata: ?*anyopaque) bool { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; const closure = current_closure orelse return false; return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid; } fn checkCancel(t: *Threaded) error{Canceled}!void { if (cancelRequested(t)) return error.Canceled; } fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (prev_state == .contended) { try futexWait(t, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { try futexWait(t, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { _ = userdata; if (prev_state == .contended) { futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { 
futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { _ = userdata; _ = prev_state; if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) { futexWake(@ptrCast(&mutex.state), 1); } } fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const t_io = t.io(); comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; const cond_epoch = &ints[1]; const one_waiter = 1; const waiter_mask = 0xffff; const one_signal = 1 << 16; const signal_mask = 0xffff << 16; var epoch = cond_epoch.load(.acquire); var state = cond_state.fetchAdd(one_waiter, .monotonic); assert(state & waiter_mask != waiter_mask); state += one_waiter; mutex.unlock(t_io); defer mutex.lockUncancelable(t_io); while (true) { futexWaitUncancelable(cond_epoch, epoch); epoch = cond_epoch.load(.acquire); state = cond_state.load(.monotonic); while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; } } } fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; const cond_epoch = &ints[1]; const one_waiter = 1; const waiter_mask = 0xffff; const one_signal = 1 << 16; const signal_mask = 0xffff << 16; // Observe the epoch, then check the state again to see if we should wake up. 
// The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: // // - T1: s = LOAD(&state) // - T2: UPDATE(&s, signal) // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) // - T1: e = LOAD(&epoch) (was reordered after the state load) // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) // // Acquire barrier to ensure the epoch load happens before the state load. var epoch = cond_epoch.load(.acquire); var state = cond_state.fetchAdd(one_waiter, .monotonic); assert(state & waiter_mask != waiter_mask); state += one_waiter; mutex.unlock(t.io()); defer mutex.lockUncancelable(t.io()); while (true) { try futexWait(t, cond_epoch, epoch); epoch = cond_epoch.load(.acquire); state = cond_state.load(.monotonic); // Try to wake up by consuming a signal and decremented the waiter we // added previously. Acquire barrier ensures code before the wake() // which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return; } } } fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; comptime assert(@TypeOf(cond.state) == u64); const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state); const cond_state = &ints[0]; const cond_epoch = &ints[1]; const one_waiter = 1; const waiter_mask = 0xffff; const one_signal = 1 << 16; const signal_mask = 0xffff << 16; var state = cond_state.load(.monotonic); while (true) { const waiters = (state & waiter_mask) / one_waiter; const signals = (state & signal_mask) / one_signal; // Reserves which waiters to wake up by incrementing the signals count. // Therefore, the signals count is always less than or equal to the // waiters count. 
We don't need to Futex.wake if there's nothing to // wake up or if other wake() threads have reserved to wake up the // current waiters. const wakeable = waiters - signals; if (wakeable == 0) { return; } const to_wake = switch (wake) { .one => 1, .all => wakeable, }; // Reserve the amount of waiters to wake by incrementing the signals // count. Release barrier ensures code before the wake() happens before // the signal it posted and consumed by the wait() threads. const new_state = state + (one_signal * to_wake); state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse { // Wake up the waiting threads we reserved above by changing the epoch value. // // A waiting thread could miss a wake up if *exactly* ((1<<32)-1) // wake()s happen between it observing the epoch and sleeping on // it. This is very unlikely due to how many precise amount of // Futex.wake() calls that would be between the waiting thread's // potential preemption. // // Release barrier ensures the signal being added to the state // happens before the epoch is changed. 
If not, the waiting thread // could potentially deadlock from missing both the state and epoch // change: // // - T2: UPDATE(&epoch, 1) (reordered before the state change) // - T1: e = LOAD(&epoch) // - T1: s = LOAD(&state) // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) _ = cond_epoch.fetchAdd(1, .release); futexWake(cond_epoch, to_wake); return; }; } } fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); while (true) { try t.checkCancel(); switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) { .SUCCESS => return, .INTR => continue, .ACCES => return error.AccessDenied, .BADF => |err| return errnoBug(err), .PERM => return error.PermissionDenied, .DQUOT => return error.DiskQuota, .EXIST => return error.PathAlreadyExists, .FAULT => |err| return errnoBug(err), .LOOP => return error.SymLinkLoop, .MLINK => return error.LinkQuotaExceeded, .NAMETOOLONG => return error.NameTooLong, .NOENT => return error.FileNotFound, .NOMEM => return error.SystemResources, .NOSPC => return error.NoSpaceLeft, .NOTDIR => return error.NotDir, .ROFS => return error.ReadOnlyFileSystem, // dragonfly: when dir_fd is unlinked from filesystem .NOTCONN => return error.FileNotFound, .ILSEQ => return error.BadPathName, else => |err| return posix.unexpectedErrno(err), } } } fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); try t.checkCancel(); _ = dir; @panic("TODO"); } fn dirStatPathLinux( userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, options: Io.Dir.StatPathOptions, ) Io.Dir.StatPathError!Io.File.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); const 
linux = std.os.linux; var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); const flags: u32 = linux.AT.NO_AUTOMOUNT | @as(u32, if (!options.follow_symlinks) linux.AT.SYMLINK_NOFOLLOW else 0); while (true) { try t.checkCancel(); var statx = std.mem.zeroes(linux.Statx); const rc = linux.statx( dir.handle, sub_path_posix, flags, linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME, &statx, ); switch (linux.E.init(rc)) { .SUCCESS => return statFromLinux(&statx), .INTR => continue, .ACCES => return error.AccessDenied, .BADF => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), .LOOP => return error.SymLinkLoop, .NAMETOOLONG => |err| return errnoBug(err), // Handled by pathToPosix() above. .NOENT => return error.FileNotFound, .NOTDIR => return error.NotDir, .NOMEM => return error.SystemResources, else => |err| return posix.unexpectedErrno(err), } } } fn dirStatPathPosix( userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, options: Io.Dir.StatPathOptions, ) Io.Dir.StatPathError!Io.File.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); var path_buffer: [posix.PATH_MAX]u8 = undefined; const sub_path_posix = try pathToPosix(sub_path, &path_buffer); const flags: u32 = if (!options.follow_symlinks) posix.AT.SYMLINK_NOFOLLOW else 0; while (true) { try t.checkCancel(); var stat = std.mem.zeroes(posix.Stat); switch (posix.errno(fstatat_sym(dir.handle, sub_path_posix, &stat, flags))) { .SUCCESS => return statFromPosix(&stat), .INTR => continue, .INVAL => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), // Always a race condition. 
.NOMEM => return error.SystemResources, .ACCES => return error.AccessDenied, .PERM => return error.PermissionDenied, .FAULT => |err| return errnoBug(err), .NAMETOOLONG => return error.NameTooLong, .LOOP => return error.SymLinkLoop, .NOENT => return error.FileNotFound, .NOTDIR => return error.FileNotFound, .ILSEQ => return error.BadPathName, else => |err| return posix.unexpectedErrno(err), } } } fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (posix.Stat == void) return error.Streaming; while (true) { try t.checkCancel(); var stat = std.mem.zeroes(posix.Stat); switch (posix.errno(fstat_sym(file.handle, &stat))) { .SUCCESS => return statFromPosix(&stat), .INTR => continue, .INVAL => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), .NOMEM => return error.SystemResources, .ACCES => return error.AccessDenied, else => |err| return posix.unexpectedErrno(err), } } } fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { const t: *Threaded = @ptrCast(@alignCast(userdata)); const linux = std.os.linux; while (true) { try t.checkCancel(); var statx = std.mem.zeroes(linux.Statx); const rc = linux.statx( file.handle, "", linux.AT.EMPTY_PATH, linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME, &statx, ); switch (linux.E.init(rc)) { .SUCCESS => return statFromLinux(&statx), .INTR => continue, .ACCES => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .INVAL => |err| return errnoBug(err), .LOOP => |err| return errnoBug(err), .NAMETOOLONG => |err| return errnoBug(err), .NOENT => |err| return errnoBug(err), .NOMEM => return error.SystemResources, .NOTDIR => |err| return errnoBug(err), else => |err| return posix.unexpectedErrno(err), } } } fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat { const t: 
*Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    _ = file;
    @panic("TODO");
}

/// Stats an already-open file on WASI.
///
/// When libc is linked this simply defers to `fileStatPosix`; otherwise it
/// calls `fd_filestat_get` directly, retrying on `INTR` and checking for
/// cancelation before every attempt.
fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    if (builtin.link_libc) return fileStatPosix(userdata, file);

    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;

    while (true) {
        try t.checkCancel();

        var filestat: wasi.filestat_t = undefined;
        switch (wasi.fd_filestat_get(file.handle, &filestat)) {
            .SUCCESS => return statFromWasi(&filestat),
            .INTR => continue,
            .INVAL, .BADF => |err| return errnoBug(err),
            .NOMEM => return error.SystemResources,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Whether the target's system interface provides `flock`.
const have_flock = @TypeOf(posix.system.flock) != void;

// File-related system functions; the `*64` variants are selected when
// `posix.lfs64_abi` is set.
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;

/// Creates (or opens, unless `flags.exclusive`) `sub_path` relative to `dir`
/// for writing, optionally also for reading, and optionally takes a file lock.
///
/// Locking strategy:
/// * If `posix.O` has `EXLOCK`, the lock is requested through the open flags;
///   `O_NONBLOCK` is set only for the duration of the open when
///   `flags.lock_nonblocking` is set and cleared again afterwards.
/// * Otherwise, if `flock` is available, the lock is taken with a separate
///   `flock` call after the open.
///
/// Every system call is retried on `EINTR` with a cancelation check before
/// each attempt. On any error after the open succeeded, the descriptor is
/// closed.
fn dirCreateFilePosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);

    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };

    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const open_rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode);
        switch (posix.errno(open_rc)) {
            .SUCCESS => break @intCast(open_rc),
            .INTR => continue,

            .FAULT, .BADF => |err| return errnoBug(err),
            .INVAL, .ILSEQ => return error.BadPathName,
            .ACCES => return error.AccessDenied,
            .FBIG, .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV, .NXIO => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);

    // No open-time lock flags on this target: lock with flock() instead.
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const nonblocking_bit: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_operation = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | nonblocking_bit,
            .exclusive => posix.LOCK.EX | nonblocking_bit,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_operation))) {
                .SUCCESS => break,
                .INTR => continue,
                // EINVAL here would mean invalid parameters.
                .BADF, .INVAL => |err| return errnoBug(err),
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    // O_NONBLOCK was only wanted while acquiring the lock; clear it again.
    if (has_flock_open_flags and flags.lock_nonblocking) {
        const current_flags: usize = while (true) {
            try t.checkCancel();
            const getfl_rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(getfl_rc)) {
                .SUCCESS => break @intCast(getfl_rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        const nonblock_mask = @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        const blocking_flags: usize = current_flags & ~nonblock_mask;
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, blocking_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    return .{ .handle = fd };
}

/// Opens the existing file `sub_path` relative to `dir` with the access mode
/// and optional file lock requested in `flags`.
fn dirOpenFile(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);

    var os_flags: posix.O = switch (native_os) {
        .wasi => .{
            .read = flags.mode != .write_only,
            .write = flags.mode != .read_only,
        },
        else => .{
            .ACCMODE = switch (flags.mode) {
                .read_only => .RDONLY,
                .write_only => .WRONLY,
                .read_write => .RDWR,
            },
        },
    };
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) {
        // Note that the NONBLOCK flag is removed after the openat() call
        // is successful.
switch (flags.lock) {
            .none => {},
            .shared => {
                os_flags.SHLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
            .exclusive => {
                os_flags.EXLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
        }
    }

    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,

            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);

    // No open-time lock flags on this target: lock with flock() instead.
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,

                .BADF => |err| return errnoBug(err),
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    // O_NONBLOCK was only wanted while acquiring the lock; clear it again.
    if (has_flock_open_flags and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try t.checkCancel();
            const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    return .{ .handle = fd };
}

/// Closes the file's handle. Never fails and is not a cancelation point.
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    posix.close(file.handle);
}

/// Reads from the file's current position into the buffers of `data`, in
/// order, and returns the number of bytes read. A short read is permitted;
/// zero means end of stream.
///
/// * Windows: `ReadFile` takes a single byte buffer, so one read is issued
///   into the first non-empty buffer of `data`. If every buffer is empty,
///   zero is returned without touching the handle.
/// * Elsewhere: up to `max_iovecs_len` non-empty buffers are passed to one
///   `readv` (or WASI `fd_read`) call. At least one buffer must be non-empty.
///
/// Interrupted calls are retried, with a cancelation check before every
/// attempt.
fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (is_windows) {
        const DWORD = windows.DWORD;

        // Hand ReadFile the bytes of one buffer, never the array of slice
        // headers itself.
        const buffer: []u8 = for (data) |buf| {
            if (buf.len != 0) break buf;
        } else return 0;
        const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);

        while (true) {
            try t.checkCancel();
            var n: DWORD = undefined;
            // Return after the first successful read: a successful read of
            // zero bytes is end of stream and must not be retried.
            if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) return n;
            switch (windows.GetLastError()) {
                .IO_PENDING => unreachable,
                .OPERATION_ABORTED => continue,
                .BROKEN_PIPE => return 0,
                .HANDLE_EOF => return 0,
                .NETNAME_DELETED => return error.ConnectionResetByPeer,
                .LOCK_VIOLATION => return error.LockViolation,
                .ACCESS_DENIED => return error.AccessDenied,
                .INVALID_HANDLE => return error.NotOpenForReading,
                else => |err| return windows.unexpectedError(err),
            }
        }
    }

    // Gather the non-empty buffers into an iovec array of bounded size.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (iovecs_buffer.len - i == 0) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    assert(dest[0].len > 0);

    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    while (true) {
        try t.checkCancel();
        const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Reads into the buffers of `data` starting at the absolute file position
/// `offset` and returns the number of bytes read.
fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
    const t: *Threaded =
@ptrCast(@alignCast(userdata));

    if (is_windows) {
        const DWORD = windows.DWORD;
        const OVERLAPPED = windows.OVERLAPPED;

        // ReadFile takes a single byte buffer, so issue one positional read
        // into the first non-empty buffer. A short read is permitted; the
        // caller advances `offset` itself for any follow-up read.
        const buffer: []u8 = for (data) |buf| {
            if (buf.len != 0) break buf;
        } else return 0;
        const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);

        while (true) {
            try t.checkCancel();
            var n: DWORD = undefined;
            // Re-initialized on every attempt; the read position is carried
            // in the two 32-bit halves of the OVERLAPPED offset.
            var overlapped: OVERLAPPED = .{
                .Internal = 0,
                .InternalHigh = 0,
                .DUMMYUNIONNAME = .{
                    .DUMMYSTRUCTNAME = .{
                        .Offset = @as(u32, @truncate(offset)),
                        .OffsetHigh = @as(u32, @truncate(offset >> 32)),
                    },
                },
                .hEvent = null,
            };
            if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, &overlapped) != 0) return n;
            switch (windows.GetLastError()) {
                .IO_PENDING => unreachable,
                .OPERATION_ABORTED => continue,
                .BROKEN_PIPE => return 0,
                .HANDLE_EOF => return 0,
                .NETNAME_DELETED => return error.ConnectionResetByPeer,
                .LOCK_VIOLATION => return error.LockViolation,
                .ACCESS_DENIED => return error.AccessDenied,
                .INVALID_HANDLE => return error.NotOpenForReading,
                else => |err| return windows.unexpectedError(err),
            }
        }
    }

    const have_pread_but_not_preadv = switch (native_os) {
        .windows, .haiku, .serenity => true,
        else => false,
    };
    if (have_pread_but_not_preadv) {
        @compileError("TODO");
    }

    // Gather the non-empty buffers into an iovec array of bounded size.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (iovecs_buffer.len - i == 0) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    assert(dest[0].len > 0);

    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    while (true) {
        try t.checkCancel();
        const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
        switch (posix.errno(rc)) {
            .SUCCESS => return @bitCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Relative seek. Not implemented yet: checks for cancelation, then panics.
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    _ = file;
    _ = offset;
    @panic("TODO");
}

/// Moves the file position to the absolute byte `offset`.
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
    const t: *Threaded =
@ptrCast(@alignCast(userdata));
    const fd = file.handle;

    // 32-bit Linux without libc: use llseek so the full 64-bit offset fits.
    if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) {
        try t.checkCancel();
        var result: u64 = undefined;
        const rc = posix.system.llseek(fd, offset, &result, posix.SEEK.SET);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    if (native_os == .windows) {
        try t.checkCancel();
        return windows.SetFilePointerEx_BEGIN(fd, offset);
    }

    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var new_offset: std.os.wasi.filesize_t = undefined;
        switch (std.os.wasi.fd_seek(fd, @bitCast(offset), .SET, &new_offset)) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    if (posix.SEEK == void) return error.Unseekable;

    while (true) {
        try t.checkCancel();
        const rc = lseek_sym(fd, @bitCast(offset), posix.SEEK.SET);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Positional write. Not implemented yet: checks for cancelation, then panics.
fn fileWritePositional(
    userdata: ?*anyopaque,
    file: Io.File,
    buffer: [][]const u8,
    offset: u64,
) Io.File.WritePositionalError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    _ = file;
    _ = buffer;
    _ = offset;
    @panic("TODO");
}

/// Streaming write. Not implemented yet: checks for cancelation, then panics.
fn fileWriteStreaming(userdata: ?*anyopaque, file: Io.File, buffer: [][]const u8) Io.File.WriteStreamingError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    _ = file;
    _ = buffer;
    @panic("TODO");
}

/// Reads the current value of `clock` with `clock_gettime`.
///
/// `EINVAL` is reported as `error.UnsupportedClock`. Not a cancelation point.
fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;

    var now: posix.timespec = undefined;
    const rc = posix.system.clock_gettime(clockToPosix(clock), &now);
    return switch (posix.errno(rc)) {
        .SUCCESS => timestampFromPosix(&now),
        .INVAL => error.UnsupportedClock,
        else => |err| posix.unexpectedErrno(err),
    };
}

/// Reads the current value of `clock` on Windows. The CPU-time clocks are
/// reported as `error.UnsupportedClock`. Not a cancelation point.
fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    switch (clock) {
        .realtime => {
            // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
            // and uses the NTFS/Windows epoch, which is 1601-01-01.
            return .{ .nanoseconds = @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100 };
        },
        .monotonic, .uptime => {
            // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
return .{ .timestamp = windows.QueryPerformanceCounter() };
        },
        .process_cputime_id,
        .thread_cputime_id,
        => return error.UnsupportedClock,
    }
}

/// Reads the current value of `clock` with WASI `clock_time_get`, requesting
/// a precision of 1. Any failure is reported as `error.Unexpected`.
/// Not a cancelation point.
fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    var ns: std.os.wasi.timestamp_t = undefined;
    const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns);
    if (err != .SUCCESS) return error.Unexpected;
    return ns;
}

/// Sleeps with Linux `clock_nanosleep`.
///
/// * `.deadline` sleeps until an absolute time (`ABSTIME`) on its clock.
/// * `.duration` sleeps for a relative time on its clock.
/// * `.none` sleeps for the largest representable relative time.
///
/// The same `timespec` is passed as request and as remainder, so a relative
/// sleep interrupted by a signal resumes with the time that was left.
/// Cancelation is checked before every attempt. `EINVAL` is reported as
/// `error.UnsupportedClock`.
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    // NOTE(review): `.awake` is used here while `nowWindows` switches over
    // `.monotonic`/`.uptime` without an `else` — confirm which set of
    // `Io.Clock` tags is current.
    const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
        .none => .awake,
        .duration => |d| d.clock,
        .deadline => |d| d.clock,
    });
    const deadline_nanoseconds: i96 = switch (timeout) {
        .none => std.math.maxInt(i96),
        .duration => |duration| duration.raw.nanoseconds,
        .deadline => |deadline| deadline.raw.nanoseconds,
    };
    var timespec: posix.timespec = timestampToPosix(deadline_nanoseconds);
    while (true) {
        try t.checkCancel();
        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) {
            .none, .duration => false,
            .deadline => true,
        } }, &timespec, &timespec))) {
            .SUCCESS => return,
            .INTR => continue,
            .INVAL => return error.UnsupportedClock,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Sleeps with `kernel32.Sleep`.
///
/// The timeout is converted to a duration from now, in milliseconds, clamped
/// to what a `DWORD` can hold; no timeout sleeps for `maxInt(DWORD)`
/// milliseconds. Cancelation is checked once, before the sleep starts.
fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    const ms = ms: {
        const duration_and_clock = (try timeout.toDurationFromNow(t.io())) orelse
            break :ms std.math.maxInt(windows.DWORD);
        break :ms std.math.lossyCast(windows.DWORD, duration_and_clock.duration.toMilliseconds());
    };
    windows.kernel32.Sleep(ms);
}

/// Sleeps on WASI by waiting on a single clock subscription with
/// `poll_oneoff`.
///
/// No timeout waits on the monotonic clock for `maxInt(u64)`. The result of
/// `poll_oneoff` is ignored. Cancelation is checked once, before the wait
/// starts.
fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    const w = std.os.wasi;

    const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{
        .id = clockToWasi(d.clock),
        .timeout = std.math.lossyCast(u64, d.duration.nanoseconds),
        .precision = 0,
        .flags = 0,
    } else .{
        .id = .MONOTONIC,
        .timeout = std.math.maxInt(u64),
        .precision = 0,
        .flags = 0,
    };
    const in: w.subscription_t = .{
        .userdata = 0,
        .u = .{
            .tag = .CLOCK,
            .u = .{ .clock = clock },
        },
    };
    var event: w.event_t = undefined;
    var nevents: usize = undefined;
    _ = w.poll_oneoff(&in, &event, 1, &nevents);
}

/// Sleeps with POSIX `nanosleep`.
///
/// The timeout is first converted to a duration from now; no timeout becomes
/// the largest value the `timespec` fields can hold. The same `timespec` is
/// passed as request and as remainder, so a sleep interrupted by a signal
/// resumes with the time that was left. Cancelation is checked before every
/// attempt.
fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
    const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type;

    var timespec: posix.timespec = ts: {
        const d = (try timeout.toDurationFromNow(t.io())) orelse break :ts .{
            .sec = std.math.maxInt(sec_type),
            .nsec = std.math.maxInt(nsec_type),
        };
        break :ts timestampToPosix(d.raw.toNanoseconds());
    };
    while (true) {
        try t.checkCancel();
        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
            .INTR => continue,
            else => return, // This prong handles success as well as unexpected errors.
        }
    }
}

/// Blocks until at least one of `futures` has completed and returns the index
/// of the first completed one.
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var reset_event: ResetEvent = .unset;

    for (futures, 0..) |future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
            for (futures[0..i]) |cleanup_future| {
                const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future));
                if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
                    cleanup_closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
                }
            }
            return i;
        }
    }

    try reset_event.wait(t);

    var result: ?usize = null;
    for (futures, 0..)
|future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
            closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
            if (result == null) result = i; // In case multiple are ready, return first.
        }
    }
    return result.?;
}

/// Opens a socket for `address`, binds it and starts listening on it.
///
/// When `options.reuse_address` is set, `SO_REUSEADDR` is enabled, and also
/// `SO_REUSEPORT` where `posix.SO` declares it. The address actually bound is
/// read back with `getsockname` and returned in the server's socket. The
/// socket is closed again on any error.
fn netListenIpPosix(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_fd = try openSocketPosix(t, posixAddressFamily(&address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);

    if (options.reuse_address) {
        try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
        if (@hasDecl(posix.SO, "REUSEPORT"))
            try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
    }

    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(&address, &storage);
    try posixBind(t, socket_fd, &storage.any, addr_len);

    while (true) {
        try t.checkCancel();
        const rc = posix.system.listen(socket_fd, options.kernel_backlog);
        switch (posix.errno(rc)) {
            .SUCCESS => break,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }

    try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
    return .{
        .socket = .{
            .handle = socket_fd,
            .address = addressFromPosix(&storage),
        },
    };
}

/// Opens a stream-mode Unix domain socket, binds it to `address` and starts
/// listening on it.
///
/// Returns `error.AddressFamilyUnsupported` when `net.has_unix_sockets` is
/// false or the socket cannot be created for protocol or mode reasons. The
/// socket is closed again on any error.
fn netListenUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.ProtocolUnsupportedBySystem,
        error.ProtocolUnsupportedByAddressFamily,
        error.SocketModeUnsupported,
        => return error.AddressFamilyUnsupported,
        error.OptionUnsupported => return error.Unexpected,
        else => |e| return e,
    };
    errdefer posix.close(socket_fd);

    var storage: UnixAddress = undefined;
    const addr_len = addressUnixToPosix(address, &storage);
    try posixBindUnix(t, socket_fd, &storage.any, addr_len);

    while (true) {
        try t.checkCancel();
        const rc = posix.system.listen(socket_fd, options.kernel_backlog);
        switch (posix.errno(rc)) {
            .SUCCESS => break,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }

    return socket_fd;
}

/// `bind` for Unix domain sockets: retried on `EINTR` with a cancelation
/// check before every attempt. Path-related failures are reported as errors.
fn posixBindUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.bind(fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,

            .ACCES => return error.AccessDenied,
            .ADDRINUSE => return error.AddressInUse,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .NOMEM => return error.SystemResources,

            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,

            // BADF: always a race condition if this error is returned.
            // INVAL: invalid parameters. NOTSOCK: invalid `sockfd`.
            // FAULT: invalid `addr` pointer.
            .BADF, .INVAL, .NOTSOCK, .FAULT, .NAMETOOLONG => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// `bind` for IP sockets: retried on `EINTR` with a cancelation check before
/// every attempt.
fn posixBind(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
            .SUCCESS =>
break,
            .INTR => continue,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err), // always a race condition if this error is returned
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// `connect` for IP sockets: retried on `EINTR` with a cancelation check
/// before every attempt.
fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.connect(socket_fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,

            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ALREADY => return error.ConnectionPending,
            .CONNREFUSED => return error.ConnectionRefused,
            .CONNRESET => return error.ConnectionResetByPeer,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .TIMEDOUT => return error.Timeout,
            .ACCES => return error.AccessDenied,

            .BADF,
            .FAULT,
            .ISCONN,
            .NOTSOCK,
            .PROTOTYPE,
            .CONNABORTED,
            .PERM,
            .NOENT,
            => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// `connect` for Unix domain sockets: retried on `EINTR` with a cancelation
/// check before every attempt. Path-related failures are reported as errors.
fn posixConnectUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.connect(fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,

            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ACCES => return error.AccessDenied,

            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,

            .BADF,
            .CONNABORTED,
            .FAULT,
            .ISCONN,
            .NOTSOCK,
            .PROTOTYPE,
            => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// `getsockname`: retried on `EINTR` with a cancelation check before every
/// attempt. Writes the socket's local address to `addr` and its size to
/// `addr_len`.
fn posixGetSockName(t: *Threaded, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.getsockname(socket_fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            // BADF and NOTSOCK are always a race condition; INVAL means
            // invalid parameters.
            .BADF, .FAULT, .INVAL, .NOTSOCK => |err| return errnoBug(err),
            .NOBUFS => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Sets the 32-bit socket option `opt_name` at `level` to `option` with
/// `setsockopt`: retried on `EINTR` with a cancelation check before every
/// attempt.
fn setSocketOption(t: *Threaded, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
    const option_bytes: []const u8 = @ptrCast(&option);
    while (true) {
        try t.checkCancel();
        const rc = posix.system.setsockopt(fd, level, opt_name, option_bytes.ptr, @intCast(option_bytes.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            // BADF and NOTSOCK are always a race condition.
            .BADF, .NOTSOCK, .INVAL, .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Opens a socket for `address` and connects it. Connect timeouts are not
/// implemented yet: any `options.timeout` other than `.none` panics.
fn netConnectIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    if (options.timeout != .none) @panic("TODO");
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const family = posixAddressFamily(address);
    const
socket_fd = try openSocketPosix(t, family, .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);
    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(address, &storage);
    try posixConnect(t, socket_fd, &storage.any, addr_len);
    try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
    return .{ .socket = .{
        .handle = socket_fd,
        .address = addressFromPosix(&storage),
    } };
}

/// Opens a stream-mode Unix domain socket and connects it to `address`.
///
/// Returns `error.AddressFamilyUnsupported` when `net.has_unix_sockets` is
/// false. The socket is closed again on any error.
fn netConnectUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const socket_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.OptionUnsupported => return error.Unexpected,
        else => |e| return e,
    };
    errdefer posix.close(socket_fd);
    var storage: UnixAddress = undefined;
    const addr_len = addressUnixToPosix(address, &storage);
    try posixConnectUnix(t, socket_fd, &storage.any, addr_len);
    return socket_fd;
}

/// Opens a socket for `address` and binds it.
///
/// The address actually bound is read back with `getsockname` and returned.
/// The socket is closed again on any error.
fn netBindIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const family = posixAddressFamily(address);
    const socket_fd = try openSocketPosix(t, family, options);
    errdefer posix.close(socket_fd);
    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(address, &storage);
    try posixBind(t, socket_fd, &storage.any, addr_len);
    try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
    return .{
        .handle = socket_fd,
        .address = addressFromPosix(&storage),
    };
}

/// Creates a close-on-exec socket of the given address family with the mode
/// and protocol from `options`.
///
/// Where `socket_flags_unsupported` is set, close-on-exec is applied with a
/// separate `fcntl(F_SETFD)` call after creation; elsewhere `SOCK.CLOEXEC`
/// is passed to `socket` directly.
///
/// When `options.ip6_only` is set, `IPV6_V6ONLY` is enabled on the socket,
/// or `error.OptionUnsupported` is returned on targets without `posix.IPV6`.
///
/// System calls are retried on `EINTR` with a cancelation check before every
/// attempt. The socket is closed again on any error.
fn openSocketPosix(
    t: *Threaded,
    family: posix.sa_family_t,
    options: IpAddress.BindOptions,
) error{
    AddressFamilyUnsupported,
    ProtocolUnsupportedBySystem,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    ProtocolUnsupportedByAddressFamily,
    SocketModeUnsupported,
    OptionUnsupported,
    Unexpected,
    Canceled,
}!posix.socket_t {
    const mode = posixSocketMode(options.mode);
    const protocol = posixProtocol(options.protocol);
    const socket_fd = while (true) {
        try t.checkCancel();
        const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
        const socket_rc = posix.system.socket(family, flags, protocol);
        switch (posix.errno(socket_rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(socket_rc);
                errdefer posix.close(fd);
                if (socket_flags_unsupported) while (true) {
                    try t.checkCancel();
                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                        .SUCCESS => break,
                        .INTR => continue,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break fd;
            },
            .INTR => continue,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .INVAL => return error.ProtocolUnsupportedBySystem,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
            .PROTOTYPE => return error.SocketModeUnsupported,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(socket_fd);

    if (options.ip6_only) {
        if (posix.IPV6 == void) return error.OptionUnsupported;
        // A non-zero value restricts the socket to IPv6 traffic; zero would
        // enable dual-stack operation instead.
        try setSocketOption(t, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 1);
    }

    return socket_fd;
}

/// On these targets the code in this file does not pass `SOCK.CLOEXEC` to
/// `socket` and sets `FD_CLOEXEC` with `fcntl` afterwards instead.
const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
/// Whether `accept4` is used; otherwise `accept` followed by `fcntl`.
const have_accept4 = !socket_flags_unsupported;

/// Accepts one connection on the listening socket `listen_fd` and returns it
/// as a close-on-exec stream together with the peer address.
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var storage: PosixAddress = undefined;
    var addr_len: posix.socklen_t = @sizeOf(PosixAddress);
    const fd = while (true) {
        try t.checkCancel();
        const rc = if (have_accept4)
            posix.system.accept4(listen_fd, &storage.any, &addr_len, posix.SOCK.CLOEXEC)
        else
            posix.system.accept(listen_fd, &storage.any, &addr_len);
        switch
(posix.errno(rc)) { .SUCCESS => { const fd: posix.fd_t = @intCast(rc); errdefer posix.close(fd); if (!have_accept4) while (true) { try t.checkCancel(); switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) { .SUCCESS => break, .INTR => continue, else => |err| return posix.unexpectedErrno(err), } }; break fd; }, .INTR => continue, .AGAIN => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), // always a race condition .CONNABORTED => return error.ConnectionAborted, .FAULT => |err| return errnoBug(err), .INVAL => return error.SocketNotListening, .NOTSOCK => |err| return errnoBug(err), .MFILE => return error.ProcessFdQuotaExceeded, .NFILE => return error.SystemFdQuotaExceeded, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, .OPNOTSUPP => |err| return errnoBug(err), .PROTO => return error.ProtocolFailure, .PERM => return error.BlockedByFirewall, else => |err| return posix.unexpectedErrno(err), } }; return .{ .socket = .{ .handle = fd, .address = addressFromPosix(&storage), } }; } fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; for (data) |buf| { if (iovecs_buffer.len - i == 0) break; if (buf.len != 0) { iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len }; i += 1; } } const dest = iovecs_buffer[0..i]; assert(dest[0].len > 0); if (native_os == .wasi and !builtin.link_libc) while (true) { try t.checkCancel(); var n: usize = undefined; switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) { .SUCCESS => return n, .INTR => continue, .INVAL => |err| return errnoBug(err), .FAULT => |err| return errnoBug(err), .AGAIN => |err| return errnoBug(err), .BADF => |err| return errnoBug(err), .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, .NOTCONN => return 
error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    while (true) {
        try t.checkCancel();
        const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .PIPE => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

const have_sendmmsg = builtin.os.tag == .linux;

/// Sends `messages` in order on `handle`. Returns a tuple of the first error
/// encountered (or `null`) and the number of messages sent before it.
/// Batches through `netSendMany` where `sendmmsg` exists, otherwise sends one
/// message per `netSendOne` call.
fn netSend(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // MSG.CONFIRM and MSG.FASTOPEN are only consulted where the target declares them.
    const confirm_bit: u32 = if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0;
    const dont_route_bit: u32 = if (flags.dont_route) posix.MSG.DONTROUTE else 0;
    const eor_bit: u32 = if (flags.eor) posix.MSG.EOR else 0;
    const oob_bit: u32 = if (flags.oob) posix.MSG.OOB else 0;
    const fastopen_bit: u32 = if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0;
    const posix_flags: u32 = confirm_bit | dont_route_bit | eor_bit | oob_bit | fastopen_bit | posix.MSG.NOSIGNAL;

    var sent: usize = 0;
    while (messages.len - sent != 0) {
        if (have_sendmmsg) {
            sent += netSendMany(t, handle, messages[sent..], posix_flags) catch |err| return .{ err, sent };
            continue;
        }
        netSendOne(t, handle, &messages[sent], posix_flags) catch |err| return .{ err, sent };
        sent += 1;
    }
    return .{ null, sent };
}

/// Sends a single message with `sendmsg`, retrying on `EINTR`, and stores the
/// number of bytes actually sent back into `message.data_len`.
fn netSendOne(
    t: *Threaded,
    handle: net.Socket.Handle,
    message: *net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!void {
    var dest_storage: PosixAddress = undefined;
    var payload: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
    const header: posix.msghdr_const = .{
        .name = &dest_storage.any,
        .namelen = addressToPosix(message.address, &dest_storage),
        .iov = (&payload)[0..1],
        .iovlen = 1,
        .control = @constCast(message.control.ptr),
        .controllen = @intCast(message.control.len),
        .flags = 0,
    };
    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmsg(handle, &header, flags);
        if (is_windows) {
            if (rc != windows.ws2_32.SOCKET_ERROR) {
                message.data_len = @intCast(rc);
                return;
            }
            // Every prong below either returns or is unreachable.
            switch (windows.ws2_32.WSAGetLastError()) {
                .WSAEACCES => return error.AccessDenied,
                .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
                .WSAECONNRESET => return error.ConnectionResetByPeer,
                .WSAEMSGSIZE => return error.MessageOversize,
                .WSAENOBUFS => return error.SystemResources,
                .WSAENOTSOCK => return error.FileDescriptorNotASocket,
                .WSAEAFNOSUPPORT => return error.AddressFamilyUnsupported,
                .WSAEDESTADDRREQ => unreachable, // A destination address is required.
                .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
                .WSAEHOSTUNREACH => return error.NetworkUnreachable,
                .WSAEINVAL => unreachable,
                .WSAENETDOWN => return error.NetworkDown,
                .WSAENETRESET => return error.ConnectionResetByPeer,
                .WSAENETUNREACH => return error.NetworkUnreachable,
                .WSAENOTCONN => return error.SocketUnconnected,
                .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
                .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
                else => |err| return windows.unexpectedWSAError(err),
            }
        }
        switch (posix.errno(rc)) {
            .SUCCESS => {
                message.data_len = @intCast(rc);
                return;
            },
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF,
            .DESTADDRREQ,
            .FAULT,
            .INVAL,
            .ISCONN,
            .NOTSOCK,
            .OPNOTSUPP,
            => |err| return errnoBug(err),
            .CONNRESET => return error.ConnectionResetByPeer,
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .PIPE, .NOTCONN => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Sends up to 64 messages with a single `sendmmsg` call and returns how many
/// the kernel accepted. For each accepted message, `data_len` is overwritten
/// with the byte count reported by the kernel.
fn netSendMany(
    t: *Threaded,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!usize {
    var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
    var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
    var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;

    // Only as many messages as fit the fixed-size header buffer go out per call.
    const batch_len: usize = @min(messages.len, msg_buffer.len);
    const batch_messages = messages[0..batch_len];
    const batch_msgs = (&msg_buffer)[0..batch_len];
    const batch_addrs = (&addr_buffer)[0..batch_len];
    const batch_iovecs = (&iovecs_buffer)[0..batch_len];

    for (batch_messages, batch_msgs, batch_addrs, batch_iovecs) |*message, *msg, *addr, *iovec| {
        iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
        msg.* = .{
            .hdr = .{
                .name = &addr.any,
                .namelen = addressToPosix(message.address, addr),
                .iov = iovec[0..1],
                .iovlen = 1,
                .control = @constCast(message.control.ptr),
                .controllen = message.control.len,
                .flags = 0,
            },
            .len = undefined, // Populated by calling sendmmsg below.
        };
    }

    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmmsg(handle, batch_msgs.ptr, @intCast(batch_msgs.len), flags);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                const n: usize = @intCast(rc);
                for (batch_messages[0..n], batch_msgs[0..n]) |*message, *msg| {
                    message.data_len = msg.len;
                }
                return n;
            },
            .AGAIN => |err| return errnoBug(err),
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
            .INTR => continue,
            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
            .PIPE, .NOTCONN => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH, .NETUNREACH => return error.NetworkUnreachable,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

/// Receives messages into `message_buffer`, storing all payloads back to back
/// in `data_buffer`. Returns the first error (or `null`) together with the
/// number of messages received.
fn netReceive(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // recvmmsg is useless, here's why:
    // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
    // * it wants iovecs for each message but we have a better API: one data
    //   buffer to handle all the messages. The better API cannot be lowered to
    //   the split vectors though because reducing the buffer size might make
    //   some messages unreceivable.

    // So the strategy instead is to use non-blocking recvmsg calls, calling
    // poll() with timeout if the first one returns EAGAIN.
    const posix_flags: u32 =
        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
        @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
        posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;

    var poll_fds: [1]posix.pollfd = .{
        .{
            .fd = handle,
            .events = posix.POLL.IN,
            .revents = undefined,
        },
    };
    var message_i: usize = 0;
    var data_i: usize = 0;

    const deadline = timeout.toDeadline(t.io()) catch |err| return .{ err, message_i };

    recv: while (true) {
        t.checkCancel() catch |err| return .{ err, message_i };

        if (message_buffer.len - message_i == 0) return .{ null, message_i };
        const message = &message_buffer[message_i];
        const remaining_data_buffer = data_buffer[data_i..];
        var storage: PosixAddress = undefined;
        var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
        var msg: posix.msghdr = .{
            .name = &storage.any,
            .namelen = @sizeOf(PosixAddress),
            .iov = (&iov)[0..1],
            .iovlen = 1,
            .control = message.control.ptr,
            .controllen = @intCast(message.control.len),
            .flags = undefined,
        };

        const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
        switch (posix.errno(recv_rc)) {
            .SUCCESS => {
                const data = remaining_data_buffer[0..@intCast(recv_rc)];
                data_i += data.len;
                message.* = .{
                    .from = addressFromPosix(&storage),
                    .data = data,
                    .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
                    .flags = .{
                        .eor = (msg.flags & posix.MSG.EOR) != 0,
                        .trunc = (msg.flags & posix.MSG.TRUNC) != 0,
                        .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
                        .oob = (msg.flags & posix.MSG.OOB) != 0,
                        .errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false,
                    },
                };
                message_i += 1;
                continue;
            },
            .AGAIN => while (true) {
                t.checkCancel() catch |err| return .{ err, message_i };
                if (message_i != 0) return .{ null, message_i };

                const max_poll_ms = std.math.maxInt(u31);
                const timeout_ms: u31 = if (deadline) |d| t: {
                    const duration = d.durationFromNow(t.io()) catch |err|
return .{ err, message_i };
                    if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
                    break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
                } else max_poll_ms;

                const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
                switch (posix.errno(poll_rc)) {
                    .SUCCESS => {
                        if (poll_rc == 0) {
                            // Although spurious timeouts are OK, when no deadline
                            // is passed we must not return `error.Timeout`.
                            if (deadline == null) continue;
                            return .{ error.Timeout, message_i };
                        }
                        continue :recv;
                    },
                    .INTR => continue,
                    .FAULT => |err| return .{ errnoBug(err), message_i },
                    .INVAL => |err| return .{ errnoBug(err), message_i },
                    .NOMEM => return .{ error.SystemResources, message_i },
                    else => |err| return .{ posix.unexpectedErrno(err), message_i },
                }
            },
            .INTR => continue,

            .BADF => |err| return .{ errnoBug(err), message_i },
            .NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
            .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
            .FAULT => |err| return .{ errnoBug(err), message_i },
            .INVAL => |err| return .{ errnoBug(err), message_i },
            .NOBUFS => return .{ error.SystemResources, message_i },
            .NOMEM => return .{ error.SystemResources, message_i },
            .NOTCONN => return .{ error.SocketUnconnected, message_i },
            .NOTSOCK => |err| return .{ errnoBug(err), message_i },
            .MSGSIZE => return .{ error.MessageOversize, message_i },
            .PIPE => return .{ error.SocketUnconnected, message_i },
            .OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
            .CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
            .NETDOWN => return .{ error.NetworkDown, message_i },
            else => |err| return .{ posix.unexpectedErrno(err), message_i },
        }
    }
}

/// Writes `header`, then every element of `data` except the last, then the
/// last element of `data` repeated `splat` times, using one `sendmsg` call
/// with `MSG.NOSIGNAL`. At most `max_iovecs_len` vectors are submitted, so
/// the call may cover only a prefix of the logical data; the return value is
/// whatever `posix.sendmsg` reports.
///
/// `data` must contain at least one element (the splat pattern).
fn netWritePosix(
    userdata: ?*anyopaque,
    fd: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
    var msg: posix.msghdr_const = .{
        .name = null,
        .namelen = 0,
        .iov = &iovecs,
        .iovlen = 0,
        .control = null,
        .controllen = 0,
        .flags = 0,
    };
    // Declared at function scope because `iovecs` may point into it and is
    // only consumed by the `sendmsg` call at the end of this function.
    var backup_buffer: [splat_buffer_size]u8 = undefined;
    const splat_buffer = &backup_buffer;

    addBuf(&iovecs, &msg.iovlen, header);
    for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
    const pattern = data[data.len - 1];
    if (iovecs.len - msg.iovlen != 0) switch (splat) {
        0 => {},
        1 => addBuf(&iovecs, &msg.iovlen, pattern),
        else => switch (pattern.len) {
            0 => {},
            1 => {
                // A one-byte pattern is expanded into a scratch buffer that is
                // then referenced by as many vectors as are still available.
                const memset_len = @min(splat_buffer.len, splat);
                const buf = splat_buffer[0..memset_len];
                @memset(buf, pattern[0]);
                addBuf(&iovecs, &msg.iovlen, buf);
                var remaining_splat = splat - buf.len;
                while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
                    assert(buf.len == splat_buffer.len);
                    addBuf(&iovecs, &msg.iovlen, splat_buffer);
                    remaining_splat -= splat_buffer.len;
                }
                // The loop above can stop because the vectors ran out while
                // more than a full buffer remains, so clamp before slicing.
                const tail_len = @min(remaining_splat, splat_buffer.len);
                addBuf(&iovecs, &msg.iovlen, splat_buffer[0..tail_len]);
            },
            else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
                addBuf(&iovecs, &msg.iovlen, pattern);
            },
        },
    };
    const flags = posix.MSG.NOSIGNAL;
    return posix.sendmsg(fd, &msg, flags);
}

/// Appends `bytes` to the vector list `v` at index `i.*` and advances the
/// index. Does nothing when `bytes` is empty or `v` is already full.
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
    // OS checks ptr addr before length so zero length vectors must be omitted.
    if (bytes.len == 0) return;
    if (v.len - i.* == 0) return;
    v[i.*] = .{ .base = bytes.ptr, .len = bytes.len };
    i.* += 1;
}

/// Closes a socket handle using `closesocket` on Windows and `posix.close`
/// everywhere else.
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    switch (native_os) {
        .windows => windows.closesocket(handle) catch recoverableOsBugDetected(),
        else => posix.close(handle),
    }
}

/// Resolves a network interface name to its interface index.
fn netInterfaceNameResolve(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (native_os == .linux) {
        const sock_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
            error.ProcessFdQuotaExceeded => return error.SystemResources,
            error.SystemFdQuotaExceeded => return error.SystemResources,
            error.AddressFamilyUnsupported => return error.Unexpected,
            error.ProtocolUnsupportedBySystem => return error.Unexpected,
            error.ProtocolUnsupportedByAddressFamily => return error.Unexpected,
            error.SocketModeUnsupported => return error.Unexpected,
            error.OptionUnsupported => return error.Unexpected,
            else => |e| return e,
        };
        defer posix.close(sock_fd);

        var ifr: posix.ifreq = .{
            .ifrn = .{ .name = @bitCast(name.bytes) },
            .ifru = undefined,
        };

        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) {
                .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) },
                .INTR => continue,
                .INVAL => |err| return errnoBug(err), // Bad parameters.
                .NOTTY => |err| return errnoBug(err),
                .NXIO => |err| return errnoBug(err),
                .BADF => |err| return errnoBug(err), // Always a race condition.
                .FAULT => |err| return errnoBug(err), // Bad pointer parameter.
.IO => |err| return errnoBug(err), // sock_fd is not a file descriptor
                .NODEV => return error.InterfaceNotFound,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    if (native_os == .windows) {
        try t.checkCancel();
        const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = index };
    }

    if (builtin.link_libc) {
        try t.checkCancel();
        const index = std.c.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = @bitCast(index) };
    }

    @panic("unimplemented");
}

/// Looks up the name of a network interface. Not implemented yet: every
/// target currently panics after the cancellation check.
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    if (native_os == .linux) {
        _ = interface;
        @panic("TODO");
    }

    if (native_os == .windows) {
        @panic("TODO");
    }

    if (builtin.link_libc) {
        @panic("TODO");
    }

    @panic("unimplemented");
}

/// Resolves `host_name`, pushing results into `resolved`. The outcome of the
/// lookup (success or error) is always delivered as the final `.end` item.
fn netLookup(
    userdata: ?*anyopaque,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = t.io();
    resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, host_name, resolved, options) });
}

/// Performs the lookup for `netLookup`.
///
/// On Linux the name is tried, in order, as an IP literal, against the hosts
/// file (`lookupHosts`), as a `localhost` name per RFC 6761, and finally via
/// DNS (`lookupDnsSearch`). On other targets that link libc, the blocking
/// `getaddrinfo` is used. Anything else yields `error.OptionUnsupported`.
fn netLookupFallible(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const t_io = t.io();
    const name = host_name.bytes;
    assert(name.len <= HostName.max_len);

    if (is_windows) {
        // TODO use GetAddrInfoExW / GetAddrInfoExCancel
        @compileError("TODO");
    }

    // On Linux, glibc provides getaddrinfo_a which is capable of supporting our semantics.
    // However, musl's POSIX-compliant getaddrinfo is not, so we bypass it.

    if (builtin.target.isGnuLibC()) {
        // TODO use getaddrinfo_a / gai_cancel
    }

    if (native_os == .linux) {
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }

        if (options.family != .ip6) {
            if (IpAddress.parseIp4(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }

        lookupHosts(t, host_name, resolved, options) catch |err| switch (err) {
            error.UnknownHostName => {},
            else => |e| return e,
        };

        // RFC 6761 Section 6.3.3
        // Name resolution APIs and libraries SHOULD recognize
        // localhost names as special and SHOULD always return the IP
        // loopback address for address queries and negative responses
        // for all other query types.

        // Check for equal to "localhost(.)" or ends in ".localhost(.)"
        const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost";
        // When the name is longer than the suffix, the byte immediately
        // before the suffix must be the label separator.
        if (std.mem.endsWith(u8, name, localhost) and
            (name.len == localhost.len or name[name.len - localhost.len - 1] == '.'))
        {
            var results_buffer: [3]HostName.LookupResult = undefined;
            var results_index: usize = 0;
            if (options.family != .ip4) {
                results_buffer[results_index] = .{ .address = .{ .ip6 = .loopback(options.port) } };
                results_index += 1;
            }
            if (options.family != .ip6) {
                results_buffer[results_index] = .{ .address = .{ .ip4 = .loopback(options.port) } };
                results_index += 1;
            }
            const canon_name = "localhost";
            const canon_name_dest = options.canonical_name_buffer[0..canon_name.len];
            canon_name_dest.* = canon_name.*;
            results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } };
            results_index += 1;
            try resolved.putAll(t_io, results_buffer[0..results_index]);
            return;
        }

        return lookupDnsSearch(t, host_name, resolved, options);
    }

    if (native_os == .openbsd) {
        // TODO use getaddrinfo_async / asr_abort
    }

    if (native_os == .freebsd) {
        // TODO use dnsres_getaddrinfo
    }

    if (native_os.isDarwin()) {
        // TODO use CFHostStartInfoResolution / CFHostCancelInfoResolution
    }

    if (builtin.link_libc) {
        // This operating system lacks a way to resolve asynchronously. We are
        // stuck with getaddrinfo.
        var name_buffer: [HostName.max_len + 1]u8 = undefined;
        @memcpy(name_buffer[0..host_name.bytes.len], host_name.bytes);
        name_buffer[host_name.bytes.len] = 0;
        const name_c = name_buffer[0..host_name.bytes.len :0];

        var port_buffer: [8]u8 = undefined;
        const port_c = std.fmt.bufPrintZ(&port_buffer, "{d}", .{options.port}) catch unreachable;

        const hints: posix.addrinfo = .{
            .flags = .{ .NUMERICSERV = true },
            .family = posix.AF.UNSPEC,
            .socktype = posix.SOCK.STREAM,
            .protocol = posix.IPPROTO.TCP,
            .canonname = null,
            .addr = null,
            .addrlen = 0,
            .next = null,
        };
        var res: ?*posix.addrinfo = null;
        while (true) {
            try t.checkCancel();
            switch (posix.system.getaddrinfo(name_c.ptr, port_c.ptr, &hints, &res)) {
                // Success: leave the retry loop and consume `res`.
                @as(posix.system.EAI, @enumFromInt(0)) => break,
                .ADDRFAMILY => return error.AddressFamilyUnsupported,
                .AGAIN => return error.NameServerFailure,
                .FAIL => return error.NameServerFailure,
                .FAMILY => return error.AddressFamilyUnsupported,
                .MEMORY => return error.SystemResources,
                .NODATA => return error.UnknownHostName,
                .NONAME => return error.UnknownHostName,
                .SYSTEM => switch (posix.errno(-1)) {
                    .INTR => continue,
                    else => |e| return posix.unexpectedErrno(e),
                },
                else => return error.Unexpected,
            }
        }
        defer if (res) |some| posix.system.freeaddrinfo(some);

        var it = res;
        var canon_name: ?[]const u8 = null;
        while (it) |info| : (it = info.next) {
            const addr = info.addr orelse continue;
            // Reinterpreted in place rather than copied through
            // `posix.sockaddr`, which is assumed to be smaller than an IPv6
            // socket address.
            try resolved.putOne(t_io, .{ .address = addressFromPosix(@ptrCast(@alignCast(addr))) });

            if (info.canonname) |n| {
                if (canon_name == null) {
                    canon_name = std.mem.span(n);
                }
            }
        }
        if (canon_name) |n| {
            try resolved.putOne(t_io, .{
                .canonical_name = copyCanon(options.canonical_name_buffer, n),
            });
        }
        return;
    }

    return error.OptionUnsupported;
}

/// Storage able to hold either an IPv4 or an IPv6 socket address.
const PosixAddress = extern union {
    any: posix.sockaddr,
    in: posix.sockaddr.in,
    in6: posix.sockaddr.in6,
};

/// Storage for a unix domain socket address.
const UnixAddress = extern union {
    any: posix.sockaddr,
    un: posix.sockaddr.un,
};

/// Maps an `IpAddress` variant to its `AF` constant.
fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
    return switch (a.*) {
        .ip4 => posix.AF.INET,
        .ip6 => posix.AF.INET6,
    };
}

/// Converts a socket address to an `IpAddress` based on its family field.
/// Families other than `INET` and `INET6` yield the IPv4 loopback address
/// with port 0.
fn addressFromPosix(posix_address: *PosixAddress) IpAddress {
    return switch (posix_address.any.family) {
        posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
        posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
        else => .{ .ip4 = .loopback(0) },
    };
}

/// Writes `a` into `storage` and returns the size of the variant written.
fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
    return switch (a.*) {
        .ip4 => |ip4| {
            storage.in = address4ToPosix(ip4);
            return @sizeOf(posix.sockaddr.in);
        },
        .ip6 => |*ip6| {
            storage.in6 = address6ToPosix(ip6);
            return @sizeOf(posix.sockaddr.in6);
        },
    };
}

/// Writes the path of `a` into `storage` as a null-terminated `AF.UNIX`
/// address and returns `@sizeOf(posix.sockaddr.un)`.
fn addressUnixToPosix(a: *const net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
    @memcpy(storage.un.path[0..a.path.len], a.path);
    storage.un.family = posix.AF.UNIX;
    storage.un.path[a.path.len] = 0;
    return @sizeOf(posix.sockaddr.un);
}

/// Converts a `sockaddr.in` to an `Ip4Address`; the port becomes native-endian.
fn address4FromPosix(in: *posix.sockaddr.in) net.Ip4Address {
    return .{
        .port = std.mem.bigToNative(u16, in.port),
        .bytes = @bitCast(in.addr),
    };
}

/// Converts a `sockaddr.in6` to an `Ip6Address`; the port becomes native-endian.
fn address6FromPosix(in6: *posix.sockaddr.in6) net.Ip6Address {
    return .{
        .port = std.mem.bigToNative(u16, in6.port),
        .bytes = in6.addr,
        .flow = in6.flowinfo,
        .interface = .{ .index = in6.scope_id },
    };
}

/// Converts an `Ip4Address` to a `sockaddr.in`; the port becomes big-endian.
fn address4ToPosix(a: net.Ip4Address) posix.sockaddr.in {
    return .{
        .port = std.mem.nativeToBig(u16, a.port),
        .addr = @bitCast(a.bytes),
    };
}

/// Converts an `Ip6Address` to a `sockaddr.in6`; the port becomes big-endian.
fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
    return .{
        .port = std.mem.nativeToBig(u16, a.port),
        .flowinfo = a.flow,
        .addr = a.bytes,
        .scope_id = a.interface.index,
    };
}

/// Handles an errno that can only result from a programming mistake: panics
/// in Debug builds and returns `error.Unexpected` in all other modes.
fn errnoBug(err: posix.E) Io.UnexpectedError {
    switch (builtin.mode) {
        .Debug => std.debug.panic("programmer bug caused syscall error: {t}", .{err}),
        else => return error.Unexpected,
    }
}

/// Maps a socket mode to its `SOCK` type constant.
fn posixSocketMode(mode: net.Socket.Mode) u32 {
    return switch (mode) {
        .stream => posix.SOCK.STREAM,
        .dgram => posix.SOCK.DGRAM,
        .seqpacket => posix.SOCK.SEQPACKET,
        .raw => posix.SOCK.RAW,
        .rdm => posix.SOCK.RDM,
    };
}

/// Returns the integer value of `protocol`, or 0 when it is null.
fn posixProtocol(protocol: ?net.Protocol) u32 {
    return @intFromEnum(protocol orelse return 0);
}
/// Marks an OS failure that should be impossible: `unreachable` in Debug
/// builds, silently ignored in every other mode.
fn recoverableOsBugDetected() void {
    if (builtin.mode == .Debug) unreachable;
}

/// Maps an `Io.Clock` to the POSIX clock id used to implement it.
fn clockToPosix(clock: Io.Clock) posix.clockid_t {
    return switch (clock) {
        .real => posix.CLOCK.REALTIME,
        .awake => switch (builtin.os.tag) {
            .macos, .ios, .watchos, .tvos => posix.CLOCK.UPTIME_RAW,
            else => posix.CLOCK.MONOTONIC,
        },
        .boot => switch (builtin.os.tag) {
            .macos, .ios, .watchos, .tvos => posix.CLOCK.MONOTONIC_RAW,
            else => posix.CLOCK.BOOTTIME,
        },
        .cpu_process => posix.CLOCK.PROCESS_CPUTIME_ID,
        .cpu_thread => posix.CLOCK.THREAD_CPUTIME_ID,
    };
}

/// Maps an `Io.Clock` to the WASI clock id used to implement it. Both
/// `.awake` and `.boot` map to `MONOTONIC`.
fn clockToWasi(clock: Io.Clock) std.os.wasi.clockid_t {
    return switch (clock) {
        // The tag is `.real`, matching the switch in `clockToPosix`.
        .real => .REALTIME,
        .awake => .MONOTONIC,
        .boot => .MONOTONIC,
        .cpu_process => .PROCESS_CPUTIME_ID,
        .cpu_thread => .THREAD_CPUTIME_ID,
    };
}

/// Converts a Linux `statx` result to an `Io.File.Stat`.
fn statFromLinux(stx: *const std.os.linux.Statx) Io.File.Stat {
    const atime = stx.atime;
    const mtime = stx.mtime;
    const ctime = stx.ctime;
    return .{
        .inode = stx.ino,
        .size = stx.size,
        .mode = stx.mode,
        .kind = switch (stx.mode & std.os.linux.S.IFMT) {
            std.os.linux.S.IFDIR => .directory,
            std.os.linux.S.IFCHR => .character_device,
            std.os.linux.S.IFBLK => .block_device,
            std.os.linux.S.IFREG => .file,
            std.os.linux.S.IFIFO => .named_pipe,
            std.os.linux.S.IFLNK => .sym_link,
            std.os.linux.S.IFSOCK => .unix_domain_socket,
            else => .unknown,
        },
        .atime = .{ .nanoseconds = @intCast(@as(i128, atime.sec) * std.time.ns_per_s + atime.nsec) },
        .mtime = .{ .nanoseconds = @intCast(@as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec) },
        .ctime = .{ .nanoseconds = @intCast(@as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec) },
    };
}

/// Converts a POSIX `stat` result to an `Io.File.Stat`.
fn statFromPosix(st: *const std.posix.Stat) Io.File.Stat {
    const atime = st.atime();
    const mtime = st.mtime();
    const ctime = st.ctime();
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = st.mode,
        .kind = k: {
            const m = st.mode & std.posix.S.IFMT;
            switch (m) {
                std.posix.S.IFBLK => break :k .block_device,
                std.posix.S.IFCHR => break :k .character_device,
                std.posix.S.IFDIR => break :k .directory,
                std.posix.S.IFIFO => break :k .named_pipe,
                std.posix.S.IFLNK => break :k .sym_link,
                std.posix.S.IFREG => break :k .file,
                std.posix.S.IFSOCK => break :k .unix_domain_socket,
                else => {},
            }
            // illumos has two additional file kinds.
            if (builtin.os.tag == .illumos) switch (m) {
                std.posix.S.IFDOOR => break :k .door,
                std.posix.S.IFPORT => break :k .event_port,
                else => {},
            };

            break :k .unknown;
        },
        .atime = timestampFromPosix(&atime),
        .mtime = timestampFromPosix(&mtime),
        .ctime = timestampFromPosix(&ctime),
    };
}

/// Converts a WASI `filestat_t` to an `Io.File.Stat`. The mode is always 0.
fn statFromWasi(st: *const std.os.wasi.filestat_t) Io.File.Stat {
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = 0,
        .kind = switch (st.filetype) {
            .BLOCK_DEVICE => .block_device,
            .CHARACTER_DEVICE => .character_device,
            .DIRECTORY => .directory,
            .SYMBOLIC_LINK => .sym_link,
            .REGULAR_FILE => .file,
            .SOCKET_STREAM, .SOCKET_DGRAM => .unix_domain_socket,
            else => .unknown,
        },
        // Wrapped the same way `statFromLinux` builds these fields.
        // NOTE(review): assumes the WASI timestamps are nanosecond counts — confirm.
        .atime = .{ .nanoseconds = st.atim },
        .mtime = .{ .nanoseconds = st.mtim },
        .ctime = .{ .nanoseconds = st.ctim },
    };
}

/// Converts a `timespec` to an `Io.Timestamp` in nanoseconds.
fn timestampFromPosix(timespec: *const std.posix.timespec) Io.Timestamp {
    return .{ .nanoseconds = @intCast(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec) };
}

/// Splits a nanosecond count into a `timespec`. Floored division keeps
/// `nsec` non-negative for negative inputs.
fn timestampToPosix(nanoseconds: i96) std.posix.timespec {
    return .{
        .sec = @intCast(@divFloor(nanoseconds, std.time.ns_per_s)),
        .nsec = @intCast(@mod(nanoseconds, std.time.ns_per_s)),
    };
}

/// Copies `file_path` into `buffer` and null-terminates it.
/// Returns `error.BadPathName` if the path contains a zero byte and
/// `error.NameTooLong` if it does not fit together with the terminator.
fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNameError![:0]u8 {
    if (std.mem.containsAtLeastScalar2(u8, file_path, 0, 1)) return error.BadPathName;
    // >= rather than > to make room for the null byte
    if (file_path.len >= buffer.len) return error.NameTooLong;
    @memcpy(buffer[0..file_path.len], file_path);
    buffer[file_path.len] = 0;
    return buffer[0..file_path.len :0];
}

/// Resolves `host_name` through DNS, first trying each search domain from the
/// resolver configuration and then the bare name.
fn lookupDnsSearch(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = t.io();
    const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;

    // Count dots, suppress search when >=ndots or name ends in
    // a dot, which is an explicit request for global scope.
    const dots = std.mem.countScalar(u8, host_name.bytes, '.');
    const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len;
    const search = rc.search_buffer[0..search_len];

    var canon_name = host_name.bytes;

    // Strip final dot for canon, fail if multiple trailing dots.
    if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1;
    if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName;

    // Name with search domain appended is set up in `canon_name`. This
    // both provides the desired default canonical name (if the requested
    // name is not a CNAME record) and serves as a buffer for passing the
    // full requested name to `lookupDns`.
    @memcpy(options.canonical_name_buffer[0..canon_name.len], canon_name);
    options.canonical_name_buffer[canon_name.len] = '.';
    var it = std.mem.tokenizeAny(u8, search, " \t");
    while (it.next()) |token| {
        @memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token);
        const lookup_canon_name = options.canonical_name_buffer[0 ..
canon_name.len + 1 + token.len];
        if (lookupDns(t, lookup_canon_name, &rc, resolved, options)) |result| {
            return result;
        } else |err| switch (err) {
            error.UnknownHostName => continue,
            else => |e| return e,
        }
    }

    const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len];
    return lookupDns(t, lookup_canon_name, &rc, resolved, options);
}

/// Queries the configured name servers for `lookup_canon_name` over UDP and
/// pushes the resulting addresses plus a canonical name into `resolved`.
///
/// One query is built per record type allowed by `options.family`. Queries
/// are re-sent to every name server once per attempt until each has an answer
/// or the overall timeout from `rc` elapses.
fn lookupDns(
    t: *Threaded,
    lookup_canon_name: []const u8,
    rc: *const HostName.ResolvConf,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = t.io();
    // A record type is queried unless the caller restricted the lookup to
    // the family listed next to it.
    const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{
        .{ .af = .ip6, .rr = std.posix.RR.A },
        .{ .af = .ip4, .rr = std.posix.RR.AAAA },
    };
    var query_buffers: [2][280]u8 = undefined;
    var answer_buffer: [2 * 512]u8 = undefined;
    var queries_buffer: [2][]const u8 = undefined;
    var answers_buffer: [2][]const u8 = undefined;
    var nq: usize = 0;
    var answer_buffer_i: usize = 0;

    for (family_records) |fr| {
        if (options.family == fr.af) continue;
        const entropy = std.crypto.random.array(u8, 2);
        const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy);
        queries_buffer[nq] = query_buffers[nq][0..len];
        nq += 1;
    }

    // Every name server converted with `.fromAny`, for use when the socket
    // ends up being an IPv6 one.
    var ip4_mapped_buffer: [HostName.ResolvConf.max_nameservers]IpAddress = undefined;
    const ip4_mapped = ip4_mapped_buffer[0..rc.nameservers_len];
    var any_ip6 = false;
    for (rc.nameservers(), ip4_mapped) |*ns, *m| {
        m.* = .{ .ip6 = .fromAny(ns.*) };
        any_ip6 = any_ip6 or ns.* == .ip6;
    }
    var socket = s: {
        if (any_ip6) ip6: {
            const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
            const socket = ip6_addr.bind(t_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
                error.AddressFamilyUnsupported => break :ip6,
                else => |e| return e,
            };
            break :s socket;
        }
        // Either no IPv6 name server is configured or IPv6 is unavailable.
        any_ip6 = false;
        const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
        const socket = try ip4_addr.bind(t_io, .{ .mode = .dgram });
        break :s socket;
    };
    defer socket.close(t_io);

    const mapped_nameservers = if (any_ip6) ip4_mapped else rc.nameservers();
    const queries = queries_buffer[0..nq];
    const answers = answers_buffer[0..queries.len];
    var answers_remaining = answers.len;
    for (answers) |*answer| answer.len = 0;

    // boot clock is chosen because time the computer is suspended should count
    // against time spent waiting for external messages to arrive.
    const clock: Io.Clock = .boot;
    var now_ts = try clock.now(t_io);
    const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
    const attempt_duration: Io.Duration = .{
        .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
    };

    send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(t_io)) {
        const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers;
        {
            // (Re)send every still-unanswered query to every name server.
            // Send errors are deliberately ignored.
            var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
            var message_i: usize = 0;
            for (queries, answers) |query, *answer| {
                if (answer.len != 0) continue;
                for (mapped_nameservers) |*ns| {
                    message_buffer[message_i] = .{
                        .address = ns,
                        .data_ptr = query.ptr,
                        .data_len = query.len,
                    };
                    message_i += 1;
                }
            }
            _ = netSend(t, socket.handle, message_buffer[0..message_i], .{});
        }

        const timeout: Io.Timeout = .{ .deadline = .{
            .raw = now_ts.addDuration(attempt_duration),
            .clock = clock,
        } };

        while (true) {
            var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
            const buf = answer_buffer[answer_buffer_i..];
            const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout);
            for (message_buffer[0..recv_n]) |*received_message| {
                const reply = received_message.data;
                // Ignore non-identifiable packets.
                if (reply.len < 4) continue;

                // Ignore replies from addresses we didn't send to.
                const ns = for (mapped_nameservers) |*ns| {
                    if (received_message.from.eql(ns)) break ns;
                } else {
                    continue;
                };

                // Find which query this answer goes with, if any.
                const query, const answer = for (queries, answers) |query, *answer| {
                    if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer };
                } else {
                    continue;
                };
                if (answer.len != 0) continue;

                // Only accept positive or negative responses; retry immediately on
                // server failure, and ignore all other codes such as refusal.
                switch (reply[3] & 15) {
                    0, 3 => {
                        answer.* = reply;
                        answer_buffer_i += reply.len;
                        answers_remaining -= 1;
                        if (answer_buffer.len - answer_buffer_i == 0) break :send;
                        if (answers_remaining == 0) break :send;
                    },
                    2 => {
                        var retry_message: Io.net.OutgoingMessage = .{
                            .address = ns,
                            .data_ptr = query.ptr,
                            .data_len = query.len,
                        };
                        _ = netSend(t, socket.handle, (&retry_message)[0..1], .{});
                        continue;
                    },
                    else => continue,
                }
            }
            if (recv_err) |err| switch (err) {
                error.Canceled => return error.Canceled,
                // This attempt is over; go around and re-send.
                error.Timeout => continue :send,
                else => continue,
            };
        }
    } else {
        return error.NameServerFailure;
    }

    var addresses_len: usize = 0;
    var canonical_name: ?HostName = null;

    for (answers) |answer| {
        var it = HostName.DnsResponse.init(answer) catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        };
        while (it.next() catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        }) |record| switch (record.rr) {
            std.posix.RR.A => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 4) return error.InvalidDnsARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip4 = .{
                    .bytes = data[0..4].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            std.posix.RR.AAAA => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 16) return error.InvalidDnsAAAARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip6 = .{
                    .bytes = data[0..16].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            std.posix.RR.CNAME => {
                _, canonical_name = HostName.expand(record.packet, record.data_off, options.canonical_name_buffer) catch
                    return error.InvalidDnsCnameRecord;
            },
            else => continue,
        };
    }

    try resolved.putOne(t_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
    if (addresses_len == 0) return error.NameServerFailure;
}

/// Looks `host_name` up in `/etc/hosts`. A hosts file that is missing or
/// not accessible is reported as `error.UnknownHostName`.
fn lookupHosts(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const t_io = t.io();
    const file = Io.File.openAbsolute(t_io, "/etc/hosts", .{}) catch |err| switch (err) {
        error.FileNotFound,
        error.NotDir,
        error.AccessDenied,
        => return error.UnknownHostName,

        error.Canceled => |e| return e,

        else => {
            // Here we could add more detailed diagnostics to the results queue.
            return error.DetectingNetworkConfigurationFailed;
        },
    };
    defer file.close(t_io);

    var line_buf: [512]u8 = undefined;
    var file_reader = file.reader(t_io, &line_buf);
    return lookupHostsReader(t, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) {
        error.ReadFailed => switch (file_reader.err.?) {
            error.Canceled => |e| return e,
            else => {
                // Here we could add more detailed diagnostics to the results queue.
return error.DetectingNetworkConfigurationFailed; }, }, error.Canceled => |e| return e, error.UnknownHostName => |e| return e, }; } fn lookupHostsReader( t: *Threaded, host_name: HostName, resolved: *Io.Queue(HostName.LookupResult), options: HostName.LookupOptions, reader: *Io.Reader, ) error{ ReadFailed, Canceled, UnknownHostName }!void { const t_io = t.io(); var addresses_len: usize = 0; var canonical_name: ?HostName = null; while (true) { const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) { error.StreamTooLong => { // Skip lines that are too long. _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) { error.EndOfStream => break, error.ReadFailed => return error.ReadFailed, }; continue; }, error.ReadFailed => return error.ReadFailed, error.EndOfStream => break, }; reader.toss(1); var split_it = std.mem.splitScalar(u8, line, '#'); const no_comment_line = split_it.first(); var line_it = std.mem.tokenizeAny(u8, no_comment_line, " \t"); const ip_text = line_it.next() orelse continue; var first_name_text: ?[]const u8 = null; while (line_it.next()) |name_text| { if (std.mem.eql(u8, name_text, host_name.bytes)) { if (first_name_text == null) first_name_text = name_text; break; } } else continue; if (canonical_name == null) { if (HostName.init(first_name_text.?)) |name_text| { if (name_text.bytes.len <= options.canonical_name_buffer.len) { const canonical_name_dest = options.canonical_name_buffer[0..name_text.bytes.len]; @memcpy(canonical_name_dest, name_text.bytes); canonical_name = .{ .bytes = canonical_name_dest }; } } else |_| {} } if (options.family != .ip6) { if (IpAddress.parseIp4(ip_text, options.port)) |addr| { try resolved.putOne(t_io, .{ .address = addr }); addresses_len += 1; } else |_| {} } if (options.family != .ip4) { if (IpAddress.parseIp6(ip_text, options.port)) |addr| { try resolved.putOne(t_io, .{ .address = addr }); addresses_len += 1; } else |_| {} } } if (canonical_name) |canon_name| try resolved.putOne(t_io, .{ 
.canonical_name = canon_name }); if (addresses_len == 0) return error.UnknownHostName; } /// Writes DNS resolution query packet data to `w`; at most 280 bytes. fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: u8, entropy: [2]u8) usize { // This implementation is ported from musl libc. // A more idiomatic "ziggy" implementation would be welcome. var name = dname; if (std.mem.endsWith(u8, name, ".")) name.len -= 1; assert(name.len <= 253); const n = 17 + name.len + @intFromBool(name.len != 0); // Construct query template - ID will be filled later q[0..2].* = entropy; @memset(q[2..n], 0); q[2] = @as(u8, op) * 8 + 1; q[5] = 1; @memcpy(q[13..][0..name.len], name); var i: usize = 13; var j: usize = undefined; while (q[i] != 0) : (i = j + 1) { j = i; while (q[j] != 0 and q[j] != '.') : (j += 1) {} // TODO determine the circumstances for this and whether or // not this should be an error. if (j - i - 1 > 62) unreachable; q[i - 1] = @intCast(j - i); } q[i + 1] = ty; q[i + 3] = class; return n; } fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) HostName { const dest = canonical_name_buffer[0..name.len]; @memcpy(dest, name); return .{ .bytes = dest }; } /// Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it: /// https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 /// /// This XNU version appears to correspond to 11.0.1: /// https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html /// /// ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout /// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention) const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11; fn futexWait(t: *Threaded, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void { 
@branchHint(.cold); if (native_os == .linux) { const linux = std.os.linux; try t.checkCancel(); const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); if (builtin.mode == .Debug) switch (linux.E.init(rc)) { .SUCCESS => {}, // notified by `wake()` .INTR => {}, // gives caller a chance to check cancellation .AGAIN => {}, // ptr.* != expect .INVAL => {}, // possibly timeout overflow .TIMEDOUT => unreachable, .FAULT => unreachable, // ptr was invalid else => unreachable, }; return; } if (native_os.isDarwin()) { const c = std.c; const flags: c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true, }; try t.checkCancel(); const status = if (darwin_supports_ulock_wait2) c.__ulock_wait2(flags, ptr, expect, 0, 0) else c.__ulock_wait(flags, ptr, expect, 0); if (status >= 0) return; if (builtin.mode == .Debug) switch (@as(c.E, @enumFromInt(-status))) { // Wait was interrupted by the OS or other spurious signalling. .INTR => {}, // Address of the futex was paged out. This is unlikely, but possible in theory, and // pthread/libdispatch on darwin bother to handle it. In this case we'll return // without waiting, but the caller should retry anyway. 
.FAULT => {}, .TIMEDOUT => unreachable, else => unreachable, }; return; } @compileError("TODO"); } pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void { @branchHint(.cold); if (native_os == .linux) { const linux = std.os.linux; const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); if (builtin.mode == .Debug) switch (linux.E.init(rc)) { .SUCCESS => {}, // notified by `wake()` .INTR => {}, // gives caller a chance to check cancellation .AGAIN => {}, // ptr.* != expect .INVAL => {}, // possibly timeout overflow .TIMEDOUT => unreachable, .FAULT => unreachable, // ptr was invalid else => unreachable, }; return; } if (native_os.isDarwin()) { const c = std.c; const flags: c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true, }; const status = if (darwin_supports_ulock_wait2) c.__ulock_wait2(flags, ptr, expect, 0, 0) else c.__ulock_wait(flags, ptr, expect, 0); if (status >= 0) return; if (builtin.mode == .Debug) switch (@as(c.E, @enumFromInt(-status))) { // Wait was interrupted by the OS or other spurious signalling. .INTR => {}, // Address of the futex was paged out. This is unlikely, but possible in theory, and // pthread/libdispatch on darwin bother to handle it. In this case we'll return // without waiting, but the caller should retry anyway. 
.FAULT => {}, .TIMEDOUT => unreachable, else => unreachable, }; return; } @compileError("TODO"); } pub fn futexWaitDurationUncancelable(ptr: *const std.atomic.Value(u32), expect: u32, timeout: Io.Duration) void { @branchHint(.cold); if (native_os == .linux) { const linux = std.os.linux; var ts = timestampToPosix(timeout.toNanoseconds()); const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, &ts); if (builtin.mode == .Debug) switch (linux.E.init(rc)) { .SUCCESS => {}, // notified by `wake()` .INTR => {}, // gives caller a chance to check cancellation .AGAIN => {}, // ptr.* != expect .TIMEDOUT => {}, .INVAL => {}, // possibly timeout overflow .FAULT => unreachable, // ptr was invalid else => unreachable, }; return; } @compileError("TODO"); } pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void { @branchHint(.cold); if (native_os == .linux) { const linux = std.os.linux; const rc = linux.futex_3arg( &ptr.raw, .{ .cmd = .WAKE, .private = true }, @min(max_waiters, std.math.maxInt(i32)), ); if (builtin.mode == .Debug) switch (linux.E.init(rc)) { .SUCCESS => {}, // successful wake up .INVAL => {}, // invalid futex_wait() on ptr done elsewhere .FAULT => {}, // pointer became invalid while doing the wake else => unreachable, }; return; } if (native_os.isDarwin()) { const c = std.c; const flags: c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true, .WAKE_ALL = max_waiters > 1, }; const is_debug = builtin.mode == .Debug; while (true) { const status = c.__ulock_wake(flags, ptr, 0); if (status >= 0) return; switch (@as(c.E, @enumFromInt(-status))) { .INTR => continue, // spurious wake() .FAULT => assert(!is_debug), // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t .NOENT => return, // nothing was woken up .ALREADY => assert(!is_debug), // only for UL.Op.WAKE_THREAD else => assert(!is_debug), } } } @compileError("TODO"); } /// A thread-safe logical boolean value which can be `set` and `unset`. 
///
/// Threads can block until the value becomes set, either with cancelation
/// support (`wait`) or without it (`waitUncancelable`). The type can be
/// statically initialized and is backed by a `u32`.
pub const ResetEvent = enum(u32) {
    unset = 0,
    waiting = 1,
    is_set = 2,

    /// Reports whether the event is currently set.
    ///
    /// After `reset`, the answer is false until `set` is called again.
    ///
    /// Memory accesses made before the `set` happen before `isSet` returns
    /// true.
    pub fn isSet(re: *const ResetEvent) bool {
        if (builtin.single_threaded) {
            return switch (re.*) {
                .is_set => true,
                .unset => false,
                .waiting => unreachable,
            };
        }
        // The acquire load pairs with the release in `set`, making accesses
        // before `set` visible once true is observed.
        const observed = @atomicLoad(ResetEvent, re, .acquire);
        return observed == .is_set;
    }

    /// Suspends the calling thread until `set` has been called.
    ///
    /// Effectively a more efficient `while (!isSet()) {}`.
    ///
    /// Memory accesses made before the `set` happen before `wait` returns.
    pub fn wait(re: *ResetEvent, t: *Threaded) Io.Cancelable!void {
        if (builtin.single_threaded) {
            switch (re.*) {
                .is_set => return,
                // `unset` would deadlock (there is no other thread to wake
                // us); `waiting` is an invalid state here.
                .unset, .waiting => unreachable,
            }
        }
        if (re.isSet()) {
            @branchHint(.likely);
            return;
        }
        var observed = re.announceWaiter();
        while (observed == .waiting) : (observed = @atomicLoad(ResetEvent, re, .acquire)) {
            try futexWait(t, @ptrCast(re), @intFromEnum(ResetEvent.waiting));
        }
        assert(observed == .is_set);
    }

    /// Like `wait`, but cannot be interrupted by cancelation.
    pub fn waitUncancelable(re: *ResetEvent) void {
        if (builtin.single_threaded) {
            switch (re.*) {
                .is_set => return,
                // `unset` would deadlock (there is no other thread to wake
                // us); `waiting` is an invalid state here.
                .unset, .waiting => unreachable,
            }
        }
        if (re.isSet()) {
            @branchHint(.likely);
            return;
        }
        var observed = re.announceWaiter();
        while (observed == .waiting) : (observed = @atomicLoad(ResetEvent, re, .acquire)) {
            futexWaitUncancelable(@ptrCast(re), @intFromEnum(ResetEvent.waiting));
        }
        assert(observed == .is_set);
    }

    /// Loads the current state and, if it is `unset`, tries to move it to
    /// `waiting` so that the thread calling `set` knows a wake is required.
    /// Returns the state the event is in after that attempt.
    fn announceWaiter(re: *ResetEvent) ResetEvent {
        const observed = @atomicLoad(ResetEvent, re, .acquire);
        if (observed != .unset) return observed;
        // A null result means the exchange succeeded and the state is now
        // `waiting`; otherwise the state another thread stored is returned.
        return @cmpxchgStrong(ResetEvent, re, .unset, .waiting, .acquire, .acquire) orelse .waiting;
    }

    /// Puts the event into the set state and wakes every thread blocked in
    /// `wait` or `waitUncancelable`.
    ///
    /// The event stays set until `reset`; calling `set` again in between has
    /// no further effect.
    ///
    /// Memory accesses made before `set` happen before `isSet` returns true
    /// and before a blocked waiter returns.
    pub fn set(re: *ResetEvent) void {
        if (builtin.single_threaded) {
            re.* = .is_set;
            return;
        }
        const previous = @atomicRmw(ResetEvent, re, .Xchg, .is_set, .release);
        // A wake is only needed if some thread announced itself as waiting.
        if (previous != .waiting) return;
        futexWake(@ptrCast(re), std.math.maxInt(u32));
    }

    /// Returns the event to the unset state, as though `set` had never been
    /// called.
    ///
    /// No thread may be blocked in `wait` or `waitUncancelable` at this
    /// point. Calling `set`, `isSet` and `reset` concurrently is permitted.
    pub fn reset(re: *ResetEvent) void {
        if (!builtin.single_threaded) {
            @atomicStore(ResetEvent, re, .unset, .monotonic);
            return;
        }
        re.* = .unset;
    }
};