zig/lib/std/Io/Threaded.zig
Andrew Kelley 35ce907c06 std.Io.net.HostName: move lookup to the interface
Unfortunately this can't be implemented "above the vtable" because
various operating systems don't provide low level DNS resolution
primitives such as just putting the list of nameservers in a file.

Without libc on Linux it works great though!

Anyway this also changes the API to be based on Io.Queue. By using a
large enough buffer, reusable code can be written that does not require
concurrency, yet takes advantage of responding to DNS queries as they
come in. I sketched out a new implementation of `HostName.connect` to
demonstrate this, but it will require an additional API (`Io.Select`) to
be implemented in a future commit.

This commit also introduces "uncancelable" variants for mutex locking,
waiting on a condition, and putting items into a queue.
2025-10-29 06:20:49 -07:00

3385 lines
128 KiB
Zig

const Pool = @This();
const builtin = @import("builtin");
const native_os = builtin.os.tag;
const is_windows = native_os == .windows;
const windows = std.os.windows;
const std = @import("../std.zig");
const Io = std.Io;
const net = std.Io.net;
const HostName = std.Io.net.HostName;
const IpAddress = std.Io.net.IpAddress;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const posix = std.posix;
const ResetEvent = std.Thread.ResetEvent;
/// Thread-safe.
allocator: Allocator,
/// Protects `run_queue`, `join_requested`, `threads`, and `concurrent_count`.
mutex: std.Thread.Mutex = .{},
/// Signaled when work is pushed onto `run_queue` or when `join_requested` is set.
cond: std.Thread.Condition = .{},
/// Intrusive list of pending `Closure.node`s awaiting a worker thread.
run_queue: std.SinglyLinkedList = .{},
/// Set under `mutex` by `join` to tell worker threads to exit their loops.
join_requested: bool = false,
/// Worker threads spawned so far; joined in `join`.
threads: std.ArrayListUnmanaged(std.Thread),
/// Stack size used when spawning worker threads.
stack_size: usize,
/// Cached result of `std.Thread.getCpuCount`, queried once in `init`.
cpu_count: std.Thread.CpuCountError!usize,
/// Number of live tasks started via `concurrent`; each one raises the
/// minimum number of worker threads the pool maintains.
concurrent_count: usize,
/// The closure currently executing on this thread, if any; consulted by
/// `cancelRequested` to detect cancellation of the running task.
threadlocal var current_closure: ?*Closure = null;
const max_iovecs_len = 8;
const splat_buffer_size = 64;
comptime {
    assert(max_iovecs_len <= posix.IOV_MAX);
}
const Closure = struct {
    start: Start,
    node: std.SinglyLinkedList.Node = .{},
    /// 0 while no thread has claimed the task, the running worker's thread id
    /// while it executes, or `canceling_tid` once cancellation was requested.
    cancel_tid: std.Thread.Id,
    /// Whether this task bumps minimum number of threads in the pool.
    is_concurrent: bool,

    const Start = *const fn (*Closure) void;

    /// Sentinel stored in `cancel_tid` marking the task as canceled: the
    /// all-ones bit pattern, regardless of how the OS represents thread ids.
    const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
        .int => |int_info| switch (int_info.signedness) {
            .signed => -1,
            .unsigned => std.math.maxInt(std.Thread.Id),
        },
        .pointer => @ptrFromInt(std.math.maxInt(usize)),
        else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
    };

    /// Marks the closure as canceled and, on Linux, interrupts the thread
    /// currently running it with SIGIO so blocking syscalls return EINTR.
    fn requestCancel(closure: *Closure) void {
        switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) {
            // 0: not yet claimed by a worker. canceling_tid: already canceled.
            // Neither case has a thread to signal.
            0, canceling_tid => {},
            else => |tid| switch (builtin.os.tag) {
                .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO),
                else => {},
            },
        }
    }
};
pub const InitError = std.Thread.CpuCountError || Allocator.Error;

/// Related:
/// * `init_single_threaded`
pub fn init(
    /// Must be threadsafe. Only used for the following functions:
    /// * `Io.VTable.async`
    /// * `Io.VTable.concurrent`
    /// * `Io.VTable.groupAsync`
    /// If these functions are avoided, then `Allocator.failing` may be passed
    /// here.
    gpa: Allocator,
) Pool {
    var pool: Pool = .{
        .allocator = gpa,
        .threads = .empty,
        .stack_size = std.Thread.SpawnConfig.default_stack_size,
        .cpu_count = std.Thread.getCpuCount(),
        .concurrent_count = 0,
    };
    if (pool.cpu_count) |n| {
        // Pre-reserve one thread slot per extra CPU (the caller's thread
        // counts as one). Allocation failure is fine: capacity is re-ensured
        // on each task spawn.
        // NOTE(review): assumes getCpuCount never returns 0, else `n - 1`
        // underflows — confirm.
        pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
    } else |_| {}
    return pool;
}
/// Statically initialize such that any call to the following functions will
/// fail with `error.OutOfMemory`:
/// * `Io.VTable.async`
/// * `Io.VTable.concurrent`
/// * `Io.VTable.groupAsync`
/// When initialized this way, `deinit` is safe, but unnecessary to call.
pub const init_single_threaded: Pool = .{
    .allocator = .failing,
    .threads = .empty,
    .stack_size = std.Thread.SpawnConfig.default_stack_size,
    // A fixed count of 1 means no extra worker threads are ever needed.
    .cpu_count = 1,
    .concurrent_count = 0,
};
/// Joins all worker threads and releases the pool's resources.
/// The pool must not be used afterwards.
pub fn deinit(pool: *Pool) void {
    pool.join();
    pool.threads.deinit(pool.allocator);
    pool.* = undefined;
}
/// Requests all worker threads to exit and blocks until they have.
fn join(pool: *Pool) void {
    if (builtin.single_threaded) return;
    {
        // Set the flag under the mutex so a sleeping worker cannot miss it
        // between checking the flag and calling `cond.wait`.
        pool.mutex.lock();
        defer pool.mutex.unlock();
        pool.join_requested = true;
    }
    pool.cond.broadcast();
    for (pool.threads.items) |thread| thread.join();
}
/// Worker thread entry point: repeatedly pops and runs queued closures,
/// sleeping on `cond` when the queue is empty, until `join_requested` is set.
fn worker(pool: *Pool) void {
    pool.mutex.lock();
    defer pool.mutex.unlock();
    while (true) {
        while (pool.run_queue.popFirst()) |closure_node| {
            // Run the task outside the lock so other workers make progress.
            pool.mutex.unlock();
            const closure: *Closure = @fieldParentPtr("node", closure_node);
            // The closure may be freed by its awaiter as soon as it signals
            // completion; copy what we need before running it.
            const is_concurrent = closure.is_concurrent;
            closure.start(closure);
            pool.mutex.lock();
            if (is_concurrent) {
                // TODO also pop thread and join sometimes
                pool.concurrent_count -= 1;
            }
        }
        if (pool.join_requested) break;
        pool.cond.wait(&pool.mutex);
    }
}
/// Returns the `Io` interface backed by this thread pool. OS-specific vtable
/// entries are selected at comptime; targets marked `@panic("TODO")` are not
/// yet implemented.
pub fn io(pool: *Pool) Io {
    return .{
        .userdata = pool,
        .vtable = &.{
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .cancel = cancel,
            .cancelRequested = cancelRequested,
            .select = select,
            .groupAsync = groupAsync,
            .groupWait = groupWait,
            .groupCancel = groupCancel,
            .mutexLock = mutexLock,
            .mutexLockUncancelable = mutexLockUncancelable,
            .mutexUnlock = mutexUnlock,
            .conditionWait = conditionWait,
            .conditionWaitUncancelable = conditionWaitUncancelable,
            .conditionWake = conditionWake,
            .dirMake = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                .wasi => @panic("TODO"),
                else => dirMakePosix,
            },
            .dirStat = dirStat,
            .dirStatPath = switch (builtin.os.tag) {
                .linux => dirStatPathLinux,
                .windows => @panic("TODO"),
                .wasi => @panic("TODO"),
                else => dirStatPathPosix,
            },
            .fileStat = switch (builtin.os.tag) {
                .linux => fileStatLinux,
                .windows => fileStatWindows,
                .wasi => fileStatWasi,
                else => fileStatPosix,
            },
            .dirCreateFile = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                .wasi => @panic("TODO"),
                else => dirCreateFilePosix,
            },
            .dirOpenFile = dirOpenFile,
            .fileClose = fileClose,
            .pwrite = pwrite,
            .fileReadStreaming = fileReadStreaming,
            .fileReadPositional = fileReadPositional,
            .fileSeekBy = fileSeekBy,
            .fileSeekTo = fileSeekTo,
            .now = switch (builtin.os.tag) {
                .windows => nowWindows,
                .wasi => nowWasi,
                else => nowPosix,
            },
            .sleep = switch (builtin.os.tag) {
                .windows => sleepWindows,
                .wasi => sleepWasi,
                .linux => sleepLinux,
                else => sleepPosix,
            },
            .netListenIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netListenIpPosix,
            },
            .netListenUnix = netListenUnix,
            .netAccept = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netAcceptPosix,
            },
            .netBindIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netBindIpPosix,
            },
            .netConnectIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netConnectIpPosix,
            },
            .netConnectUnix = netConnectUnix,
            .netClose = netClose,
            .netRead = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netReadPosix,
            },
            .netWrite = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netWritePosix,
            },
            .netSend = netSend,
            .netReceive = netReceive,
            .netInterfaceNameResolve = netInterfaceNameResolve,
            .netInterfaceName = netInterfaceName,
            .netLookup = netLookup,
        },
    };
}
/// Trailing data:
/// 1. context
/// 2. result
const AsyncClosure = struct {
    closure: Closure,
    func: *const fn (context: *anyopaque, result: *anyopaque) void,
    /// Set when the task has finished (or was canceled before producing a
    /// zero-byte result); the awaiter blocks on this.
    reset_event: ResetEvent,
    /// Used by `select`: null until a selector registers its event, then that
    /// event, then `done_reset_event` once the task has completed.
    select_condition: ?*ResetEvent,
    context_alignment: std.mem.Alignment,
    /// Byte offset from the allocation base to the result storage.
    result_offset: usize,
    /// Whether the task has a return type with nonzero bits.
    has_result: bool,

    /// Sentinel distinguishing "task already done" from a registered selector.
    const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

    /// Worker-side entry point for tasks started via `async`/`concurrent`.
    fn start(closure: *Closure) void {
        const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
        const tid = std.Thread.getCurrentId();
        // Claim the task by publishing our tid; fails only if it was already
        // canceled before a worker picked it up.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
            // Even though we already know the task is canceled, we must still
            // run the closure in order to make the return value valid - that
            // is, unless the result is zero bytes!
            if (!ac.has_result) {
                ac.reset_event.set();
                return;
            }
        }
        current_closure = closure;
        ac.func(ac.contextPointer(), ac.resultPointer());
        current_closure = null;
        // In case a cancel happens after successful task completion, prevents
        // signal from being delivered to the thread in `requestCancel`.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
        }
        // Notify a registered selector, if any, then the awaiter.
        if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
            assert(select_reset != done_reset_event);
            select_reset.set();
        }
        ac.reset_event.set();
    }

    fn resultPointer(ac: *AsyncClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(ac);
        return base + ac.result_offset;
    }

    fn contextPointer(ac: *AsyncClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(ac);
        return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
    }

    /// Blocks until the task finishes, copies the result out, and frees the
    /// combined closure+context+result allocation.
    fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
        ac.reset_event.wait();
        @memcpy(result, ac.resultPointer()[0..result.len]);
        free(ac, gpa, result.len);
    }

    /// Frees the combined allocation. `result_len` must match the length the
    /// closure was allocated with.
    fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
        if (!ac.has_result) assert(result_len == 0);
        const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
        gpa.free(base[0 .. ac.result_offset + result_len]);
    }
};
/// Implements `Io.VTable.async`: runs `start` on a worker thread when
/// possible, falling back to calling it synchronously (and returning null)
/// when threading is unavailable or allocation/spawning fails.
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
    if (builtin.single_threaded) {
        start(context.ptr, result.ptr);
        return null;
    }
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const cpu_count = pool.cpu_count catch {
        // Unknown CPU count: try a dedicated thread; otherwise run inline.
        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
            start(context.ptr, result.ptr);
            return null;
        };
    };
    const gpa = pool.allocator;
    // A single allocation holds the AsyncClosure header, then the context
    // bytes, then the result storage.
    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
    const result_offset = result_alignment.forward(context_offset + context.len);
    const n = result_offset + result.len;
    const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
        start(context.ptr, result.ptr);
        return null;
    }));
    ac.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = AsyncClosure.start,
            .is_concurrent = false,
        },
        .func = start,
        .context_alignment = context_alignment,
        .result_offset = result_offset,
        .has_result = result.len != 0,
        .reset_event = .unset,
        .select_condition = null,
    };
    @memcpy(ac.contextPointer()[0..context.len], context);
    pool.mutex.lock();
    const thread_capacity = cpu_count - 1 + pool.concurrent_count;
    pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
        pool.mutex.unlock();
        ac.free(gpa, result.len);
        start(context.ptr, result.ptr);
        return null;
    };
    pool.run_queue.prepend(&ac.closure.node);
    if (pool.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
            if (pool.threads.items.len == 0) {
                // No workers exist to pick the task up; undo the queue push
                // (still under the mutex) and run on the calling thread.
                assert(pool.run_queue.popFirst() == &ac.closure.node);
                pool.mutex.unlock();
                ac.free(gpa, result.len);
                start(context.ptr, result.ptr);
                return null;
            }
            // Rely on other workers to do it.
            pool.mutex.unlock();
            pool.cond.signal();
            return @ptrCast(ac);
        };
        pool.threads.appendAssumeCapacity(thread);
    }
    pool.mutex.unlock();
    pool.cond.signal();
    return @ptrCast(ac);
}
/// Implements `Io.VTable.concurrent`: like `async` but guarantees the task
/// runs on its own thread by raising the pool's minimum thread count
/// (`concurrent_count`). Unlike `async`, failure is reported to the caller
/// rather than running the task inline.
fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*Io.AnyFuture {
    if (builtin.single_threaded) unreachable;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const cpu_count = pool.cpu_count catch 1;
    const gpa = pool.allocator;
    // A single allocation holds the AsyncClosure header, then the context
    // bytes, then the result storage.
    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
    const result_offset = result_alignment.forward(context_offset + context.len);
    const n = result_offset + result_len;
    const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n)));
    ac.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = AsyncClosure.start,
            .is_concurrent = true,
        },
        .func = start,
        .context_alignment = context_alignment,
        .result_offset = result_offset,
        .has_result = result_len != 0,
        .reset_event = .unset,
        .select_condition = null,
    };
    @memcpy(ac.contextPointer()[0..context.len], context);
    pool.mutex.lock();
    // Normally decremented by the worker when the task finishes; on the error
    // paths below the task never runs, so the increment must be undone here.
    pool.concurrent_count += 1;
    const thread_capacity = cpu_count - 1 + pool.concurrent_count;
    pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
        pool.concurrent_count -= 1;
        pool.mutex.unlock();
        ac.free(gpa, result_len);
        return error.OutOfMemory;
    };
    pool.run_queue.prepend(&ac.closure.node);
    if (pool.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
            // The mutex has been held since the prepend, so no worker can
            // have popped the node yet.
            assert(pool.run_queue.popFirst() == &ac.closure.node);
            pool.concurrent_count -= 1;
            pool.mutex.unlock();
            ac.free(gpa, result_len);
            return error.OutOfMemory;
        };
        pool.threads.appendAssumeCapacity(thread);
    }
    pool.mutex.unlock();
    pool.cond.signal();
    return @ptrCast(ac);
}
const GroupClosure = struct {
    closure: Closure,
    pool: *Pool,
    group: *Io.Group,
    /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
    node: std.SinglyLinkedList.Node,
    func: *const fn (context: *anyopaque) void,
    context_alignment: std.mem.Alignment,
    context_len: usize,

    /// Worker-side entry point: claims the task, runs the user callback
    /// (unless already canceled), and signals the group's wait-group.
    fn start(closure: *Closure) void {
        const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
        const tid = std.Thread.getCurrentId();
        const group = gc.group;
        // `Io.Group`'s opaque state/context fields are repurposed as a
        // stateless wait-group counter plus its reset event.
        const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
        const reset_event: *ResetEvent = @ptrCast(&group.context);
        // Claim the task by publishing our tid; fails only if canceled first.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
            // We already know the task is canceled before running the callback. Since all closures
            // in a Group have void return type, we can return early.
            std.Thread.WaitGroup.finishStateless(group_state, reset_event);
            return;
        }
        current_closure = closure;
        gc.func(gc.contextPointer());
        current_closure = null;
        // In case a cancel happens after successful task completion, prevents
        // signal from being delivered to the thread in `requestCancel`.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
        }
        std.Thread.WaitGroup.finishStateless(group_state, reset_event);
    }

    /// Frees the combined closure+context allocation.
    fn free(gc: *GroupClosure, gpa: Allocator) void {
        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
        gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
    }

    fn contextOffset(context_alignment: std.mem.Alignment) usize {
        return context_alignment.forward(@sizeOf(GroupClosure));
    }

    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
        return contextOffset(context_alignment) + context_len;
    }

    fn contextPointer(gc: *GroupClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(gc);
        return base + contextOffset(gc.context_alignment);
    }
};
/// Implements `Io.VTable.groupAsync`: queues `start` as a member of `group`,
/// falling back to running it synchronously when threading is unavailable or
/// allocation/spawning fails.
fn groupAsync(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque) void,
) void {
    if (builtin.single_threaded) return start(context.ptr);
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const cpu_count = pool.cpu_count catch 1;
    const gpa = pool.allocator;
    // A single allocation holds the GroupClosure header followed by the
    // context bytes.
    const n = GroupClosure.contextEnd(context_alignment, context.len);
    const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
        return start(context.ptr);
    }));
    gc.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = GroupClosure.start,
            .is_concurrent = false,
        },
        .pool = pool,
        .group = group,
        .node = undefined,
        .func = start,
        .context_alignment = context_alignment,
        .context_len = context.len,
    };
    @memcpy(gc.contextPointer()[0..context.len], context);
    pool.mutex.lock();
    // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
    group.token = &gc.node;
    const thread_capacity = cpu_count - 1 + pool.concurrent_count;
    pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
        // Unlink from the group list (still under the mutex) before freeing;
        // otherwise groupWait/groupCancel would later walk freed memory.
        group.token = @ptrCast(gc.node.next);
        pool.mutex.unlock();
        gc.free(gpa);
        return start(context.ptr);
    };
    pool.run_queue.prepend(&gc.closure.node);
    if (pool.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
            // Mutex held since the prepend, so no worker has popped the node.
            assert(pool.run_queue.popFirst() == &gc.closure.node);
            // Unlink before freeing, as above.
            group.token = @ptrCast(gc.node.next);
            pool.mutex.unlock();
            gc.free(gpa);
            return start(context.ptr);
        };
        pool.threads.appendAssumeCapacity(thread);
    }
    // This needs to be done before unlocking the mutex to avoid a race with
    // the associated task finishing.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    std.Thread.WaitGroup.startStateless(group_state);
    pool.mutex.unlock();
    pool.cond.signal();
}
/// Implements `Io.VTable.groupWait`: blocks until every task in the group has
/// finished, then frees all of the group's closures.
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const gpa = pool.allocator;
    // In single-threaded builds tasks ran inline and nothing was allocated.
    if (builtin.single_threaded) return;
    // TODO these primitives are too high level, need to check cancel on EINTR
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);
    std.Thread.WaitGroup.waitStateless(group_state, reset_event);
    var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
    while (true) {
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        // Read `next` before freeing the memory that contains this node.
        const node_next = node.next;
        gc.free(gpa);
        node = node_next orelse break;
    }
}
/// Implements `Io.VTable.groupCancel`: requests cancellation of every task in
/// the group, waits for all of them to finish, then frees their closures.
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const gpa = pool.allocator;
    if (builtin.single_threaded) return;
    {
        // First pass: flag every closure as canceled (interrupting runners).
        var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
        while (true) {
            const gc: *GroupClosure = @fieldParentPtr("node", node);
            gc.closure.requestCancel();
            node = node.next orelse break;
        }
    }
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);
    std.Thread.WaitGroup.waitStateless(group_state, reset_event);
    {
        // Second pass: all tasks have finished; free the closures.
        var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
        while (true) {
            const gc: *GroupClosure = @fieldParentPtr("node", node);
            // Read `next` before freeing the memory that contains this node.
            const node_next = node.next;
            gc.free(gpa);
            node = node_next orelse break;
        }
    }
}
/// Implements `Io.VTable.await`: blocks until the future completes, copies
/// its result into `result`, and frees the future's allocation.
fn await(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    _ = result_alignment;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const future: *AsyncClosure = @ptrCast(@alignCast(any_future));
    future.waitAndFree(pool.allocator, result);
}
/// Implements `Io.VTable.cancel`: requests cancellation of the task, then
/// blocks until it finishes and copies out whatever result it produced.
fn cancel(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    _ = result_alignment;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const future: *AsyncClosure = @ptrCast(@alignCast(any_future));
    future.closure.requestCancel();
    future.waitAndFree(pool.allocator, result);
}
/// Implements `Io.VTable.cancelRequested`: reports whether the closure
/// currently running on this thread has been marked canceled.
fn cancelRequested(userdata: ?*anyopaque) bool {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    if (current_closure) |closure| {
        return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid;
    }
    return false;
}
/// Returns `error.Canceled` if the task running on this thread has had
/// cancellation requested; otherwise returns normally.
fn checkCancel(pool: *Pool) error{Canceled}!void {
    const canceled = cancelRequested(pool);
    if (canceled) return error.Canceled;
}
/// Implements `Io.VTable.mutexLock`: futex-based lock that checks for task
/// cancellation before each sleep.
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    if (prev_state == .contended) {
        try pool.checkCancel();
        futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
    // Acquire by swapping in `.contended`: pessimistic, but guarantees that
    // unlock wakes us if anyone else was (or might have been) waiting.
    while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
        try pool.checkCancel();
        futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
}
/// Implements `Io.VTable.mutexLockUncancelable`: same futex protocol as
/// `mutexLock` but never interrupted by task cancellation.
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    _ = userdata;
    if (prev_state == .contended) {
        futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
    // Swap in `.contended` pessimistically so unlock always wakes a waiter.
    while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
        futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
}
/// Implements `Io.VTable.mutexUnlock`: releases the lock and wakes one
/// waiter if the mutex was contended.
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    _ = userdata;
    _ = prev_state;
    const old_state = @atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release);
    if (old_state == .contended) futexWake(@ptrCast(&mutex.state), 1);
}
/// Implements `Io.VTable.conditionWaitUncancelable`. Same algorithm as
/// `conditionWait` (see the detailed ordering comments there) minus the
/// cancellation checks.
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const pool_io = pool.io();
    comptime assert(@TypeOf(cond.state) == u64);
    // The u64 condition state is treated as two u32 words: a packed
    // waiter/signal counter, and an epoch used as the futex word.
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;
    mutex.unlock(pool_io);
    defer mutex.lockUncancelable(pool_io);
    while (true) {
        futexWait(cond_epoch, epoch);
        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);
        // Consume one posted signal along with our waiter count, if any.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
/// Implements `Io.VTable.conditionWait`: futex-based condition wait. The u64
/// condition state packs a 16-bit waiter count and 16-bit signal count into
/// one u32 word, with a second u32 epoch word used for futex sleeps. Checks
/// for task cancellation on each iteration.
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    // Observe the epoch, then check the state again to see if we should wake up.
    // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
    //
    // - T1: s = LOAD(&state)
    // - T2: UPDATE(&s, signal)
    // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
    // - T1: e = LOAD(&epoch) (was reordered after the state load)
    // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
    //
    // Acquire barrier to ensure the epoch load happens before the state load.
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;
    mutex.unlock(pool.io());
    defer mutex.lockUncancelable(pool.io());
    while (true) {
        try pool.checkCancel();
        futexWait(cond_epoch, epoch);
        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);
        // Try to wake up by consuming a signal and decremented the waiter we
        // added previously. Acquire barrier ensures code before the wake()
        // which added the signal happens before we decrement it and return.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
/// Implements `Io.VTable.conditionWake`: posts signals for one or all current
/// waiters and bumps the epoch word to wake them via futex.
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    var state = cond_state.load(.monotonic);
    while (true) {
        const waiters = (state & waiter_mask) / one_waiter;
        const signals = (state & signal_mask) / one_signal;
        // Reserves which waiters to wake up by incrementing the signals count.
        // Therefore, the signals count is always less than or equal to the
        // waiters count. We don't need to Futex.wake if there's nothing to
        // wake up or if other wake() threads have reserved to wake up the
        // current waiters.
        const wakeable = waiters - signals;
        if (wakeable == 0) {
            return;
        }
        const to_wake = switch (wake) {
            .one => 1,
            .all => wakeable,
        };
        // Reserve the amount of waiters to wake by incrementing the signals
        // count. Release barrier ensures code before the wake() happens before
        // the signal it posted and consumed by the wait() threads.
        const new_state = state + (one_signal * to_wake);
        state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
            // Wake up the waiting threads we reserved above by changing the epoch value.
            //
            // A waiting thread could miss a wake up if *exactly* ((1<<32)-1)
            // wake()s happen between it observing the epoch and sleeping on
            // it. This is very unlikely due to how many precise amount of
            // Futex.wake() calls that would be between the waiting thread's
            // potential preemption.
            //
            // Release barrier ensures the signal being added to the state
            // happens before the epoch is changed. If not, the waiting thread
            // could potentially deadlock from missing both the state and epoch
            // change:
            //
            // - T2: UPDATE(&epoch, 1) (reordered before the state change)
            // - T1: e = LOAD(&epoch)
            // - T1: s = LOAD(&state)
            // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
            _ = cond_epoch.fetchAdd(1, .release);
            futexWake(cond_epoch, to_wake);
            return;
        };
    }
}
/// Implements `Io.VTable.dirMake` for POSIX targets via mkdirat, retrying on
/// EINTR and checking for task cancellation before each attempt.
fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) {
            .SUCCESS => return,
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .BADF => |err| return errnoBug(err),
            .PERM => return error.PermissionDenied,
            .DQUOT => return error.DiskQuota,
            .EXIST => return error.PathAlreadyExists,
            .FAULT => |err| return errnoBug(err),
            .LOOP => return error.SymLinkLoop,
            .MLINK => return error.LinkQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            // dragonfly: when dir_fd is unlinked from filesystem
            .NOTCONN => return error.FileNotFound,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Implements `Io.VTable.dirStat`. Not yet implemented.
fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat {
    _ = dir;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    @panic("TODO");
}
/// Implements `Io.VTable.dirStatPath` on Linux via statx, retrying on EINTR
/// and checking for task cancellation before each attempt.
fn dirStatPathLinux(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    const flags: u32 = linux.AT.NO_AUTOMOUNT |
        @as(u32, if (!options.follow_symlinks) linux.AT.SYMLINK_NOFOLLOW else 0);
    while (true) {
        try pool.checkCancel();
        var statx = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(
            dir.handle,
            sub_path_posix,
            flags,
            linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME,
            &statx,
        );
        switch (linux.E.init(rc)) {
            .SUCCESS => return statFromLinux(&statx),
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .BADF => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .LOOP => return error.SymLinkLoop,
            .NAMETOOLONG => |err| return errnoBug(err), // Handled by pathToPosix() above.
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Implements `Io.VTable.dirStatPath` for generic POSIX targets via fstatat,
/// retrying on EINTR and checking for task cancellation before each attempt.
fn dirStatPathPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    const flags: u32 = if (!options.follow_symlinks) posix.AT.SYMLINK_NOFOLLOW else 0;
    while (true) {
        try pool.checkCancel();
        var stat = std.mem.zeroes(posix.Stat);
        switch (posix.errno(fstatat_sym(dir.handle, sub_path_posix, &stat, flags))) {
            // NOTE(review): passes `stat` by value here while fileStatPosix
            // passes `&stat` to statFromPosix — confirm the intended calling
            // convention.
            .SUCCESS => return statFromPosix(stat),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .FAULT => |err| return errnoBug(err),
            .NAMETOOLONG => return error.NameTooLong,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            // A path component exists but is not a directory. Report NotDir
            // for consistency with dirStatPathLinux and dirMakePosix.
            .NOTDIR => return error.NotDir,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Implements `Io.VTable.fileStat` for generic POSIX targets via fstat.
fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    // Targets without a stat structure cannot provide file metadata.
    if (posix.Stat == void) return error.Streaming;
    while (true) {
        try pool.checkCancel();
        var stat = std.mem.zeroes(posix.Stat);
        switch (posix.errno(fstat_sym(file.handle, &stat))) {
            // NOTE(review): passes `&stat` here while dirStatPathPosix passes
            // `stat` by value to statFromPosix — confirm the intended calling
            // convention.
            .SUCCESS => return statFromPosix(&stat),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Implements `Io.VTable.fileStat` on Linux via statx with AT.EMPTY_PATH,
/// which stats the open file descriptor directly. Path-resolution errnos are
/// therefore treated as bugs.
fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;
    while (true) {
        try pool.checkCancel();
        var statx = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(
            file.handle,
            "",
            linux.AT.EMPTY_PATH,
            linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME,
            &statx,
        );
        switch (linux.E.init(rc)) {
            .SUCCESS => return statFromLinux(&statx),
            .INTR => continue,
            .ACCES => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .LOOP => |err| return errnoBug(err),
            .NAMETOOLONG => |err| return errnoBug(err),
            .NOENT => |err| return errnoBug(err),
            .NOMEM => return error.SystemResources,
            .NOTDIR => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Implements `Io.VTable.fileStat` on Windows. Not yet implemented.
fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    _ = file;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    @panic("TODO");
}
/// Implements `Io.VTable.fileStat` on WASI, delegating to the POSIX
/// implementation when libc is linked.
fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    if (builtin.link_libc) return fileStatPosix(userdata, file);
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    while (true) {
        try pool.checkCancel();
        var stat: std.os.wasi.filestat_t = undefined;
        switch (std.os.wasi.fd_filestat_get(file.handle, &stat)) {
            .SUCCESS => return statFromWasi(&stat),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
// Some targets declare `flock` as void when unsupported.
const have_flock = @TypeOf(posix.system.flock) != void;

// Select the 64-bit-offset variants on platforms with a separate LFS ABI.
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
/// POSIX implementation of `Io.Dir.createFile`. Opens (creating if necessary)
/// `sub_path` relative to `dir`, honoring truncate/exclusive semantics and the
/// requested file lock before returning the handle.
fn dirCreateFilePosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };
    // Retry loop: restart on EINTR, checking for cancellation each attempt.
    const fd: posix.fd_t = while (true) {
        try pool.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode);
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);
    // Fallback for systems without O_EXLOCK/O_SHLOCK: take the lock with a
    // separate (non-atomic) flock() call.
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try pool.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .BADF => |err| return errnoBug(err),
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    // The lock was acquired atomically with NONBLOCK set; now clear NONBLOCK
    // so subsequent I/O on the handle blocks as usual.
    if (has_flock_open_flags and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try pool.checkCancel();
            // Fix: capture the flag bits returned by F.GETFL; the previous
            // bare `break` discarded them and could not produce a `usize`.
            const rc = posix.system.fcntl(fd, posix.F.GETFL, 0);
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try pool.checkCancel();
            // Fix: use the raw syscall wrapper; `posix.fcntl` returns an
            // error union, which cannot be passed to `posix.errno`.
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    return .{ .handle = fd };
}
/// POSIX/WASI implementation of `Io.Dir.openFile`. Opens an existing file at
/// `sub_path` relative to `dir` with the requested access mode and lock.
fn dirOpenFile(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    var os_flags: posix.O = switch (native_os) {
        .wasi => .{
            .read = flags.mode != .write_only,
            .write = flags.mode != .read_only,
        },
        else => .{
            .ACCMODE = switch (flags.mode) {
                .read_only => .RDONLY,
                .write_only => .WRONLY,
                .read_write => .RDWR,
            },
        },
    };
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;
    // Use the O locking flags if the os supports them to acquire the lock
    // atomically.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) {
        // Note that the NONBLOCK flag is removed after the openat() call
        // is successful.
        switch (flags.lock) {
            .none => {},
            .shared => {
                os_flags.SHLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
            .exclusive => {
                os_flags.EXLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
        }
    }
    // Retry loop: restart on EINTR, checking for cancellation each attempt.
    const fd: posix.fd_t = while (true) {
        try pool.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);
    // Fallback for systems without O_EXLOCK/O_SHLOCK: take the lock with a
    // separate (non-atomic) flock() call.
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try pool.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .BADF => |err| return errnoBug(err),
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    // The lock was acquired atomically with NONBLOCK set; now clear NONBLOCK
    // so subsequent I/O on the handle blocks as usual.
    if (has_flock_open_flags and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try pool.checkCancel();
            // Fix: capture the flag bits returned by F.GETFL; the previous
            // bare `break` discarded them and could not produce a `usize`.
            const rc = posix.system.fcntl(fd, posix.F.GETFL, 0);
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try pool.checkCancel();
            // Fix: use the raw syscall wrapper; `posix.fcntl` returns an
            // error union, which cannot be passed to `posix.errno`.
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    return .{ .handle = fd };
}
/// Closes the file handle. Cannot fail and does not observe cancellation:
/// resource release must always succeed.
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    _ = userdata;
    posix.close(file.handle);
}
/// Streaming read into the buffers of `data`, in order. Returns the number of
/// bytes read; 0 indicates end of stream. On POSIX, at most `max_iovecs_len`
/// non-empty buffers are submitted per call (short reads are expected).
fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        const DWORD = windows.DWORD;
        var index: usize = 0;
        // Bytes of data[index] already filled by previous iterations.
        var truncate: usize = 0;
        var total: usize = 0;
        while (index < data.len) {
            try pool.checkCancel();
            {
                const untruncated = data[index];
                data[index] = untruncated[truncate..];
                defer data[index] = untruncated;
                // Fix: read into the current buffer. The previous code took
                // `data[index..]` (a slice of slices), passing slice headers
                // to ReadFile and sizing the read by buffer count, not bytes.
                const buffer = data[index];
                const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
                var n: DWORD = undefined;
                if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) == 0) {
                    switch (windows.GetLastError()) {
                        .IO_PENDING => unreachable,
                        .OPERATION_ABORTED => continue,
                        // Fix: report bytes already transferred instead of
                        // unconditionally returning 0 (which would discard
                        // them); the next call then reports end-of-stream.
                        .BROKEN_PIPE => return total,
                        .HANDLE_EOF => return total,
                        .NETNAME_DELETED => return error.ConnectionResetByPeer,
                        .LOCK_VIOLATION => return error.LockViolation,
                        .ACCESS_DENIED => return error.AccessDenied,
                        .INVALID_HANDLE => return error.NotOpenForReading,
                        else => |err| return windows.unexpectedError(err),
                    }
                }
                total += n;
                truncate += n;
            }
            // Advance past buffers that are now completely filled.
            while (index < data.len and truncate >= data[index].len) {
                truncate -= data[index].len;
                index += 1;
            }
        }
        return total;
    }
    // Gather up to max_iovecs_len non-empty buffers for vectored I/O.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (i == iovecs_buffer.len) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    // Caller contract: at least one non-empty buffer must be supplied.
    assert(dest[0].len > 0);
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try pool.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    while (true) {
        try pool.checkCancel();
        const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Positional read starting at `offset` into the buffers of `data`, in order.
/// Returns the number of bytes read; 0 indicates end of file. The file cursor
/// is not modified on POSIX (preadv); Windows uses OVERLAPPED offsets.
fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        const DWORD = windows.DWORD;
        const OVERLAPPED = windows.OVERLAPPED;
        var index: usize = 0;
        // Bytes of data[index] already filled by previous iterations.
        var truncate: usize = 0;
        var total: usize = 0;
        // Fix: bound the loop; `while (true)` indexed past `data` once every
        // buffer was filled.
        while (index < data.len) {
            try pool.checkCancel();
            {
                const untruncated = data[index];
                data[index] = untruncated[truncate..];
                defer data[index] = untruncated;
                // Fix: read into the current buffer. The previous code took
                // `data[index..]` (a slice of slices), passing slice headers
                // to ReadFile and sizing the read by buffer count, not bytes.
                const buffer = data[index];
                const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
                var n: DWORD = undefined;
                // Fix: `offset` is a plain u64, not an optional; the previous
                // `if (offset) |off|` did not compile. Positional reads always
                // supply an OVERLAPPED offset, advanced by bytes already read.
                const off = offset + total;
                var overlapped: OVERLAPPED = .{
                    .Internal = 0,
                    .InternalHigh = 0,
                    .DUMMYUNIONNAME = .{
                        .DUMMYSTRUCTNAME = .{
                            .Offset = @truncate(off),
                            .OffsetHigh = @truncate(off >> 32),
                        },
                    },
                    .hEvent = null,
                };
                if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, &overlapped) == 0) {
                    switch (windows.GetLastError()) {
                        .IO_PENDING => unreachable,
                        .OPERATION_ABORTED => continue,
                        // Fix: report bytes already transferred instead of
                        // unconditionally returning 0 (which would discard
                        // them); the next call then reports end-of-file.
                        .BROKEN_PIPE => return total,
                        .HANDLE_EOF => return total,
                        .NETNAME_DELETED => return error.ConnectionResetByPeer,
                        .LOCK_VIOLATION => return error.LockViolation,
                        .ACCESS_DENIED => return error.AccessDenied,
                        .INVALID_HANDLE => return error.NotOpenForReading,
                        else => |err| return windows.unexpectedError(err),
                    }
                }
                total += n;
                truncate += n;
            }
            // Advance past buffers that are now completely filled.
            while (index < data.len and truncate >= data[index].len) {
                truncate -= data[index].len;
                index += 1;
            }
        }
        return total;
    }
    const have_pread_but_not_preadv = switch (native_os) {
        .windows, .haiku, .serenity => true,
        else => false,
    };
    if (have_pread_but_not_preadv) {
        @compileError("TODO");
    }
    // Gather up to max_iovecs_len non-empty buffers for vectored I/O.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (i == iovecs_buffer.len) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    // Caller contract: at least one non-empty buffer must be supplied.
    assert(dest[0].len > 0);
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try pool.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    while (true) {
        try pool.checkCancel();
        const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
        switch (posix.errno(rc)) {
            .SUCCESS => return @bitCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Relative seek. Not yet implemented; observes cancellation, then panics.
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
    _ = file;
    _ = offset;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    @panic("TODO");
}
/// Sets the file cursor to the absolute position `offset`.
/// Dispatches between four paths: 32-bit Linux llseek, Windows
/// SetFilePointerEx, WASI fd_seek, and generic POSIX lseek.
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const fd = file.handle;
    // 32-bit Linux without libc: lseek cannot express 64-bit offsets; use
    // the dedicated llseek syscall.
    if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) {
        try pool.checkCancel();
        var result: u64 = undefined;
        switch (posix.errno(posix.system.llseek(fd, offset, &result, posix.SEEK.SET))) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    if (native_os == .windows) {
        try pool.checkCancel();
        return windows.SetFilePointerEx_BEGIN(fd, offset);
    }
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try pool.checkCancel();
        var new_offset: std.os.wasi.filesize_t = undefined;
        switch (std.os.wasi.fd_seek(fd, @bitCast(offset), .SET, &new_offset)) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    // Platforms that define no SEEK constants cannot seek at all.
    if (posix.SEEK == void) return error.Unseekable;
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(lseek_sym(fd, @bitCast(offset), posix.SEEK.SET))) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Writes `buffer` to `file`, either at the current cursor (when `offset` is
/// the sentinel -1) or at the given absolute position.
fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    const fs_file: std.fs.File = .{ .handle = file.handle };
    if (offset == -1) return fs_file.write(buffer);
    return fs_file.pwrite(buffer, @bitCast(offset));
}
/// POSIX implementation of `Io.now` via clock_gettime.
/// Does not observe cancellation: reading a clock never blocks.
fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    const clock_id: posix.clockid_t = clockToPosix(clock);
    var tp: posix.timespec = undefined;
    switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) {
        .SUCCESS => return timestampFromPosix(&tp),
        // EINVAL means the clock id is not supported on this system.
        .INVAL => return error.UnsupportedClock,
        else => |err| return posix.unexpectedErrno(err),
    }
}
/// Windows implementation of `Io.now`.
fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    switch (clock) {
        .realtime => {
            // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
            // and uses the NTFS/Windows epoch, which is 1601-01-01.
            return .{ .nanoseconds = @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100 };
        },
        .monotonic, .uptime => {
            // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
            // NOTE(review): this branch fills a `.timestamp` field while the
            // realtime branch fills `.nanoseconds`, and raw QPC ticks are not
            // nanoseconds — confirm `Io.Timestamp` supports both
            // representations, otherwise a QueryPerformanceFrequency
            // conversion is needed here.
            return .{ .timestamp = windows.QueryPerformanceCounter() };
        },
        // No precise-cputime source is wired up for Windows.
        .process_cputime_id,
        .thread_cputime_id,
        => return error.UnsupportedClock,
    }
}
/// WASI implementation of `Io.now` via clock_time_get (1 ns precision hint).
/// Does not observe cancellation: reading a clock never blocks.
fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    var ns: std.os.wasi.timestamp_t = undefined;
    const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns);
    if (err != .SUCCESS) return error.Unexpected;
    // Fix: wrap the raw nanosecond count in the Timestamp struct (matching
    // `nowWindows`); the bare integer does not coerce to `Io.Timestamp`.
    return .{ .nanoseconds = ns };
}
/// Linux implementation of `Io.sleep` using clock_nanosleep.
/// `.deadline` timeouts sleep to an absolute time (ABSTIME) so EINTR retries
/// cannot drift; `.duration` sleeps relatively, with the kernel writing the
/// remaining time back into `timespec` for the retry. `.none` sleeps
/// "forever" (maxInt nanoseconds).
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata))
;
    const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
        .none => .awake,
        .duration => |d| d.clock,
        .deadline => |d| d.clock,
    });
    const deadline_nanoseconds: i96 = switch (timeout) {
        .none => std.math.maxInt(i96),
        .duration => |duration| duration.raw.nanoseconds,
        .deadline => |deadline| deadline.raw.nanoseconds,
    };
    var timespec: posix.timespec = timestampToPosix(deadline_nanoseconds);
    while (true) {
        try pool.checkCancel();
        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) {
            .none, .duration => false,
            .deadline => true,
        } }, &timespec, &timespec))) {
            .SUCCESS => return,
            .INTR => continue,
            .INVAL => return error.UnsupportedClock,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Windows implementation of `Io.sleep` using kernel32 Sleep (millisecond
/// granularity). Cancellation is only observed before the sleep begins; a
/// `.none` timeout sleeps for maxInt(DWORD) milliseconds.
fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    const ms = ms: {
        const duration_and_clock = (try timeout.toDurationFromNow(pool.io())) orelse
            break :ms std.math.maxInt(windows.DWORD);
        // lossyCast saturates durations that exceed the DWORD range.
        break :ms std.math.lossyCast(windows.DWORD, duration_and_clock.duration.toMilliseconds());
    };
    windows.kernel32.Sleep(ms);
}
/// WASI implementation of `Io.sleep` using a single poll_oneoff clock
/// subscription. Cancellation is only observed before the sleep begins; a
/// `.none` timeout subscribes with the maximum representable duration.
fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    const w = std.os.wasi;
    const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(pool.io())) |d| .{
        .id = clockToWasi(d.clock),
        .timeout = std.math.lossyCast(u64, d.duration.nanoseconds),
        .precision = 0,
        .flags = 0,
    } else .{
        .id = .MONOTONIC,
        .timeout = std.math.maxInt(u64),
        .precision = 0,
        .flags = 0,
    };
    const in: w.subscription_t = .{
        .userdata = 0,
        .u = .{
            .tag = .CLOCK,
            .u = .{ .clock = clock },
        },
    };
    var event: w.event_t = undefined;
    var nevents: usize = undefined;
    // Best-effort sleep: the result is deliberately ignored since the sleep
    // API has no meaningful failure to report here.
    _ = w.poll_oneoff(&in, &event, 1, &nevents);
}
/// Generic POSIX implementation of `Io.sleep` using nanosleep.
/// The kernel writes the remaining time back into `timespec`, so an EINTR
/// retry continues from where the sleep left off. A `.none` timeout sleeps
/// with a saturated timespec ("forever").
fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
    const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type;
    var timespec: posix.timespec = t: {
        const d = (try timeout.toDurationFromNow(pool.io())) orelse break :t .{
            .sec = std.math.maxInt(sec_type),
            .nsec = std.math.maxInt(nsec_type),
        };
        break :t timestampToPosix(d.duration.nanoseconds);
    };
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
            .INTR => continue,
            else => return, // This prong handles success as well as unexpected errors.
        }
    }
}
/// Blocks until at least one of `futures` completes and returns its index.
/// Registration protocol: each future's `select_condition` is atomically
/// swapped to point at a stack-allocated ResetEvent; a completed future holds
/// the `done_reset_event` sentinel instead.
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = pool;
    var reset_event: ResetEvent = .unset;
    for (futures, 0..) |future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        // If this future already completed, unregister the earlier futures
        // before returning so none of them can still signal the
        // stack-allocated reset_event after this frame is gone.
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
            for (futures[0..i]) |cleanup_future| {
                const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future));
                if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
                    cleanup_closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event.
                }
            }
            return i;
        }
    }
    // None were already done; block until some completing future signals us.
    reset_event.wait();
    // Unregister everything and pick which future woke us up.
    var result: ?usize = null;
    for (futures, 0..) |future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
            closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event.
            if (result == null) result = i; // In case multiple are ready, return first.
        }
    }
    return result.?;
}
/// POSIX implementation of `IpAddress.listen`: socket + (optional reuse
/// options) + bind + listen, then getsockname to learn the actual bound
/// address (relevant when port 0 was requested).
fn netListenIpPosix(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const family = posixAddressFamily(&address);
    const socket_fd = try openSocketPosix(pool, family, .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);
    if (options.reuse_address) {
        try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
        // REUSEPORT is not universally available; set it only when declared.
        if (@hasDecl(posix.SO, "REUSEPORT"))
            try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
    }
    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(&address, &storage);
    try posixBind(pool, socket_fd, &storage.any, addr_len);
    // Retry loop: restart on cancellation checks; listen() itself is not
    // expected to return EINTR.
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
            .SUCCESS => break,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
    return .{
        .socket = .{
            .handle = socket_fd,
            .address = addressFromPosix(&storage),
        },
    };
}
/// POSIX implementation of listening on a unix-domain (AF_UNIX) socket.
/// Protocol/mode errors from socket() are reported uniformly as
/// `AddressFamilyUnsupported` since the caller only requested AF_UNIX.
fn netListenUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported,
        error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported,
        error.SocketModeUnsupported => return error.AddressFamilyUnsupported,
        else => |e| return e,
    };
    errdefer posix.close(socket_fd);
    var storage: UnixAddress = undefined;
    const addr_len = addressUnixToPosix(address, &storage);
    try posixBindUnix(pool, socket_fd, &storage.any, addr_len);
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
            .SUCCESS => break,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
    return socket_fd;
}
/// bind() wrapper for unix-domain sockets. Differs from `posixBind` in that
/// filesystem errors (ELOOP, ENOENT, ENOTDIR, EROFS, ...) are meaningful for
/// path-based socket addresses and are surfaced to the caller.
fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.bind(fd, addr, addr_len))) {
            .SUCCESS => break,
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .ADDRINUSE => return error.AddressInUse,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .NOMEM => return error.SystemResources,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,
            .BADF => |err| return errnoBug(err), // always a race condition if this error is returned
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
            .NAMETOOLONG => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// bind() wrapper for IP sockets, with EINTR retry and cancellation checks.
fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
            .SUCCESS => break,
            .INTR => continue,
            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err), // always a race condition if this error is returned
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// connect() wrapper for IP sockets, with EINTR retry and cancellation checks.
/// NOTE(review): retrying connect() after EINTR is not universally safe — on
/// some systems the connection continues asynchronously and the retry returns
/// EALREADY/EISCONN; confirm this matches the blocking-socket usage here.
fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
            .SUCCESS => return,
            .INTR => continue,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ALREADY => return error.ConnectionPending,
            .BADF => |err| return errnoBug(err),
            .CONNREFUSED => return error.ConnectionRefused,
            .CONNRESET => return error.ConnectionResetByPeer,
            .FAULT => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTSOCK => |err| return errnoBug(err),
            .PROTOTYPE => |err| return errnoBug(err),
            .TIMEDOUT => return error.Timeout,
            .CONNABORTED => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .PERM => |err| return errnoBug(err),
            .NOENT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// connect() wrapper for unix-domain sockets. Filesystem errors (ELOOP,
/// ENOENT, ENOTDIR, EROFS) are meaningful for path-based addresses and are
/// surfaced to the caller instead of being treated as bugs.
fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.connect(fd, addr, addr_len))) {
            .SUCCESS => return,
            .INTR => continue,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN => return error.WouldBlock,
            .INPROGRESS => return error.WouldBlock,
            .ACCES => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,
            .BADF => |err| return errnoBug(err),
            .CONNABORTED => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            .NOTSOCK => |err| return errnoBug(err),
            .PROTOTYPE => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// getsockname() wrapper: fills `addr`/`addr_len` with the socket's actual
/// local address (e.g. the kernel-assigned port after binding to port 0).
fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
            .SUCCESS => break,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // always a race condition
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // always a race condition
            .NOBUFS => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// setsockopt() wrapper for u32-valued options, with EINTR retry and
/// cancellation checks.
fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
    // Reinterpret the option value as the byte buffer setsockopt expects.
    // NOTE(review): casting `*const u32` to a `[]const u8` slice via @ptrCast
    // depends on recent pointer-cast semantics — confirm this compiles on the
    // targeted Zig version (`std.mem.asBytes(&option)` is the conventional
    // spelling).
    const o: []const u8 = @ptrCast(&option);
    while (true) {
        try pool.checkCancel();
        switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // always a race condition
            .NOTSOCK => |err| return errnoBug(err), // always a race condition
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// POSIX implementation of `IpAddress.connect`: socket + connect, then
/// getsockname to learn the kernel-chosen local address.
fn netConnectIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    // Connect timeouts are not implemented yet for the blocking backend.
    if (options.timeout != .none) @panic("TODO");
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const family = posixAddressFamily(address);
    const socket_fd = try openSocketPosix(pool, family, .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);
    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(address, &storage);
    try posixConnect(pool, socket_fd, &storage.any, addr_len);
    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
    return .{ .socket = .{
        .handle = socket_fd,
        .address = addressFromPosix(&storage),
    } };
}
/// POSIX implementation of connecting to a unix-domain (AF_UNIX) socket.
/// Returns the connected socket handle; the caller owns it.
fn netConnectUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream });
    errdefer posix.close(fd);
    var addr_storage: UnixAddress = undefined;
    const addr_storage_len = addressUnixToPosix(address, &addr_storage);
    try posixConnectUnix(pool, fd, &addr_storage.any, addr_storage_len);
    return fd;
}
/// POSIX implementation of `IpAddress.bind`: socket + bind, then getsockname
/// so the returned `net.Socket` carries the actual bound address (relevant
/// when port 0 was requested).
fn netBindIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const fd = try openSocketPosix(pool, posixAddressFamily(address), options);
    errdefer posix.close(fd);
    var sockaddr_storage: PosixAddress = undefined;
    var sockaddr_len = addressToPosix(address, &sockaddr_storage);
    try posixBind(pool, fd, &sockaddr_storage.any, sockaddr_len);
    try posixGetSockName(pool, fd, &sockaddr_storage.any, &sockaddr_len);
    return .{
        .handle = fd,
        .address = addressFromPosix(&sockaddr_storage),
    };
}
/// Creates a socket of the requested family/mode/protocol with close-on-exec
/// set (atomically via SOCK_CLOEXEC where supported, otherwise with a
/// follow-up fcntl), applying `options.ip6_only` when requested.
/// Caller owns the returned fd.
fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t {
    const mode = posixSocketMode(options.mode);
    const protocol = posixProtocol(options.protocol);
    const socket_fd = while (true) {
        try pool.checkCancel();
        const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
        const socket_rc = posix.system.socket(family, flags, protocol);
        switch (posix.errno(socket_rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(socket_rc);
                errdefer posix.close(fd);
                // Platforms without SOCK_CLOEXEC need a separate
                // (non-atomic) fcntl to set close-on-exec.
                if (socket_flags_unsupported) while (true) {
                    try pool.checkCancel();
                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                        .SUCCESS => break,
                        .INTR => continue,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break fd;
            },
            .INTR => continue,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .INVAL => return error.ProtocolUnsupportedBySystem,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
            .PROTOTYPE => return error.SocketModeUnsupported,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(socket_fd);
    if (options.ip6_only) {
        if (posix.IPV6 == void) return error.OptionUnsupported;
        // Fix: enable (1) the V6ONLY option; passing 0 *disabled* v6-only
        // mode, the opposite of what `options.ip6_only` requests.
        try setSocketOption(pool, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 1);
    }
    return socket_fd;
}
// These platforms cannot set CLOEXEC atomically at socket creation time,
// so it must be applied afterwards with a separate fcntl call.
const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
// accept4 (atomic CLOEXEC at accept time) is only present where
// creation-time socket flags are supported.
const have_accept4 = !socket_flags_unsupported;
/// Accepts one connection on `listen_fd`, retrying on EINTR and checking
/// task cancellation before each syscall. The accepted fd gets close-on-exec
/// set — atomically via accept4 where available, otherwise via fcntl.
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var storage: PosixAddress = undefined;
    var addr_len: posix.socklen_t = @sizeOf(PosixAddress);
    const fd = while (true) {
        try pool.checkCancel();
        const rc = if (have_accept4)
            posix.system.accept4(listen_fd, &storage.any, &addr_len, posix.SOCK.CLOEXEC)
        else
            posix.system.accept(listen_fd, &storage.any, &addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(rc);
                errdefer posix.close(fd);
                // No accept4 on this platform: set FD_CLOEXEC separately.
                if (!have_accept4) while (true) {
                    try pool.checkCancel();
                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                        .SUCCESS => break,
                        .INTR => continue,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break fd;
            },
            .INTR => continue,
            // The listening socket is blocking, so EAGAIN indicates a bug.
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // always a race condition
            .CONNABORTED => return error.ConnectionAborted,
            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.SocketNotListening,
            .NOTSOCK => |err| return errnoBug(err),
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .OPNOTSUPP => |err| return errnoBug(err),
            .PROTO => return error.ProtocolFailure,
            .PERM => return error.BlockedByFirewall,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    return .{ .socket = .{
        .handle = fd,
        .address = addressFromPosix(&storage),
    } };
}
/// Reads from `fd` into the buffers of `data` with a single vectored read,
/// using at most `max_iovecs_len` vectors. Returns the number of bytes read;
/// buffers beyond the vector limit simply wait for the caller's next call.
fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    // Zero-length buffers are skipped: the OS validates the base pointer
    // before the length, so empty vectors must be omitted.
    for (data) |buf| {
        if (iovecs_buffer.len - i == 0) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    // assumes the Reader interface always supplies at least one non-empty
    // buffer; an all-empty `data` would make `dest` empty here — TODO confirm
    assert(dest[0].len > 0);
    // WASI without libc has its own readv-equivalent with an out-parameter.
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try pool.checkCancel();
        var n: usize = undefined;
        switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) {
            .SUCCESS => return n,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    while (true) {
        try pool.checkCancel();
        const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            // The socket is blocking, so EAGAIN indicates a bug.
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .PIPE => return error.BrokenPipe,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
const have_sendmmsg = builtin.os.tag == .linux;
/// Sends `messages` on `handle`, translating `flags` into POSIX MSG_* bits.
/// Returns the first error encountered (if any) paired with the number of
/// messages handed to the kernel before it occurred.
fn netSend(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    // CONFIRM and FASTOPEN do not exist on every platform, hence @hasDecl.
    const posix_flags: u32 =
        @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
        @as(u32, if (flags.dont_route) posix.MSG.DONTROUTE else 0) |
        @as(u32, if (flags.eor) posix.MSG.EOR else 0) |
        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
        posix.MSG.NOSIGNAL;
    var sent: usize = 0;
    while (sent < messages.len) {
        if (have_sendmmsg) {
            // Linux: batch as many messages per syscall as possible.
            sent += netSendMany(pool, handle, messages[sent..], posix_flags) catch |err| return .{ err, sent };
        } else {
            netSendOne(pool, handle, &messages[sent], posix_flags) catch |err| return .{ err, sent };
            sent += 1;
        }
    }
    return .{ null, sent };
}
/// Sends a single message on `handle` with sendmsg, retrying on EINTR and
/// checking task cancellation before each attempt.
/// On success, `message.data_len` is updated to the byte count the kernel
/// accepted.
fn netSendOne(
    pool: *Pool,
    handle: net.Socket.Handle,
    message: *net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!void {
    var addr: PosixAddress = undefined;
    // The payload is never mutated, so the const iovec/msghdr variants are
    // used, which also avoids @constCast on the data and control pointers.
    var iovec: posix.iovec_const = .{ .base = message.data_ptr, .len = message.data_len };
    const msg: posix.msghdr_const = .{
        .name = &addr.any,
        .namelen = addressToPosix(message.address, &addr),
        // Slice the single-item *pointer* to obtain the many-item pointer the
        // header requires; slicing the struct value itself does not compile.
        .iov = (&iovec)[0..1],
        .iovlen = 1,
        .control = message.control.ptr,
        .controllen = message.control.len,
        .flags = 0,
    };
    while (true) {
        try pool.checkCancel();
        // sendmsg takes the message header by pointer.
        const rc = posix.system.sendmsg(handle, &msg, flags);
        if (is_windows) {
            if (rc == windows.ws2_32.SOCKET_ERROR) {
                switch (windows.ws2_32.WSAGetLastError()) {
                    .WSAEACCES => return error.AccessDenied,
                    .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
                    .WSAECONNRESET => return error.ConnectionResetByPeer,
                    // Error names match netSendMany, which returns the same
                    // `net.Socket.SendError` set.
                    .WSAEMSGSIZE => return error.MessageOversize,
                    .WSAENOBUFS => return error.SystemResources,
                    .WSAENOTSOCK => return error.FileDescriptorNotASocket,
                    .WSAEAFNOSUPPORT => return error.AddressFamilyUnsupported,
                    .WSAEDESTADDRREQ => unreachable, // A destination address is required.
                    .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
                    .WSAEHOSTUNREACH => return error.NetworkUnreachable,
                    // TODO: WSAEINPROGRESS, WSAEINTR
                    .WSAEINVAL => unreachable,
                    .WSAENETDOWN => return error.NetworkDown,
                    .WSAENETRESET => return error.ConnectionResetByPeer,
                    .WSAENETUNREACH => return error.NetworkUnreachable,
                    .WSAENOTCONN => return error.SocketUnconnected,
                    .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
                    .WSAEWOULDBLOCK => return error.WouldBlock,
                    .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
                    else => |err| return windows.unexpectedWSAError(err),
                }
            } else {
                message.data_len = @intCast(rc);
                return;
            }
        }
        switch (posix.errno(rc)) {
            .SUCCESS => {
                message.data_len = @intCast(rc);
                return;
            },
            .ACCES => return error.AccessDenied,
            .AGAIN => return error.WouldBlock,
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err),
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            // Matches netSendMany, which returns the same error set.
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err),
            .OPNOTSUPP => |err| return errnoBug(err),
            // NOTE(review): netSendMany maps EPIPE to SocketUnconnected —
            // confirm which mapping is intended.
            .PIPE => return error.BrokenPipe,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .LOOP => return error.SymLinkLoop,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .HOSTUNREACH => return error.NetworkUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Linux-only: sends up to 64 of `messages` with one sendmmsg syscall,
/// retrying on EINTR and checking cancellation before each attempt.
/// Returns how many messages were handed to the kernel; each sent message's
/// `data_len` is updated with the byte count actually sent. Sending fewer
/// than `messages.len` is fine — the caller (netSend) loops.
fn netSendMany(
    pool: *Pool,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!usize {
    var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
    var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
    var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
    const min_len: usize = @min(messages.len, msg_buffer.len);
    const clamped_messages = messages[0..min_len];
    const clamped_msgs = (&msg_buffer)[0..min_len];
    const clamped_addrs = (&addr_buffer)[0..min_len];
    const clamped_iovecs = (&iovecs_buffer)[0..min_len];
    // One mmsghdr per message, each with a single iovec covering its payload.
    for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| {
        iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
        msg.* = .{
            .hdr = .{
                .name = &addr.any,
                .namelen = addressToPosix(message.address, addr),
                .iov = iovec[0..1],
                .iovlen = 1,
                .control = @constCast(message.control.ptr),
                .controllen = message.control.len,
                .flags = 0,
            },
            .len = undefined, // Populated by calling sendmmsg below.
        };
    }
    while (true) {
        try pool.checkCancel();
        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                // Kernel reports per-message byte counts in msg.len.
                const n: usize = @intCast(rc);
                for (clamped_messages[0..n], clamped_msgs[0..n]) |*message, *msg| {
                    message.data_len = msg.len;
                }
                return n;
            },
            .AGAIN => |err| return errnoBug(err),
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
            .INTR => continue,
            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
            // NOTE(review): netSendOne maps EPIPE to BrokenPipe — confirm
            // which mapping is intended.
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.NetworkUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Receives up to `message_buffer.len` datagrams on `handle` into
/// `data_buffer`, returning as soon as at least one message is available and
/// the socket would otherwise block, or when the timeout expires.
/// Returns an optional error paired with the number of messages received.
fn netReceive(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    // recvmmsg is useless, here's why:
    // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
    // * it wants iovecs for each message but we have a better API: one data
    //   buffer to handle all the messages. The better API cannot be lowered to
    //   the split vectors though because reducing the buffer size might make
    //   some messages unreceivable.
    // So the strategy instead is to use non-blocking recvmsg calls, calling
    // poll() with timeout if the first one returns EAGAIN.
    const posix_flags: u32 =
        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
        @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
        posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;
    var poll_fds: [1]posix.pollfd = .{
        .{
            .fd = handle,
            .events = posix.POLL.IN,
            .revents = undefined,
        },
    };
    var message_i: usize = 0;
    var data_i: usize = 0;
    const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i };
    recv: while (true) {
        pool.checkCancel() catch |err| return .{ err, message_i };
        if (message_buffer.len - message_i == 0) return .{ null, message_i };
        const message = &message_buffer[message_i];
        // Each message consumes a prefix of the remaining shared data buffer.
        const remaining_data_buffer = data_buffer[data_i..];
        var storage: PosixAddress = undefined;
        var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
        var msg: posix.msghdr = .{
            .name = &storage.any,
            .namelen = @sizeOf(PosixAddress),
            .iov = (&iov)[0..1],
            .iovlen = 1,
            .control = message.control.ptr,
            .controllen = message.control.len,
            .flags = undefined,
        };
        const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
        switch (posix.errno(recv_rc)) {
            .SUCCESS => {
                const data = remaining_data_buffer[0..@intCast(recv_rc)];
                data_i += data.len;
                message.* = .{
                    .from = addressFromPosix(&storage),
                    .data = data,
                    // If the kernel wrote control data, expose the written
                    // prefix; otherwise echo back the caller's buffer.
                    .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
                    .flags = .{
                        .eor = (msg.flags & posix.MSG.EOR) != 0,
                        .trunc = (msg.flags & posix.MSG.TRUNC) != 0,
                        .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
                        .oob = (msg.flags & posix.MSG.OOB) != 0,
                        .errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0,
                    },
                };
                message_i += 1;
                continue;
            },
            // Would block: hand back what we have, or poll for readiness.
            .AGAIN => while (true) {
                pool.checkCancel() catch |err| return .{ err, message_i };
                if (message_i != 0) return .{ null, message_i };
                const max_poll_ms = std.math.maxInt(u31);
                // Clamp the remaining time to poll()'s millisecond range;
                // a clamped wait just loops around for another poll.
                const timeout_ms: u31 = if (deadline) |d| t: {
                    const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i };
                    if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
                    break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
                } else max_poll_ms;
                const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
                switch (posix.errno(poll_rc)) {
                    .SUCCESS => {
                        if (poll_rc == 0) {
                            // Although spurious timeouts are OK, when no deadline
                            // is passed we must not return `error.Timeout`.
                            if (deadline == null) continue;
                            return .{ error.Timeout, message_i };
                        }
                        continue :recv;
                    },
                    .INTR => continue,
                    .FAULT => |err| return .{ errnoBug(err), message_i },
                    .INVAL => |err| return .{ errnoBug(err), message_i },
                    .NOMEM => return .{ error.SystemResources, message_i },
                    else => |err| return .{ posix.unexpectedErrno(err), message_i },
                }
            },
            .INTR => continue,
            .BADF => |err| return .{ errnoBug(err), message_i },
            .NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
            .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
            .FAULT => |err| return .{ errnoBug(err), message_i },
            .INVAL => |err| return .{ errnoBug(err), message_i },
            .NOBUFS => return .{ error.SystemResources, message_i },
            .NOMEM => return .{ error.SystemResources, message_i },
            .NOTCONN => return .{ error.SocketUnconnected, message_i },
            .NOTSOCK => |err| return .{ errnoBug(err), message_i },
            .MSGSIZE => return .{ error.MessageOversize, message_i },
            .PIPE => return .{ error.SocketUnconnected, message_i },
            .OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
            .CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
            .NETDOWN => return .{ error.NetworkDown, message_i },
            else => |err| return .{ posix.unexpectedErrno(err), message_i },
        }
    }
}
/// Vectored socket write: gathers `header`, the slices of `data` except the
/// last, and the final `data` element repeated `splat` times into at most
/// `max_iovecs_len` iovecs, then issues one sendmsg call.
/// Returns the number of bytes the kernel accepted; short writes are
/// expected and the Writer interface retries with the remainder.
fn netWritePosix(
    userdata: ?*anyopaque,
    fd: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    try pool.checkCancel();
    var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
    // Declared at function scope (not inside the splat branch) so the iovec
    // entries referencing it remain valid when sendmsg runs below.
    var backup_buffer: [splat_buffer_size]u8 = undefined;
    var msg: posix.msghdr_const = .{
        .name = null,
        .namelen = 0,
        .iov = &iovecs,
        .iovlen = 0,
        .control = null,
        .controllen = 0,
        .flags = 0,
    };
    addBuf(&iovecs, &msg.iovlen, header);
    // All but the last data slice are written once; the last is the splat
    // pattern. (The Writer interface always supplies at least one slice —
    // TODO confirm; an empty `data` would underflow here.)
    for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
    const pattern = data[data.len - 1];
    if (iovecs.len - msg.iovlen != 0) switch (splat) {
        0 => {},
        1 => addBuf(&iovecs, &msg.iovlen, pattern),
        else => switch (pattern.len) {
            0 => {},
            1 => {
                // Single-byte pattern: memset a stack buffer once and
                // reference it repeatedly instead of adding `splat` one-byte
                // iovecs.
                const splat_buffer = &backup_buffer;
                const memset_len = @min(splat_buffer.len, splat);
                const buf = splat_buffer[0..memset_len];
                @memset(buf, pattern[0]);
                addBuf(&iovecs, &msg.iovlen, buf);
                var remaining_splat = splat - buf.len;
                while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
                    assert(buf.len == splat_buffer.len);
                    addBuf(&iovecs, &msg.iovlen, splat_buffer);
                    remaining_splat -= splat_buffer.len;
                }
                // The loop above may stop because the iovec array filled up
                // while more than a full buffer of splat remains; slicing by
                // `remaining_splat` would then be out of bounds (and addBuf
                // would drop the entry anyway). Only add the tail when it
                // fits — the unsent remainder is covered by the short-write
                // contract.
                if (remaining_splat <= splat_buffer.len)
                    addBuf(&iovecs, &msg.iovlen, splat_buffer[0..remaining_splat]);
            },
            else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
                addBuf(&iovecs, &msg.iovlen, pattern);
            },
        },
    };
    // NOSIGNAL: surface EPIPE as an error instead of raising SIGPIPE.
    const flags = posix.MSG.NOSIGNAL;
    return posix.sendmsg(fd, &msg, flags);
}
/// Appends `bytes` as an iovec entry at index `i.*`, advancing the index.
/// Zero-length slices are skipped because the OS checks the pointer address
/// before the length; entries beyond the array's capacity are silently
/// dropped (callers rely on the short-write contract to retry).
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
    if (bytes.len == 0 or v.len - i.* == 0) return;
    v[i.*] = .{ .base = bytes.ptr, .len = bytes.len };
    i.* += 1;
}
/// Closes a socket handle. Close failures on Windows are treated as
/// recoverable OS bugs; POSIX close does not report errors here.
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    _ = userdata;
    if (is_windows) {
        windows.closesocket(handle) catch recoverableOsBugDetected();
    } else {
        posix.close(handle);
    }
}
/// Resolves a network interface name (e.g. "eth0") to its numeric index.
/// Linux uses the SIOCGIFINDEX ioctl on a throwaway datagram socket so no
/// libc is required; Windows and libc platforms use if_nametoindex.
fn netInterfaceNameResolve(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    if (native_os == .linux) {
        // Any socket works as an ioctl target; fd-limit errors are reported
        // as generic resource exhaustion since no fd outlives this call.
        const sock_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
            error.ProcessFdQuotaExceeded => return error.SystemResources,
            error.SystemFdQuotaExceeded => return error.SystemResources,
            error.AddressFamilyUnsupported => return error.Unexpected,
            error.ProtocolUnsupportedBySystem => return error.Unexpected,
            error.ProtocolUnsupportedByAddressFamily => return error.Unexpected,
            error.SocketModeUnsupported => return error.Unexpected,
            else => |e| return e,
        };
        defer posix.close(sock_fd);
        var ifr: posix.ifreq = .{
            .ifrn = .{ .name = @bitCast(name.bytes) },
            .ifru = undefined,
        };
        // Retry the ioctl on EINTR, checking cancellation each time.
        while (true) {
            try pool.checkCancel();
            switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) {
                .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) },
                .INTR => continue,
                .INVAL => |err| return errnoBug(err), // Bad parameters.
                .NOTTY => |err| return errnoBug(err),
                .NXIO => |err| return errnoBug(err),
                .BADF => |err| return errnoBug(err), // Always a race condition.
                .FAULT => |err| return errnoBug(err), // Bad pointer parameter.
                .IO => |err| return errnoBug(err), // sock_fd is not a file descriptor
                .NODEV => return error.InterfaceNotFound,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    if (native_os == .windows) {
        try pool.checkCancel();
        const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = index };
    }
    if (builtin.link_libc) {
        try pool.checkCancel();
        const index = std.c.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = @bitCast(index) };
    }
    @panic("unimplemented");
}
/// Maps an interface index back to its name. Not yet implemented on any
/// platform; panics after the cancellation check.
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    _ = interface;
    try pool.checkCancel();
    switch (native_os) {
        .linux => @panic("TODO"),
        .windows => @panic("TODO"),
        else => {
            if (builtin.link_libc) @panic("TODO");
            @panic("unimplemented");
        },
    }
}
/// Vtable entry for host name resolution. Runs the fallible lookup and
/// always terminates the queue with an `end` result carrying its outcome,
/// even if the task was cancelled (hence the uncancelable put).
fn netLookup(
    userdata: ?*anyopaque,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) void {
    const pool: *Pool = @ptrCast(@alignCast(userdata));
    const outcome: HostName.LookupResult = .{
        .end = netLookupFallible(pool, host_name, resolved, options),
    };
    resolved.putOneUncancelable(pool.io(), outcome);
}
/// Implementation backing `netLookup`: resolves `host_name`, queueing
/// addresses followed by the canonical name into `resolved`.
/// Resolution order on Linux: IP literal, hosts file, localhost special
/// case (RFC 6761), then DNS with search domains.
fn netLookupFallible(
    pool: *Pool,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const pool_io = pool.io();
    const name = host_name.bytes;
    assert(name.len <= HostName.max_len);
    if (is_windows) {
        // TODO use GetAddrInfoExW / GetAddrInfoExCancel
        @compileError("TODO");
    }
    // On Linux, glibc provides getaddrinfo_a which is capable of supporting our semantics.
    // However, musl's POSIX-compliant getaddrinfo is not, so we bypass it.
    if (builtin.target.isGnuLibC()) {
        // TODO use getaddrinfo_a / gai_cancel
    }
    if (native_os == .linux) {
        // If the name is already an IP literal no external lookup is needed.
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(name, options.port)) |addr| {
                try resolved.putAll(pool_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }
        if (options.family != .ip6) {
            if (IpAddress.parseIp4(name, options.port)) |addr| {
                try resolved.putAll(pool_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                // Mirror the IPv6 literal branch above: resolution is
                // complete, so do not fall through to hosts-file and DNS.
                return;
            } else |_| {}
        }
        // NOTE(review): a hosts-file hit still falls through to the DNS
        // lookup below — confirm lookupHosts signals completion in a way
        // that makes this intended.
        lookupHosts(pool, host_name, resolved, options) catch |err| switch (err) {
            error.UnknownHostName => {},
            else => |e| return e,
        };
        // RFC 6761 Section 6.3.3
        // Name resolution APIs and libraries SHOULD recognize
        // localhost names as special and SHOULD always return the IP
        // loopback address for address queries and negative responses
        // for all other query types.
        // Check for equal to "localhost(.)" or ends in ".localhost(.)"
        const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost";
        if (std.mem.endsWith(u8, name, localhost) and
            (name.len == localhost.len or name[name.len - localhost.len] == '.'))
        {
            var results_buffer: [3]HostName.LookupResult = undefined;
            var results_index: usize = 0;
            if (options.family != .ip4) {
                results_buffer[results_index] = .{ .address = .{ .ip6 = .loopback(options.port) } };
                results_index += 1;
            }
            if (options.family != .ip6) {
                results_buffer[results_index] = .{ .address = .{ .ip4 = .loopback(options.port) } };
                results_index += 1;
            }
            const canon_name = "localhost";
            const canon_name_dest = options.canonical_name_buffer[0..canon_name.len];
            canon_name_dest.* = canon_name.*;
            results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } };
            results_index += 1;
            try resolved.putAll(pool_io, results_buffer[0..results_index]);
            return;
        }
        return lookupDnsSearch(pool, host_name, resolved, options);
    }
    if (native_os == .openbsd) {
        // TODO use getaddrinfo_async / asr_abort
    }
    if (native_os == .freebsd) {
        // TODO use dnsres_getaddrinfo
    }
    if (native_os.isDarwin()) {
        // TODO use CFHostStartInfoResolution / CFHostCancelInfoResolution
    }
    if (builtin.link_libc) {
        // This operating system lacks a way to resolve asynchronously. We are
        // stuck with getaddrinfo.
        @compileError("TODO");
    }
    return error.OptionUnsupported;
}
/// Storage large enough for any POSIX IP sockaddr, viewable as the generic
/// header (`any`) or as the concrete IPv4/IPv6 layouts.
const PosixAddress = extern union {
    any: posix.sockaddr,
    in: posix.sockaddr.in,
    in6: posix.sockaddr.in6,
};
/// Storage for a unix-domain sockaddr, viewable as the generic header
/// (`any`) or the concrete `sockaddr.un` layout.
const UnixAddress = extern union {
    any: posix.sockaddr,
    un: posix.sockaddr.un,
};
/// Maps an `IpAddress` variant to its POSIX address family constant.
fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
    switch (a.*) {
        .ip4 => return posix.AF.INET,
        .ip6 => return posix.AF.INET6,
    }
}
/// Converts a kernel-written POSIX sockaddr back into an `IpAddress`.
fn addressFromPosix(posix_address: *PosixAddress) IpAddress {
    return switch (posix_address.any.family) {
        posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
        posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
        // NOTE(review): an unrecognized family silently becomes the IPv4
        // loopback with port 0 — confirm callers accept this placeholder
        // rather than expecting an error.
        else => .{ .ip4 = .loopback(0) },
    };
}
/// Writes `a` into `storage` in POSIX sockaddr form and returns how many
/// bytes of `storage` are meaningful (the value to pass as socklen).
fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
    switch (a.*) {
        .ip4 => |ip4| {
            storage.in = address4ToPosix(ip4);
            return @sizeOf(posix.sockaddr.in);
        },
        .ip6 => |*ip6| {
            storage.in6 = address6ToPosix(ip6);
            return @sizeOf(posix.sockaddr.in6);
        },
    }
}
/// Converts a unix-domain address to POSIX form, NUL-terminating the path.
/// assumes `a.path.len` is strictly less than the sockaddr.un path capacity
/// (validated by the UnixAddress type) — TODO confirm; no bounds check here.
fn addressUnixToPosix(a: *const net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
    @memcpy(storage.un.path[0..a.path.len], a.path);
    storage.un.family = posix.AF.UNIX;
    storage.un.path[a.path.len] = 0;
    return @sizeOf(posix.sockaddr.un);
}
/// Converts a POSIX IPv4 sockaddr to `net.Ip4Address`, converting the port
/// from network to native byte order.
fn address4FromPosix(in: *posix.sockaddr.in) net.Ip4Address {
    const native_port = std.mem.bigToNative(u16, in.port);
    return .{
        .bytes = @bitCast(in.addr),
        .port = native_port,
    };
}
/// Converts a POSIX IPv6 sockaddr to `net.Ip6Address`, converting the port
/// from network to native byte order and carrying over flow and scope.
fn address6FromPosix(in6: *posix.sockaddr.in6) net.Ip6Address {
    const native_port = std.mem.bigToNative(u16, in6.port);
    return .{
        .bytes = in6.addr,
        .flow = in6.flowinfo,
        .interface = .{ .index = in6.scope_id },
        .port = native_port,
    };
}
/// Converts a `net.Ip4Address` to its POSIX sockaddr form, converting the
/// port to network byte order.
fn address4ToPosix(a: net.Ip4Address) posix.sockaddr.in {
    const wire_port = std.mem.nativeToBig(u16, a.port);
    return .{
        .addr = @bitCast(a.bytes),
        .port = wire_port,
    };
}
/// Converts a `net.Ip6Address` to its POSIX sockaddr form, converting the
/// port to network byte order and carrying over flow and scope.
fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
    const wire_port = std.mem.nativeToBig(u16, a.port);
    return .{
        .addr = a.bytes,
        .flowinfo = a.flow,
        .scope_id = a.interface.index,
        .port = wire_port,
    };
}
/// Reports a syscall errno that should be impossible given correct usage:
/// panics loudly in Debug builds, degrades to `error.Unexpected` otherwise.
fn errnoBug(err: posix.E) Io.UnexpectedError {
    switch (builtin.mode) {
        .Debug => std.debug.panic("programmer bug caused syscall error: {t}", .{err}),
        .ReleaseSafe, .ReleaseFast, .ReleaseSmall => return error.Unexpected,
    }
}
/// Maps a `net.Socket.Mode` to the corresponding POSIX SOCK_* constant.
fn posixSocketMode(mode: net.Socket.Mode) u32 {
    switch (mode) {
        .dgram => return posix.SOCK.DGRAM,
        .raw => return posix.SOCK.RAW,
        .rdm => return posix.SOCK.RDM,
        .seqpacket => return posix.SOCK.SEQPACKET,
        .stream => return posix.SOCK.STREAM,
    }
}
/// Maps an optional `net.Protocol` to its numeric value; null means "let
/// the OS pick" (protocol 0).
fn posixProtocol(protocol: ?net.Protocol) u32 {
    const p = protocol orelse return 0;
    return @intFromEnum(p);
}
/// Marks an OS-level inconsistency that release builds can survive:
/// traps in Debug, otherwise carries on silently.
fn recoverableOsBugDetected() void {
    switch (builtin.mode) {
        .Debug => unreachable,
        else => {},
    }
}
/// Maps `Io.Clock` to the POSIX clock id that best matches its semantics on
/// this target.
fn clockToPosix(clock: Io.Clock) posix.clockid_t {
    return switch (clock) {
        .real => posix.CLOCK.REALTIME,
        // `awake` vs `boot` differ in whether time spent suspended counts;
        // Darwin's names are inverted relative to Linux — assumes UPTIME_RAW
        // stops during sleep while MONOTONIC_RAW does not — TODO confirm.
        .awake => switch (builtin.os.tag) {
            .macos, .ios, .watchos, .tvos => posix.CLOCK.UPTIME_RAW,
            else => posix.CLOCK.MONOTONIC,
        },
        .boot => switch (builtin.os.tag) {
            .macos, .ios, .watchos, .tvos => posix.CLOCK.MONOTONIC_RAW,
            else => posix.CLOCK.BOOTTIME,
        },
        .cpu_process => posix.CLOCK.PROCESS_CPUTIME_ID,
        .cpu_thread => posix.CLOCK.THREAD_CPUTIME_ID,
    };
}
/// Maps `Io.Clock` to a WASI clock id. WASI has no distinct suspend-aware
/// clock, so both `awake` and `boot` map to MONOTONIC.
fn clockToWasi(clock: Io.Clock) std.os.wasi.clockid_t {
    return switch (clock) {
        // The enum field is `real` (see the exhaustive switch in
        // clockToPosix over the same enum).
        .real => .REALTIME,
        .awake => .MONOTONIC,
        .boot => .MONOTONIC,
        .cpu_process => .PROCESS_CPUTIME_ID,
        .cpu_thread => .THREAD_CPUTIME_ID,
    };
}
/// Converts a Linux statx result into the portable `Io.File.Stat`,
/// collapsing each second/nanosecond pair into a single nanosecond count.
fn statFromLinux(stx: *const std.os.linux.Statx) Io.File.Stat {
    const atime = stx.atime;
    const mtime = stx.mtime;
    const ctime = stx.ctime;
    return .{
        .inode = stx.ino,
        .size = stx.size,
        .mode = stx.mode,
        // File kind is encoded in the IFMT bits of the mode field.
        .kind = switch (stx.mode & std.os.linux.S.IFMT) {
            std.os.linux.S.IFDIR => .directory,
            std.os.linux.S.IFCHR => .character_device,
            std.os.linux.S.IFBLK => .block_device,
            std.os.linux.S.IFREG => .file,
            std.os.linux.S.IFIFO => .named_pipe,
            std.os.linux.S.IFLNK => .sym_link,
            std.os.linux.S.IFSOCK => .unix_domain_socket,
            else => .unknown,
        },
        // Widen to i128 for the multiply, then narrow into the timestamp type.
        .atime = .{ .nanoseconds = @intCast(@as(i128, atime.sec) * std.time.ns_per_s + atime.nsec) },
        .mtime = .{ .nanoseconds = @intCast(@as(i128, mtime.sec) * std.time.ns_per_s + mtime.nsec) },
        .ctime = .{ .nanoseconds = @intCast(@as(i128, ctime.sec) * std.time.ns_per_s + ctime.nsec) },
    };
}
/// Converts a generic POSIX `stat` result into the portable `Io.File.Stat`.
fn statFromPosix(st: *const std.posix.Stat) Io.File.Stat {
    const atime = st.atime();
    const mtime = st.mtime();
    const ctime = st.ctime();
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = st.mode,
        .kind = k: {
            const m = st.mode & std.posix.S.IFMT;
            switch (m) {
                std.posix.S.IFBLK => break :k .block_device,
                std.posix.S.IFCHR => break :k .character_device,
                std.posix.S.IFDIR => break :k .directory,
                std.posix.S.IFIFO => break :k .named_pipe,
                std.posix.S.IFLNK => break :k .sym_link,
                std.posix.S.IFREG => break :k .file,
                std.posix.S.IFSOCK => break :k .unix_domain_socket,
                else => {},
            }
            // illumos defines two extra file kinds beyond the POSIX set.
            if (builtin.os.tag == .illumos) switch (m) {
                std.posix.S.IFDOOR => break :k .door,
                std.posix.S.IFPORT => break :k .event_port,
                else => {},
            };
            break :k .unknown;
        },
        .atime = timestampFromPosix(&atime),
        .mtime = timestampFromPosix(&mtime),
        .ctime = timestampFromPosix(&ctime),
    };
}
/// Converts a WASI filestat into the portable `Io.File.Stat`.
fn statFromWasi(st: *const std.os.wasi.filestat_t) Io.File.Stat {
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        // WASI does not expose POSIX permission bits.
        .mode = 0,
        .kind = switch (st.filetype) {
            .BLOCK_DEVICE => .block_device,
            .CHARACTER_DEVICE => .character_device,
            .DIRECTORY => .directory,
            .SYMBOLIC_LINK => .sym_link,
            .REGULAR_FILE => .file,
            .SOCKET_STREAM, .SOCKET_DGRAM => .unix_domain_socket,
            else => .unknown,
        },
        // WASI timestamps are nanosecond counts; wrap them in the timestamp
        // struct the same way the sibling statFromLinux/statFromPosix
        // helpers construct their results.
        .atime = .{ .nanoseconds = st.atim },
        .mtime = .{ .nanoseconds = st.mtim },
        .ctime = .{ .nanoseconds = st.ctim },
    };
}
/// Collapses a POSIX timespec into a single nanosecond timestamp.
fn timestampFromPosix(timespec: *const std.posix.timespec) Io.Timestamp {
    const total_ns = @as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec;
    return .{ .nanoseconds = @intCast(total_ns) };
}
/// Splits a nanosecond timestamp into a POSIX timespec. Floored division
/// keeps `nsec` in [0, 1e9) even for negative inputs.
fn timestampToPosix(nanoseconds: i96) std.posix.timespec {
    const whole_seconds = @divFloor(nanoseconds, std.time.ns_per_s);
    const leftover_ns = @mod(nanoseconds, std.time.ns_per_s);
    return .{
        .sec = @intCast(whole_seconds),
        .nsec = @intCast(leftover_ns),
    };
}
/// Copies `file_path` into `buffer` as a NUL-terminated string suitable for
/// POSIX syscalls. Returns error.BadPathName if the path contains a NUL
/// byte, error.NameTooLong if it (plus terminator) does not fit.
fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNameError![:0]u8 {
    // Reject paths containing NUL: the kernel would silently truncate them.
    // (expected_count = 1, scalar = 0 — i.e. "contains at least one 0 byte".)
    if (std.mem.containsAtLeastScalar(u8, file_path, 1, 0)) return error.BadPathName;
    // >= rather than > to make room for the null byte
    if (file_path.len >= buffer.len) return error.NameTooLong;
    @memcpy(buffer[0..file_path.len], file_path);
    buffer[file_path.len] = 0;
    return buffer[0..file_path.len :0];
}
/// Applies resolv.conf search-domain logic: tries `lookupDns` once per
/// search domain appended to the name (subject to the ndots heuristic),
/// falling back to the bare name last. The canonical name buffer doubles as
/// scratch space for building each candidate fully-qualified name.
fn lookupDnsSearch(
    pool: *Pool,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const pool_io = pool.io();
    const rc = HostName.ResolvConf.init(pool_io) catch return error.ResolvConfParseFailed;
    // Count dots, suppress search when >=ndots or name ends in
    // a dot, which is an explicit request for global scope.
    const dots = std.mem.countScalar(u8, host_name.bytes, '.');
    const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len;
    const search = rc.search_buffer[0..search_len];
    var canon_name = host_name.bytes;
    // Strip final dot for canon, fail if multiple trailing dots.
    if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1;
    if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName;
    // Name with search domain appended is set up in `canon_name`. This
    // both provides the desired default canonical name (if the requested
    // name is not a CNAME record) and serves as a buffer for passing the
    // full requested name to `lookupDns`.
    @memcpy(options.canonical_name_buffer[0..canon_name.len], canon_name);
    options.canonical_name_buffer[canon_name.len] = '.';
    var it = std.mem.tokenizeAny(u8, search, " \t");
    while (it.next()) |token| {
        // Build "<name>.<search domain>" in place after the dot separator.
        @memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token);
        const lookup_canon_name = options.canonical_name_buffer[0 .. canon_name.len + 1 + token.len];
        if (lookupDns(pool, lookup_canon_name, &rc, resolved, options)) |result| {
            return result;
        } else |err| switch (err) {
            // Not found under this search domain; try the next one.
            error.UnknownHostName => continue,
            else => |e| return e,
        }
    }
    // Finally try the name as given (without any search domain).
    const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len];
    return lookupDns(pool, lookup_canon_name, &rc, resolved, options);
}
fn lookupDns(
pool: *Pool,
lookup_canon_name: []const u8,
rc: *const HostName.ResolvConf,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) HostName.LookupError!void {
const pool_io = pool.io();
const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{
.{ .af = .ip6, .rr = std.posix.RR.A },
.{ .af = .ip4, .rr = std.posix.RR.AAAA },
};
var query_buffers: [2][280]u8 = undefined;
var answer_buffer: [2 * 512]u8 = undefined;
var queries_buffer: [2][]const u8 = undefined;
var answers_buffer: [2][]const u8 = undefined;
var nq: usize = 0;
var answer_buffer_i: usize = 0;
for (family_records) |fr| {
if (options.family != fr.af) {
const entropy = std.crypto.random.array(u8, 2);
const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy);
queries_buffer[nq] = query_buffers[nq][0..len];
nq += 1;
}
}
var ip4_mapped: [HostName.ResolvConf.max_nameservers]IpAddress = undefined;
var any_ip6 = false;
for (rc.nameservers(), &ip4_mapped) |*ns, *m| {
m.* = .{ .ip6 = .fromAny(ns.*) };
any_ip6 = any_ip6 or ns.* == .ip6;
}
var socket = s: {
if (any_ip6) ip6: {
const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
const socket = ip6_addr.bind(pool_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
error.AddressFamilyUnsupported => break :ip6,
else => |e| return e,
};
break :s socket;
}
any_ip6 = false;
const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
const socket = try ip4_addr.bind(pool_io, .{ .mode = .dgram });
break :s socket;
};
defer socket.close(pool_io);
const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
const queries = queries_buffer[0..nq];
const answers = answers_buffer[0..queries.len];
var answers_remaining = answers.len;
for (answers) |*answer| answer.len = 0;
// boot clock is chosen because time the computer is suspended should count
// against time spent waiting for external messages to arrive.
const clock: Io.Clock = .boot;
var now_ts = try clock.now(pool_io);
const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
const attempt_duration: Io.Duration = .{
.nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
};
send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(pool_io)) {
const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers;
{
var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
var message_i: usize = 0;
for (queries, answers) |query, *answer| {
if (answer.len != 0) continue;
for (mapped_nameservers) |*ns| {
message_buffer[message_i] = .{
.address = ns,
.data_ptr = query.ptr,
.data_len = query.len,
};
message_i += 1;
}
}
_ = netSend(pool, socket.handle, message_buffer[0..message_i], .{});
}
const timeout: Io.Timeout = .{ .deadline = .{
.raw = now_ts.addDuration(attempt_duration),
.clock = clock,
} };
while (true) {
var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
const buf = answer_buffer[answer_buffer_i..];
const recv_err, const recv_n = socket.receiveManyTimeout(pool_io, &message_buffer, buf, .{}, timeout);
for (message_buffer[0..recv_n]) |*received_message| {
const reply = received_message.data;
// Ignore non-identifiable packets.
if (reply.len < 4) continue;
// Ignore replies from addresses we didn't send to.
const ns = for (mapped_nameservers) |*ns| {
if (received_message.from.eql(ns)) break ns;
} else {
continue;
};
// Find which query this answer goes with, if any.
const query, const answer = for (queries, answers) |query, *answer| {
if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer };
} else {
continue;
};
if (answer.len != 0) continue;
// Only accept positive or negative responses; retry immediately on
// server failure, and ignore all other codes such as refusal.
switch (reply[3] & 15) {
0, 3 => {
answer.* = reply;
answer_buffer_i += reply.len;
answers_remaining -= 1;
if (answer_buffer.len - answer_buffer_i == 0) break :send;
if (answers_remaining == 0) break :send;
},
2 => {
var retry_message: Io.net.OutgoingMessage = .{
.address = ns,
.data_ptr = query.ptr,
.data_len = query.len,
};
_ = netSend(pool, socket.handle, (&retry_message)[0..1], .{});
continue;
},
else => continue,
}
}
if (recv_err) |err| switch (err) {
error.Canceled => return error.Canceled,
error.Timeout => continue :send,
else => continue,
};
}
} else {
return error.NameServerFailure;
}
var addresses_len: usize = 0;
var canonical_name: ?HostName = null;
for (answers) |answer| {
var it = HostName.DnsResponse.init(answer) catch {
// TODO accept a diagnostics struct and append warnings
continue;
};
while (it.next() catch {
// TODO accept a diagnostics struct and append warnings
continue;
}) |record| switch (record.rr) {
std.posix.RR.A => {
const data = record.packet[record.data_off..][0..record.data_len];
if (data.len != 4) return error.InvalidDnsARecord;
try resolved.putOne(pool_io, .{ .address = .{ .ip4 = .{
.bytes = data[0..4].*,
.port = options.port,
} } });
addresses_len += 1;
},
std.posix.RR.AAAA => {
const data = record.packet[record.data_off..][0..record.data_len];
if (data.len != 16) return error.InvalidDnsAAAARecord;
try resolved.putOne(pool_io, .{ .address = .{ .ip6 = .{
.bytes = data[0..16].*,
.port = options.port,
} } });
addresses_len += 1;
},
std.posix.RR.CNAME => {
_, canonical_name = HostName.expand(record.packet, record.data_off, options.canonical_name_buffer) catch
return error.InvalidDnsCnameRecord;
},
else => continue,
};
}
try resolved.putOne(pool_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
if (addresses_len == 0) return error.NameServerFailure;
}
/// Resolves `host_name` against `/etc/hosts`, pushing matching addresses and
/// the canonical name into `resolved`.
///
/// Maps absence of the hosts file (or lack of access to it) to
/// `error.UnknownHostName`; unexpected open/read failures become
/// `error.DetectingNetworkConfigurationFailed`.
fn lookupHosts(
    pool: *Pool,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const pool_io = pool.io();
    const hosts_file = Io.File.openAbsolute(pool_io, "/etc/hosts", .{}) catch |err| switch (err) {
        error.FileNotFound,
        error.NotDir,
        error.AccessDenied,
        => return error.UnknownHostName,
        error.Canceled => |e| return e,
        else => {
            // TODO populate optional diagnostic struct
            return error.DetectingNetworkConfigurationFailed;
        },
    };
    defer hosts_file.close(pool_io);
    // Lines longer than this buffer are skipped by `lookupHostsReader`.
    var read_buffer: [512]u8 = undefined;
    var hosts_reader = hosts_file.reader(pool_io, &read_buffer);
    return lookupHostsReader(pool, host_name, resolved, options, &hosts_reader.interface) catch |err| switch (err) {
        // On ReadFailed, the concrete cause is stored on the file reader.
        error.ReadFailed => switch (hosts_reader.err.?) {
            error.Canceled => |e| return e,
            else => {
                // TODO populate optional diagnostic struct
                return error.DetectingNetworkConfigurationFailed;
            },
        },
        error.Canceled => |e| return e,
        error.UnknownHostName => |e| return e,
    };
}
/// Scans a `/etc/hosts`-formatted stream for lines whose name list contains
/// `host_name`, pushing each address that parses under `options.family` into
/// `resolved`, followed by at most one canonical-name result.
///
/// Returns `error.UnknownHostName` when no matching address was found.
fn lookupHostsReader(
    pool: *Pool,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
    reader: *Io.Reader,
) error{ ReadFailed, Canceled, UnknownHostName }!void {
    const pool_io = pool.io();
    var addresses_len: usize = 0;
    var canonical_name: ?HostName = null;
    while (true) {
        const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) {
            error.StreamTooLong => {
                // Skip lines that are too long.
                _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) {
                    error.EndOfStream => break,
                    error.ReadFailed => return error.ReadFailed,
                };
                continue;
            },
            error.ReadFailed => return error.ReadFailed,
            error.EndOfStream => break,
        };
        // Consume the '\n' that `takeDelimiterExclusive` left in the stream.
        reader.toss(1);
        // Strip a trailing '#' comment before tokenizing.
        var split_it = std.mem.splitScalar(u8, line, '#');
        const no_comment_line = split_it.first();
        var line_it = std.mem.tokenizeAny(u8, no_comment_line, " \t");
        const ip_text = line_it.next() orelse continue;
        var first_name_text: ?[]const u8 = null;
        // Look for `host_name` among the names on this line; lines with no
        // match are skipped via the loop's `else continue`.
        while (line_it.next()) |name_text| {
            if (std.mem.eql(u8, name_text, host_name.bytes)) {
                // NOTE(review): this null check is always true, since the loop
                // breaks immediately after the first match — so the recorded
                // name is the matched one, not the first name on the line.
                // Confirm intent against musl, which treats the line's first
                // name as the canonical name.
                if (first_name_text == null) first_name_text = name_text;
                break;
            }
        } else continue;
        if (canonical_name == null) {
            // The first matching line provides the canonical name, if it both
            // validates as a host name and fits the caller-provided buffer.
            if (HostName.init(first_name_text.?)) |name_text| {
                if (name_text.bytes.len <= options.canonical_name_buffer.len) {
                    const canonical_name_dest = options.canonical_name_buffer[0..name_text.bytes.len];
                    @memcpy(canonical_name_dest, name_text.bytes);
                    canonical_name = .{ .bytes = canonical_name_dest };
                }
            } else |_| {}
        }
        // Try each address family the caller did not exclude; parse failures
        // are ignored because the other family may still apply.
        if (options.family != .ip6) {
            if (IpAddress.parseIp4(ip_text, options.port)) |addr| {
                try resolved.putOne(pool_io, .{ .address = addr });
                addresses_len += 1;
            } else |_| {}
        }
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(ip_text, options.port)) |addr| {
                try resolved.putOne(pool_io, .{ .address = addr });
                addresses_len += 1;
            } else |_| {}
        }
    }
    if (canonical_name) |canon_name| try resolved.putOne(pool_io, .{ .canonical_name = canon_name });
    if (addresses_len == 0) return error.UnknownHostName;
}
/// Writes DNS resolution query packet data to `q`; at most 280 bytes.
/// `entropy` becomes the packet ID; `op` is the DNS opcode; `ty` and `class`
/// are the question's QTYPE and QCLASS. Returns the number of bytes written.
fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: u8, entropy: [2]u8) usize {
    // This implementation is ported from musl libc.
    // A more idiomatic "ziggy" implementation would be welcome.
    var name = dname;
    // A single trailing dot denotes a fully-qualified name; drop it.
    if (std.mem.endsWith(u8, name, ".")) name.len -= 1;
    assert(name.len <= 253);
    // 12-byte header + length-prefixed name (+ terminator) + QTYPE/QCLASS.
    const total_len = 17 + name.len + @intFromBool(name.len != 0);
    // Construct query template - ID will be filled later
    q[0..2].* = entropy;
    @memset(q[2..total_len], 0);
    q[2] = @as(u8, op) * 8 + 1; // opcode in bits 3..6, recursion desired
    q[5] = 1; // QDCOUNT = 1
    // Copy the dotted name to offset 13, then rewrite each separator (and the
    // byte at offset 12) into the preceding label's length byte.
    @memcpy(q[13..][0..name.len], name);
    var label_start: usize = 13;
    var label_end: usize = undefined;
    while (q[label_start] != 0) : (label_start = label_end + 1) {
        label_end = label_start;
        while (q[label_end] != 0 and q[label_end] != '.') : (label_end += 1) {}
        // TODO determine the circumstances for this and whether or
        // not this should be an error.
        if (label_end - label_start - 1 > 62) unreachable;
        q[label_start - 1] = @intCast(label_end - label_start);
    }
    q[label_start + 1] = ty;
    q[label_start + 3] = class;
    return total_len;
}
/// Copies `name` into `canonical_name_buffer` and returns a `HostName`
/// whose bytes are backed by that buffer (not by `name`).
fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) HostName {
    const copied = canonical_name_buffer[0..name.len];
    @memcpy(copied, name);
    return .{ .bytes = copied };
}
/// Blocks the caller while `ptr.*` equals `expect`; returns on wake, on
/// interrupt, or immediately if the value already differs. Callers must
/// re-check their condition (and cancellation) after this returns.
pub fn futexWait(ptr: *const std.atomic.Value(u32), expect: u32) void {
    @branchHint(.cold);
    if (native_os != .linux) @compileError("TODO");
    const linux = std.os.linux;
    const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
    if (builtin.mode != .Debug) return;
    switch (linux.E.init(rc)) {
        .SUCCESS => {}, // notified by `wake()`
        .INTR => {}, // gives caller a chance to check cancellation
        .AGAIN => {}, // ptr.* != expect
        .INVAL => {}, // possibly timeout overflow
        .TIMEDOUT => unreachable, // no timeout was supplied
        .FAULT => unreachable, // ptr was invalid
        else => unreachable,
    }
}
/// Like `futexWait`, but gives up after `timeout` has elapsed. Timeouts,
/// interrupts, and wakes are indistinguishable to the caller, who must
/// re-check the watched condition either way.
pub fn futexWaitDuration(ptr: *const std.atomic.Value(u32), expect: u32, timeout: Io.Duration) void {
    @branchHint(.cold);
    if (native_os != .linux) @compileError("TODO");
    const linux = std.os.linux;
    var ts = timestampToPosix(timeout.toNanoseconds());
    const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, &ts);
    if (builtin.mode != .Debug) return;
    switch (linux.E.init(rc)) {
        .SUCCESS => {}, // notified by `wake()`
        .INTR => {}, // gives caller a chance to check cancellation
        .AGAIN => {}, // ptr.* != expect
        .TIMEDOUT => {},
        .INVAL => {}, // possibly timeout overflow
        .FAULT => unreachable, // ptr was invalid
        else => unreachable,
    }
}
/// Wakes up to `max_waiters` threads blocked on `ptr` in
/// `futexWait`/`futexWaitDuration`. Safe to call when no one is waiting.
pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
    @branchHint(.cold);
    if (native_os != .linux) @compileError("TODO");
    const linux = std.os.linux;
    // The futex syscall takes a signed 32-bit count; clamp rather than wrap.
    const rc = linux.futex_3arg(
        &ptr.raw,
        .{ .cmd = .WAKE, .private = true },
        @min(max_waiters, std.math.maxInt(i32)),
    );
    if (builtin.mode != .Debug) return;
    switch (linux.E.init(rc)) {
        .SUCCESS => {}, // successful wake up
        .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
        .FAULT => {}, // pointer became invalid while doing the wake
        else => unreachable,
    }
}