zig/lib/std/Io/Threaded.zig
Andrew Kelley 3bf0ce65a5 fix miscellaneous compilation errors
- ILSEQ -> error.BadPathName
- implement dirStatPath for WASI
2025-10-29 06:20:50 -07:00

3770 lines
142 KiB
Zig

const Threaded = @This();
const builtin = @import("builtin");
const native_os = builtin.os.tag;
const is_windows = native_os == .windows;
const windows = std.os.windows;
const std = @import("../std.zig");
const Io = std.Io;
const net = std.Io.net;
const HostName = std.Io.net.HostName;
const IpAddress = std.Io.net.IpAddress;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const posix = std.posix;
/// Thread-safe.
allocator: Allocator,
/// Held by workers and task spawners while they read or modify the pool's
/// shared state (`run_queue`, `join_requested`, `threads`, `concurrent_count`).
mutex: std.Thread.Mutex = .{},
/// Signaled after work is queued and broadcast by `join`; idle workers wait on it.
cond: std.Thread.Condition = .{},
/// Pending tasks, linked through `Closure.node`. Workers pop from the front.
run_queue: std.SinglyLinkedList = .{},
/// Set by `join`. A worker exits once it observes this with an empty queue.
join_requested: bool = false,
/// Worker threads spawned so far; joined by `join`.
threads: std.ArrayListUnmanaged(std.Thread),
/// Stack size passed to `std.Thread.spawn` for every worker.
stack_size: usize,
/// Result of `std.Thread.getCpuCount()`. Spawners allow up to
/// `cpu_count - 1 + concurrent_count` worker threads.
cpu_count: std.Thread.CpuCountError!usize,
/// Incremented by `concurrent`; decremented by `worker` after such a task has run.
concurrent_count: usize,
/// The closure being executed by the calling thread, or null. Read by
/// `cancelRequested`.
threadlocal var current_closure: ?*Closure = null;
/// NOTE(review): presumably the maximum number of I/O vectors passed to a
/// single vectored call; its uses are outside this view.
const max_iovecs_len = 8;
/// NOTE(review): size in bytes of a scratch buffer used elsewhere in this
/// file (presumably for splatted writes) — confirm.
const splat_buffer_size = 64;
// Ensure the iovec limit never exceeds what the OS accepts.
comptime {
    assert(max_iovecs_len <= posix.IOV_MAX);
}
/// Type-erased header shared by every task that can be queued on the pool.
/// Concrete task types embed it as their `closure` field and recover
/// themselves from it with `@fieldParentPtr`.
const Closure = struct {
    /// Entry point a worker invokes with a pointer to this header.
    start: Start,
    /// Intrusive link into the pool's run queue.
    node: std.SinglyLinkedList.Node = .{},
    /// Cancellation word: 0 while no thread is running the task, the id of the
    /// running thread while it executes, or `canceling_tid` once cancellation
    /// has been requested.
    cancel_tid: std.Thread.Id,
    /// Whether this task bumps minimum number of threads in the pool.
    is_concurrent: bool,

    const Start = *const fn (*Closure) void;

    /// Sentinel stored in `cancel_tid` to mark a cancellation request: -1 for
    /// signed thread ids, the maximum value for unsigned ones, and the
    /// all-ones address for pointer ids.
    const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
        .int => |info| if (info.signedness == .signed) -1 else std.math.maxInt(std.Thread.Id),
        .pointer => @ptrFromInt(std.math.maxInt(usize)),
        else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
    };

    /// Marks the closure as canceled. If some thread is currently running it,
    /// on Linux that thread is additionally sent `SIG.IO`.
    /// NOTE(review): presumably the signal interrupts a blocking syscall so
    /// the task notices the request sooner — confirm.
    fn requestCancel(closure: *Closure) void {
        const prev_tid = @atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel);
        // Not running, or already canceled: setting the flag is all that is needed.
        if (prev_tid == 0 or prev_tid == canceling_tid) return;
        if (builtin.os.tag == .linux) {
            const linux = std.os.linux;
            _ = linux.tgkill(linux.getpid(), @bitCast(prev_tid), posix.SIG.IO);
        }
    }
};
pub const InitError = std.Thread.CpuCountError || Allocator.Error;

/// Creates a thread pool backed by `gpa`. No threads are spawned here; workers
/// are started on demand when tasks are submitted.
///
/// Room for `cpu_count - 1` worker handles is reserved up front as a best
/// effort: failing to query the CPU count or to reserve memory is not an
/// error, and the failure is simply remembered in `cpu_count` / ignored.
///
/// Related:
/// * `init_single_threaded`
pub fn init(
    /// Must be threadsafe. Only used for the following functions:
    /// * `Io.VTable.async`
    /// * `Io.VTable.concurrent`
    /// * `Io.VTable.groupAsync`
    /// If these functions are avoided, then `Allocator.failing` may be passed
    /// here.
    gpa: Allocator,
) Threaded {
    const cpu_count = std.Thread.getCpuCount();
    var pool: Threaded = .{
        .allocator = gpa,
        .threads = .empty,
        .stack_size = std.Thread.SpawnConfig.default_stack_size,
        .cpu_count = cpu_count,
        .concurrent_count = 0,
    };
    const n = cpu_count catch return pool;
    pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
    return pool;
}
/// Statically initialize with `Allocator.failing` and a CPU count of 1:
/// * `Io.VTable.concurrent` fails with `error.OutOfMemory`.
/// * `Io.VTable.async` and `Io.VTable.groupAsync` cannot allocate a closure
///   and therefore run the task synchronously on the calling thread.
/// When initialized this way, `deinit` is safe, but unnecessary to call.
pub const init_single_threaded: Threaded = .{
    .allocator = .failing,
    .threads = .empty,
    .stack_size = std.Thread.SpawnConfig.default_stack_size,
    // With one CPU, no worker threads beyond `concurrent_count` are requested.
    .cpu_count = 1,
    .concurrent_count = 0,
};
/// Waits for every worker to drain the run queue and exit, frees the thread
/// list, and invalidates `t`.
pub fn deinit(t: *Threaded) void {
    t.join();
    t.threads.deinit(t.allocator);
    t.* = undefined;
}
/// Asks every worker to exit once the run queue is empty, then waits for all
/// of them. Does nothing in single-threaded builds.
fn join(t: *Threaded) void {
    if (builtin.single_threaded) return;
    t.mutex.lock();
    t.join_requested = true;
    t.mutex.unlock();
    // Wake every idle worker so that it observes `join_requested`.
    t.cond.broadcast();
    for (t.threads.items) |thread| {
        thread.join();
    }
}
/// Body of every pool thread. It drains `run_queue`, running each closure
/// with `mutex` released, and sleeps on `cond` whenever the queue is empty.
/// It returns once `join_requested` is set and the queue is empty.
fn worker(t: *Threaded) void {
    t.mutex.lock();
    defer t.mutex.unlock();
    while (true) {
        while (t.run_queue.popFirst()) |closure_node| {
            // Run the task without holding the pool lock.
            t.mutex.unlock();
            const closure: *Closure = @fieldParentPtr("node", closure_node);
            // Read before `start`: once the task signals completion its owner
            // may free the closure.
            const is_concurrent = closure.is_concurrent;
            closure.start(closure);
            t.mutex.lock();
            if (is_concurrent) {
                // Release the thread-capacity slot reserved by `concurrent`.
                t.concurrent_count -= 1;
            }
        }
        if (t.join_requested) break;
        t.cond.wait(&t.mutex);
    }
}
/// Returns an `Io` interface whose operations are implemented by this pool.
///
/// Per-OS entries are chosen at compile time by switching on
/// `builtin.os.tag`. Entries written as `@panic("TODO")` are not implemented
/// for that OS yet.
pub fn io(t: *Threaded) Io {
    return .{
        .userdata = t,
        .vtable = &.{
            // Task spawning and cancellation.
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .cancel = cancel,
            .cancelRequested = cancelRequested,
            .select = select,
            .groupAsync = groupAsync,
            .groupWait = groupWait,
            .groupWaitUncancelable = groupWaitUncancelable,
            .groupCancel = groupCancel,

            // Synchronization primitives.
            .mutexLock = mutexLock,
            .mutexLockUncancelable = mutexLockUncancelable,
            .mutexUnlock = mutexUnlock,
            .conditionWait = conditionWait,
            .conditionWaitUncancelable = conditionWaitUncancelable,
            .conditionWake = conditionWake,

            // File system.
            .dirMake = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                .wasi => @panic("TODO"),
                else => dirMakePosix,
            },
            .dirStat = dirStat,
            .dirStatPath = switch (builtin.os.tag) {
                .linux => dirStatPathLinux,
                .windows => @panic("TODO"),
                .wasi => dirStatPathWasi,
                else => dirStatPathPosix,
            },
            .fileStat = switch (builtin.os.tag) {
                .linux => fileStatLinux,
                .windows => fileStatWindows,
                .wasi => fileStatWasi,
                else => fileStatPosix,
            },
            .dirCreateFile = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                .wasi => @panic("TODO"),
                else => dirCreateFilePosix,
            },
            .dirOpenFile = dirOpenFile,
            .fileClose = fileClose,
            .fileWriteStreaming = fileWriteStreaming,
            .fileWritePositional = fileWritePositional,
            .fileReadStreaming = fileReadStreaming,
            .fileReadPositional = fileReadPositional,
            .fileSeekBy = fileSeekBy,
            .fileSeekTo = fileSeekTo,

            // Time.
            .now = switch (builtin.os.tag) {
                .windows => nowWindows,
                .wasi => nowWasi,
                else => nowPosix,
            },
            .sleep = switch (builtin.os.tag) {
                .windows => sleepWindows,
                .wasi => sleepWasi,
                .linux => sleepLinux,
                else => sleepPosix,
            },

            // Networking.
            .netListenIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netListenIpPosix,
            },
            .netListenUnix = netListenUnix,
            .netAccept = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netAcceptPosix,
            },
            .netBindIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netBindIpPosix,
            },
            .netConnectIp = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netConnectIpPosix,
            },
            .netConnectUnix = netConnectUnix,
            .netClose = netClose,
            .netRead = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netReadPosix,
            },
            .netWrite = switch (builtin.os.tag) {
                .windows => @panic("TODO"),
                else => netWritePosix,
            },
            .netSend = netSend,
            .netReceive = netReceive,
            .netInterfaceNameResolve = netInterfaceNameResolve,
            .netInterfaceName = netInterfaceName,
            .netLookup = netLookup,
        },
    };
}
/// Heap-allocated state for one task started by `async` or `concurrent`.
///
/// Trailing data (same allocation, after the struct):
/// 1. context, at `context_alignment.forward(@sizeOf(AsyncClosure))`
/// 2. result, at `result_offset`
const AsyncClosure = struct {
    closure: Closure,
    /// The user's task function, called with pointers into the trailing data.
    func: *const fn (context: *anyopaque, result: *anyopaque) void,
    /// Set once the task has finished (or was skipped after cancellation).
    reset_event: ResetEvent,
    /// Either null, an event to set on completion (presumably registered by
    /// `select` — confirm), or `done_reset_event` after completion.
    select_condition: ?*ResetEvent,
    context_alignment: std.mem.Alignment,
    /// Byte offset of the result from the start of the allocation.
    result_offset: usize,
    /// Whether the task has a return type with nonzero bits.
    has_result: bool,

    /// Sentinel address stored in `select_condition` to mark completion.
    /// Within this view it is only compared against, never dereferenced.
    const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

    /// Worker entry point. Publishes the running thread id in `cancel_tid`,
    /// runs `func`, clears the thread id, and then signals `select_condition`
    /// and `reset_event`.
    fn start(closure: *Closure) void {
        const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
        const tid = std.Thread.getCurrentId();
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
            // Even though we already know the task is canceled, we must still
            // run the closure in order to make the return value valid - that
            // is, unless the result is zero bytes!
            if (!ac.has_result) {
                // NOTE(review): this early return does not swap
                // `select_condition` to `done_reset_event`, so an event
                // registered there is never set on this path. Confirm that
                // `select` cannot be waiting on a canceled void task.
                ac.reset_event.set();
                return;
            }
        }
        current_closure = closure;
        ac.func(ac.contextPointer(), ac.resultPointer());
        current_closure = null;
        // In case a cancel happens after successful task completion, prevents
        // signal from being delivered to the thread in `requestCancel`.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
        }
        // Mark completion for any selector and wake it if one registered first.
        if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
            assert(select_reset != done_reset_event);
            select_reset.set();
        }
        ac.reset_event.set();
    }

    /// Pointer to the result slot inside the trailing data.
    fn resultPointer(ac: *AsyncClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(ac);
        return base + ac.result_offset;
    }

    /// Pointer to the copied context inside the trailing data.
    fn contextPointer(ac: *AsyncClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(ac);
        return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
    }

    /// Blocks (uncancelably) until the task has finished, copies its result
    /// into `result`, and frees the allocation.
    fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
        ac.reset_event.waitUncancelable();
        @memcpy(result, ac.resultPointer()[0..result.len]);
        free(ac, gpa, result.len);
    }

    /// Frees the allocation. `result_len` must be the result length used when
    /// the closure was allocated (the size was `result_offset + result_len`).
    fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
        if (!ac.has_result) assert(result_len == 0);
        const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
        gpa.free(base[0 .. ac.result_offset + result_len]);
    }
};
/// Starts `start` as a task that may run on a worker thread.
///
/// Returns `null` when the task has already been run to completion on the
/// calling thread. This happens in single-threaded builds, and when closure
/// allocation, thread-list growth, or spawning the first worker fails.
/// Otherwise it returns a future whose memory is released by `await` or
/// `cancel`.
///
/// If the CPU count is unknown, the task is delegated to `concurrent`, still
/// falling back to running inline if that fails.
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
    if (builtin.single_threaded) {
        start(context.ptr, result.ptr);
        return null;
    }
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const cpu_count = t.cpu_count catch {
        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
            start(context.ptr, result.ptr);
            return null;
        };
    };
    const gpa = t.allocator;
    // Allocation layout: [AsyncClosure][pad][context][pad][result].
    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
    const result_offset = result_alignment.forward(context_offset + context.len);
    const n = result_offset + result.len;
    const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
        start(context.ptr, result.ptr);
        return null;
    }));
    ac.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = AsyncClosure.start,
            .is_concurrent = false,
        },
        .func = start,
        .context_alignment = context_alignment,
        .result_offset = result_offset,
        .has_result = result.len != 0,
        .reset_event = .unset,
        .select_condition = null,
    };
    @memcpy(ac.contextPointer()[0..context.len], context);
    t.mutex.lock();
    // One worker per CPU besides the caller, plus one per outstanding concurrent task.
    const thread_capacity = cpu_count - 1 + t.concurrent_count;
    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
        t.mutex.unlock();
        ac.free(gpa, result.len);
        start(context.ptr, result.ptr);
        return null;
    };
    t.run_queue.prepend(&ac.closure.node);
    if (t.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
            if (t.threads.items.len == 0) {
                // No worker exists to ever pick the task up: run it inline.
                assert(t.run_queue.popFirst() == &ac.closure.node);
                t.mutex.unlock();
                ac.free(gpa, result.len);
                start(context.ptr, result.ptr);
                return null;
            }
            // Rely on other workers to do it.
            t.mutex.unlock();
            t.cond.signal();
            return @ptrCast(ac);
        };
        t.threads.appendAssumeCapacity(thread);
    }
    t.mutex.unlock();
    t.cond.signal();
    return @ptrCast(ac);
}
/// Starts `start` as a task that is guaranteed to run on its own worker
/// thread rather than inline. While the task is outstanding it raises the
/// pool's thread capacity by one (`concurrent_count`).
///
/// Returns `error.OutOfMemory` if the closure cannot be allocated, the thread
/// list cannot grow, or a needed worker thread cannot be spawned. On every
/// error path the reserved capacity slot is released again, so a failed call
/// leaves `concurrent_count` unchanged.
fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*Io.AnyFuture {
    if (builtin.single_threaded) unreachable;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const cpu_count = t.cpu_count catch 1;
    const gpa = t.allocator;
    // Allocation layout: [AsyncClosure][pad][context][pad][result].
    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
    const result_offset = result_alignment.forward(context_offset + context.len);
    const n = result_offset + result_len;
    const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n)));
    ac.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = AsyncClosure.start,
            .is_concurrent = true,
        },
        .func = start,
        .context_alignment = context_alignment,
        .result_offset = result_offset,
        .has_result = result_len != 0,
        .reset_event = .unset,
        .select_condition = null,
    };
    @memcpy(ac.contextPointer()[0..context.len], context);
    t.mutex.lock();
    // Reserve a capacity slot for this task. Normally `worker` releases it
    // after the task runs; if the task is never queued, we must release it.
    t.concurrent_count += 1;
    const thread_capacity = cpu_count - 1 + t.concurrent_count;
    t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
        t.concurrent_count -= 1;
        t.mutex.unlock();
        ac.free(gpa, result_len);
        return error.OutOfMemory;
    };
    t.run_queue.prepend(&ac.closure.node);
    if (t.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
            assert(t.run_queue.popFirst() == &ac.closure.node);
            t.concurrent_count -= 1;
            t.mutex.unlock();
            ac.free(gpa, result_len);
            return error.OutOfMemory;
        };
        t.threads.appendAssumeCapacity(thread);
    }
    t.mutex.unlock();
    t.cond.signal();
    return @ptrCast(ac);
}
/// Heap-allocated state for one task started by `groupAsync`. The copied
/// context bytes follow the struct in the same allocation, at
/// `contextOffset(context_alignment)`.
///
/// Group completion is tracked in `group.state`, reinterpreted as an atomic
/// `usize`. Bit 0 (`sync_is_waiting`) is set when a waiter registers, and
/// the remaining bits count pending tasks in units of `sync_one_pending`.
/// The waiter's `ResetEvent` is stored in `group.context`.
const GroupClosure = struct {
    closure: Closure,
    t: *Threaded,
    group: *Io.Group,
    /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
    node: std.SinglyLinkedList.Node,
    /// The user's task function.
    func: *const fn (*Io.Group, context: *anyopaque) void,
    context_alignment: std.mem.Alignment,
    /// Length in bytes of the trailing context; needed by `free`.
    context_len: usize,

    /// Worker entry point. Runs `func` unless cancellation was requested
    /// before the task started, then marks the task finished in the group.
    fn start(closure: *Closure) void {
        const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
        const tid = std.Thread.getCurrentId();
        const group = gc.group;
        const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
        const reset_event: *ResetEvent = @ptrCast(&group.context);
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
            // We already know the task is canceled before running the callback. Since all closures
            // in a Group have void return type, we can return early.
            syncFinish(group_state, reset_event);
            return;
        }
        current_closure = closure;
        gc.func(group, gc.contextPointer());
        current_closure = null;
        // In case a cancel happens after successful task completion, prevents
        // signal from being delivered to the thread in `requestCancel`.
        if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
            assert(cancel_tid == Closure.canceling_tid);
        }
        syncFinish(group_state, reset_event);
    }

    /// Frees the closure together with its trailing context.
    fn free(gc: *GroupClosure, gpa: Allocator) void {
        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
        gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
    }

    /// Byte offset of the context from the start of the allocation.
    fn contextOffset(context_alignment: std.mem.Alignment) usize {
        return context_alignment.forward(@sizeOf(GroupClosure));
    }

    /// Total allocation size: context offset plus context length.
    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
        return contextOffset(context_alignment) + context_len;
    }

    /// Pointer to the copied context inside the trailing data.
    fn contextPointer(gc: *GroupClosure) [*]u8 {
        const base: [*]u8 = @ptrCast(gc);
        return base + contextOffset(gc.context_alignment);
    }

    const sync_is_waiting: usize = 1 << 0;
    const sync_one_pending: usize = 1 << 1;

    /// Counts one more pending task; asserts the counter cannot overflow.
    fn syncStart(state: *std.atomic.Value(usize)) void {
        const prev_state = state.fetchAdd(sync_one_pending, .monotonic);
        assert((prev_state / sync_one_pending) < (std.math.maxInt(usize) / sync_one_pending));
    }

    /// Counts one task as finished. If it was the last pending task and a
    /// waiter has registered, wakes the waiter.
    fn syncFinish(state: *std.atomic.Value(usize), event: *ResetEvent) void {
        const prev_state = state.fetchSub(sync_one_pending, .acq_rel);
        assert((prev_state / sync_one_pending) > 0);
        if (prev_state == (sync_one_pending | sync_is_waiting)) event.set();
    }

    /// Registers the (single) waiter and blocks, cancelably, until no tasks
    /// are pending.
    fn syncWait(t: *Threaded, state: *std.atomic.Value(usize), event: *ResetEvent) Io.Cancelable!void {
        const prev_state = state.fetchAdd(sync_is_waiting, .acquire);
        assert(prev_state & sync_is_waiting == 0);
        if ((prev_state / sync_one_pending) > 0) try event.wait(t);
    }

    /// Like `syncWait`, but the wait cannot be interrupted by cancellation.
    fn syncWaitUncancelable(state: *std.atomic.Value(usize), event: *ResetEvent) void {
        const prev_state = state.fetchAdd(sync_is_waiting, .acquire);
        assert(prev_state & sync_is_waiting == 0);
        if ((prev_state / sync_one_pending) > 0) event.waitUncancelable();
    }
};
/// Spawns `start` as a member task of `group`. The task is later awaited with
/// `groupWait`/`groupWaitUncancelable` or canceled with `groupCancel`.
///
/// The closure is linked into the group's list (`group.token`) and counted as
/// pending only after it has been queued successfully. If allocation,
/// thread-list growth, or thread spawning fails, the task is instead run
/// synchronously on the calling thread and the group is left untouched.
/// Linking earlier would leave `group.token` pointing at the freed closure.
fn groupAsync(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) void {
    if (builtin.single_threaded) return start(group, context.ptr);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const cpu_count = t.cpu_count catch 1;
    const gpa = t.allocator;
    const n = GroupClosure.contextEnd(context_alignment, context.len);
    const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
        return start(group, context.ptr);
    }));
    gc.* = .{
        .closure = .{
            .cancel_tid = 0,
            .start = GroupClosure.start,
            .is_concurrent = false,
        },
        .t = t,
        .group = group,
        // Linked into the group list below, once queueing has succeeded.
        .node = undefined,
        .func = start,
        .context_alignment = context_alignment,
        .context_len = context.len,
    };
    @memcpy(gc.contextPointer()[0..context.len], context);
    t.mutex.lock();
    const thread_capacity = cpu_count - 1 + t.concurrent_count;
    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
        t.mutex.unlock();
        gc.free(gpa);
        return start(group, context.ptr);
    };
    t.run_queue.prepend(&gc.closure.node);
    if (t.threads.items.len < thread_capacity) {
        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
            assert(t.run_queue.popFirst() == &gc.closure.node);
            t.mutex.unlock();
            gc.free(gpa);
            return start(group, context.ptr);
        };
        t.threads.appendAssumeCapacity(thread);
    }
    // The closure will now definitely run asynchronously. Append it to the
    // group linked list inside the mutex to make `Io.Group.async` thread-safe.
    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
    group.token = &gc.node;
    // This needs to be done before unlocking the mutex to avoid a race with
    // the associated task finishing.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    GroupClosure.syncStart(group_state);
    t.mutex.unlock();
    t.cond.signal();
}
/// Blocks, cancelably, until every task spawned into `group` has finished.
/// Then frees each member closure by walking the list headed by `token`.
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) Io.Cancelable!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (builtin.single_threaded) return;
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);
    try GroupClosure.syncWait(t, group_state, reset_event);
    var cursor: ?*std.SinglyLinkedList.Node = @as(*std.SinglyLinkedList.Node, @ptrCast(@alignCast(token)));
    while (cursor) |node| {
        // Advance before freeing: the node lives inside the closure being freed.
        cursor = node.next;
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.free(t.allocator);
    }
}
/// Like `groupWait`, but the wait cannot be interrupted by cancellation.
fn groupWaitUncancelable(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (builtin.single_threaded) return;
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);
    GroupClosure.syncWaitUncancelable(group_state, reset_event);
    var cursor: ?*std.SinglyLinkedList.Node = @as(*std.SinglyLinkedList.Node, @ptrCast(@alignCast(token)));
    while (cursor) |node| {
        // Advance before freeing: the node lives inside the closure being freed.
        cursor = node.next;
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.free(t.allocator);
    }
}
/// Requests cancellation of every task in `group`, waits (uncancelably) for
/// all of them to finish, then frees every member closure.
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (builtin.single_threaded) return;
    const head: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));

    // Phase 1: flag every member task as canceled.
    var cursor: ?*std.SinglyLinkedList.Node = head;
    while (cursor) |node| {
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.closure.requestCancel();
        cursor = node.next;
    }

    // Phase 2: wait until no member task is pending.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);
    GroupClosure.syncWaitUncancelable(group_state, reset_event);

    // Phase 3: free the closures, advancing before each free.
    cursor = head;
    while (cursor) |node| {
        cursor = node.next;
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.free(t.allocator);
    }
}
/// Blocks until the task behind `any_future` has finished, copies its result
/// into `result`, and frees the task's allocation.
fn await(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    // The alignment was already applied when the closure was laid out.
    _ = result_alignment;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
    AsyncClosure.waitAndFree(ac, threaded.allocator, result);
}
/// Requests cancellation of the task behind `any_future`, then waits for it
/// like `await`. A task with a non-empty result may still run to completion
/// so that `result` is valid.
fn cancel(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    _ = result_alignment;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
    Closure.requestCancel(&ac.closure);
    AsyncClosure.waitAndFree(ac, threaded.allocator, result);
}
/// Reports whether cancellation has been requested for the closure the
/// calling thread is executing. Returns false when the thread is not running
/// a pool closure (`current_closure` is null).
fn cancelRequested(userdata: ?*anyopaque) bool {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    if (current_closure) |closure| {
        const observed = @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire);
        return observed == Closure.canceling_tid;
    }
    return false;
}

/// Returns `error.Canceled` if cancellation was requested for the current task.
fn checkCancel(t: *Threaded) error{Canceled}!void {
    if (!cancelRequested(t)) return;
    return error.Canceled;
}
/// Acquires `mutex`, sleeping on its state word while it is held. Returns
/// `error.Canceled` if a futex wait reports cancellation.
///
/// `prev_state` is the state the caller observed. When it is `.contended`,
/// the thread sleeps once before trying to take the lock.
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (prev_state == .contended) {
        try futexWait(t, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
    // Always store `.contended` so the eventual unlocker wakes a sleeper; the
    // lock is acquired once the swapped-out state was `.unlocked`.
    while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
        try futexWait(t, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
}
/// Like `mutexLock`, but waiting cannot be interrupted by cancellation.
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    _ = userdata;
    if (prev_state == .contended) {
        futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
    // Always store `.contended` so the eventual unlocker wakes a sleeper.
    while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
        futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
    }
}
/// Releases `mutex`. If it was marked `.contended`, wakes one sleeping waiter.
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    _ = userdata;
    _ = prev_state;
    if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) {
        futexWake(@ptrCast(&mutex.state), 1);
    }
}
/// Uncancelable variant of `conditionWait`. It registers as a waiter,
/// releases `mutex`, blocks until it consumes a signal posted by
/// `conditionWake`, and re-acquires `mutex` before returning.
///
/// `cond.state` is viewed as two u32 words. `ints[0]` packs the waiter count
/// (low 16 bits) and the posted-signal count (high 16 bits). `ints[1]` is an
/// epoch counter that serves as the futex word waiters sleep on.
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = t.io();
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    // Observe the epoch before registering (see `conditionWait` for why the
    // order matters).
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;
    mutex.unlock(t_io);
    defer mutex.lockUncancelable(t_io);
    while (true) {
        futexWaitUncancelable(cond_epoch, epoch);
        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);
        // Consume one signal and remove our waiter registration in one step.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
/// Registers as a waiter on `cond`, releases `mutex`, and blocks until it
/// consumes a signal posted by `conditionWake`. `mutex` is re-acquired before
/// returning, including when `error.Canceled` is returned from a futex wait.
///
/// `cond.state` is viewed as two u32 words. `ints[0]` packs the waiter count
/// (low 16 bits) and the posted-signal count (high 16 bits). `ints[1]` is an
/// epoch counter that serves as the futex word waiters sleep on.
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    // Observe the epoch, then check the state again to see if we should wake up.
    // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
    //
    // - T1: s = LOAD(&state)
    // - T2: UPDATE(&s, signal)
    // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
    // - T1: e = LOAD(&epoch) (was reordered after the state load)
    // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
    //
    // Acquire barrier to ensure the epoch load happens before the state load.
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;
    mutex.unlock(t.io());
    defer mutex.lockUncancelable(t.io());
    while (true) {
        try futexWait(t, cond_epoch, epoch);
        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);
        // Try to wake up by consuming a signal and decrementing the waiter we
        // added previously. Acquire barrier ensures code before the wake()
        // which added the signal happens before we decrement it and return.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
/// Wakes one (`.one`) or all (`.all`) threads waiting on `cond` that have not
/// already been reserved by another wake. It posts that many signals in the
/// state word, bumps the epoch, and futex-wakes that many sleepers. Does
/// nothing when every registered waiter already has a pending signal.
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    var state = cond_state.load(.monotonic);
    while (true) {
        const waiters = (state & waiter_mask) / one_waiter;
        const signals = (state & signal_mask) / one_signal;
        // Reserves which waiters to wake up by incrementing the signals count.
        // Therefore, the signals count is always less than or equal to the
        // waiters count. We don't need to Futex.wake if there's nothing to
        // wake up or if other wake() threads have reserved to wake up the
        // current waiters.
        const wakeable = waiters - signals;
        if (wakeable == 0) {
            return;
        }
        const to_wake = switch (wake) {
            .one => 1,
            .all => wakeable,
        };
        // Reserve the amount of waiters to wake by incrementing the signals
        // count. Release barrier ensures code before the wake() happens before
        // the signal it posted and consumed by the wait() threads.
        const new_state = state + (one_signal * to_wake);
        state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
            // Wake up the waiting threads we reserved above by changing the epoch value.
            //
            // A waiting thread could miss a wake up if *exactly* ((1<<32)-1)
            // wake()s happen between it observing the epoch and sleeping on
            // it. This is very unlikely due to how many precise amount of
            // Futex.wake() calls that would be between the waiting thread's
            // potential preemption.
            //
            // Release barrier ensures the signal being added to the state
            // happens before the epoch is changed. If not, the waiting thread
            // could potentially deadlock from missing both the state and epoch
            // change:
            //
            // - T2: UPDATE(&epoch, 1) (reordered before the state change)
            // - T1: e = LOAD(&epoch)
            // - T1: s = LOAD(&state)
            // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
            _ = cond_epoch.fetchAdd(1, .release);
            futexWake(cond_epoch, to_wake);
            return;
        };
        // cmpxchg failed: `state` now holds the fresh value; retry.
    }
}
/// Creates the directory `sub_path`, relative to `dir`, with permission bits
/// `mode` via `mkdirat`. Checks for cancellation before every attempt and
/// retries when the call is interrupted (`EINTR`).
fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    var buf: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buf);
    while (true) {
        try threaded.checkCancel();
        const rc = posix.system.mkdirat(dir.handle, path_z, mode);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            // Treated as programmer errors.
            .BADF, .FAULT => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .DQUOT => return error.DiskQuota,
            .EXIST => return error.PathAlreadyExists,
            .LOOP => return error.SymLinkLoop,
            .MLINK => return error.LinkQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            // dragonfly reports NOTCONN when dir_fd is unlinked from filesystem.
            .NOENT, .NOTCONN => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Not implemented yet: honors a pending cancellation, then panics.
fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat {
    _ = dir;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    try threaded.checkCancel();
    @panic("TODO");
}
/// Stats `sub_path` relative to `dir` with `statx`, requesting type, mode,
/// and access/modify/change times. Automounts are never triggered. The final
/// symlink is followed only when `options.follow_symlinks` is set. Checks for
/// cancellation before each attempt and retries on `EINTR`.
fn dirStatPathLinux(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;
    var buf: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buf);
    const nofollow: u32 = if (options.follow_symlinks) 0 else linux.AT.SYMLINK_NOFOLLOW;
    const flags: u32 = linux.AT.NO_AUTOMOUNT | nofollow;
    const mask = linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME;
    while (true) {
        try threaded.checkCancel();
        var statx = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(dir.handle, path_z, flags, mask, &statx);
        switch (linux.E.init(rc)) {
            .SUCCESS => return statFromLinux(&statx),
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .NOMEM => return error.SystemResources,
            // NAMETOOLONG is handled by pathToPosix() above.
            .BADF, .FAULT, .INVAL, .NAMETOOLONG => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Stats `sub_path` relative to `dir` with `fstatat`. The final symlink is
/// followed only when `options.follow_symlinks` is set. Checks for
/// cancellation before each attempt and retries on `EINTR`.
fn dirStatPathPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    var buf: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buf);
    const flags: u32 = if (options.follow_symlinks) 0 else posix.AT.SYMLINK_NOFOLLOW;
    while (true) {
        try threaded.checkCancel();
        var stat = std.mem.zeroes(posix.Stat);
        const rc = fstatat_sym(dir.handle, path_z, &stat, flags);
        switch (posix.errno(rc)) {
            .SUCCESS => return statFromPosix(&stat),
            .INTR => continue,
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .NAMETOOLONG => return error.NameTooLong,
            .LOOP => return error.SymLinkLoop,
            .NOENT, .NOTDIR => return error.FileNotFound,
            .ILSEQ => return error.BadPathName,
            // BADF: always a race condition.
            .INVAL, .BADF, .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Stats `sub_path` relative to `dir` on WASI. With libc linked this defers
/// to `dirStatPathPosix`. Otherwise it calls `path_filestat_get` directly,
/// following the final symlink only when `options.follow_symlinks` is set.
/// Checks for cancellation before each attempt and retries on `EINTR`.
fn dirStatPathWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    if (builtin.link_libc) return dirStatPathPosix(userdata, dir, sub_path, options);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;
    const flags: wasi.lookupflags_t = .{
        .SYMLINK_FOLLOW = options.follow_symlinks,
    };
    var stat: wasi.filestat_t = undefined;
    while (true) {
        try t.checkCancel();
        switch (wasi.path_filestat_get(dir.handle, flags, sub_path.ptr, sub_path.len, &stat)) {
            // Pass by pointer, as `fileStatWasi` does.
            .SUCCESS => return statFromWasi(&stat),
            .INTR => continue,
            .CANCELED => return error.Canceled,
            // Each prong must `return` the error; a bare error value here
            // would be an ignored switch result.
            .INVAL => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .FAULT => |err| return errnoBug(err),
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.FileNotFound,
            .NOTCAPABLE => return error.AccessDenied,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Stats an open file with `fstat`. Returns `error.Streaming` on targets
/// that have no `posix.Stat` type. Checks for cancellation before each
/// attempt and retries on `EINTR`.
fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    if (posix.Stat == void) return error.Streaming;
    while (true) {
        try threaded.checkCancel();
        var stat = std.mem.zeroes(posix.Stat);
        const rc = fstat_sym(file.handle, &stat);
        switch (posix.errno(rc)) {
            .SUCCESS => return statFromPosix(&stat),
            .INTR => continue,
            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .INVAL, .BADF => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Stats an open file with `statx(fd, "", AT_EMPTY_PATH, ...)`, requesting
/// type, mode, and access/modify/change times. Checks for cancellation before
/// each attempt and retries on `EINTR`.
fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;
    const mask = linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME;
    while (true) {
        try threaded.checkCancel();
        var statx = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(file.handle, "", linux.AT.EMPTY_PATH, mask, &statx);
        switch (linux.E.init(rc)) {
            .SUCCESS => return statFromLinux(&statx),
            .INTR => continue,
            .NOMEM => return error.SystemResources,
            // Path-related and argument errors are treated as bugs here.
            .ACCES, .BADF, .FAULT, .INVAL, .LOOP, .NAMETOOLONG, .NOENT, .NOTDIR => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Windows file stat: not implemented yet. Honors a pending cancelation
/// request first, then panics.
fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    _ = file;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    try threaded.checkCancel();
    @panic("TODO");
}
/// Returns metadata for an already-open file on WASI. With libc linked this
/// defers to `fileStatPosix`. Otherwise it calls `fd_filestat_get` directly,
/// mapping a missing capability to `error.AccessDenied`.
fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    if (builtin.link_libc) return fileStatPosix(userdata, file);
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    while (true) {
        try threaded.checkCancel();
        var st: std.os.wasi.filestat_t = undefined;
        const e = std.os.wasi.fd_filestat_get(file.handle, &st);
        switch (e) {
            .SUCCESS => return statFromWasi(&st),
            .INTR => {}, // Interrupted; go around again.
            .NOMEM => return error.SystemResources,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .INVAL, .BADF => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
// Syscall selection: when `posix.lfs64_abi` is set, the `*64` entry points
// are used instead of the plain ones.
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
/// True when the target's system interface exposes `flock`; the file-open
/// paths use it as a locking fallback when `O_SHLOCK`/`O_EXLOCK` are absent.
const have_flock = @TypeOf(posix.system.flock) != void;
/// Creates `sub_path` relative to `dir` (`O_CREAT`) and opens it for writing,
/// or for reading and writing when `flags.read` is set. It honors
/// `flags.truncate`, `flags.exclusive`, and `flags.mode`.
///
/// Optional advisory locking:
/// * If `posix.O` has `EXLOCK`, the lock is requested atomically at open
///   time via `O_SHLOCK`/`O_EXLOCK`. With `flags.lock_nonblocking`, this
///   needs `O_NONBLOCK` during the open, which is cleared again afterwards.
/// * Otherwise, if `flock` is available, the lock is taken after the open.
///
/// The descriptor is closed if any step after a successful open fails.
fn dirCreateFilePosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };
    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode);
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .BADF => |err| return errnoBug(err),
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    // O_NONBLOCK was only set above when a lock was actually requested. With
    // `.lock = .none` there is nothing to undo, so skip both fcntl calls.
    if (has_flock_open_flags and flags.lock != .none and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try t.checkCancel();
            const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    return .{ .handle = fd };
}
/// Opens the existing file `sub_path` relative to `dir` with the access
/// mode from `flags.mode`. WASI uses its `read`/`write` open flags; other
/// targets use `ACCMODE`. The descriptor is close-on-exec where supported,
/// and on targets with `O_NOCTTY` it does not become the controlling
/// terminal unless `flags.allow_ctty` is set.
///
/// Optional advisory locking:
/// * If `posix.O` has `EXLOCK`, the lock is requested atomically at open
///   time via `O_SHLOCK`/`O_EXLOCK`. With `flags.lock_nonblocking`, this
///   needs `O_NONBLOCK` during the open, which is cleared again afterwards.
/// * Otherwise, if `flock` is available, the lock is taken after the open.
///
/// The descriptor is closed if any step after a successful open fails.
fn dirOpenFile(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
    var os_flags: posix.O = switch (native_os) {
        .wasi => .{
            .read = flags.mode != .write_only,
            .write = flags.mode != .read_only,
        },
        else => .{
            .ACCMODE = switch (flags.mode) {
                .read_only => .RDONLY,
                .write_only => .WRONLY,
                .read_write => .RDWR,
            },
        },
    };
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;
    // Use the O locking flags if the os supports them to acquire the lock
    // atomically.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) {
        // Note that the NONBLOCK flag is removed after the openat() call
        // is successful.
        switch (flags.lock) {
            .none => {},
            .shared => {
                os_flags.SHLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
            .exclusive => {
                os_flags.EXLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
        }
    }
    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);
    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .BADF => |err| return errnoBug(err),
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    // O_NONBLOCK was only set above when a lock was actually requested. With
    // `.lock = .none` there is nothing to undo, so skip both fcntl calls.
    if (has_flock_open_flags and flags.lock != .none and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try t.checkCancel();
            const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    return .{ .handle = fd };
}
/// Closes the OS handle backing `file`. The return type is `void`, so no
/// error is surfaced to the caller.
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    _ = threaded; // No per-instance state is needed to close a handle.
    const handle = file.handle;
    posix.close(handle);
}
/// Reads from the current file position into `data`, returning the number of
/// bytes read; 0 indicates end of stream. As with `readv`, a short read is
/// normal: the call returns as soon as any bytes are available rather than
/// waiting to fill every buffer. The caller must supply at least one
/// non-empty buffer.
///
/// On Windows, a single `ReadFile` is issued into the first non-empty buffer.
/// A loop that tries to fill every buffer would block on pipes and consoles.
/// A synchronous `ReadFile` at EOF succeeds with 0 bytes, so such a loop would
/// also never advance.
fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        const DWORD = windows.DWORD;
        const buffer: []u8 = for (data) |buf| {
            if (buf.len != 0) break buf;
        } else unreachable; // Same precondition as the `assert` on the POSIX path.
        const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
        while (true) {
            try t.checkCancel();
            var n: DWORD = undefined;
            if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, null) != 0) return n;
            switch (windows.GetLastError()) {
                .IO_PENDING => unreachable,
                .OPERATION_ABORTED => continue,
                .BROKEN_PIPE => return 0,
                .HANDLE_EOF => return 0,
                .NETNAME_DELETED => return error.ConnectionResetByPeer,
                .LOCK_VIOLATION => return error.LockViolation,
                .ACCESS_DENIED => return error.AccessDenied,
                .INVALID_HANDLE => return error.NotOpenForReading,
                else => |err| return windows.unexpectedError(err),
            }
        }
    }
    // Gather up to `max_iovecs_len` non-empty buffers; empty ones are skipped.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (iovecs_buffer.len - i == 0) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    assert(dest[0].len > 0);
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err),
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    while (true) {
        try t.checkCancel();
        const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Reads from absolute byte `offset` into `data` without using or updating
/// the file position, returning the number of bytes read; 0 indicates end of
/// file. Like `preadv`, a short read is normal. The caller must supply at
/// least one non-empty buffer. Handles that cannot be positioned report
/// `error.Unseekable`.
///
/// On Windows this is a single pread-style `ReadFile` into the first
/// non-empty buffer, with the offset carried in an `OVERLAPPED` structure.
/// Issuing exactly one read avoids several problems: re-reading the same
/// offset into later buffers, indexing past the end of `data`, and
/// discarding bytes already read when a later read hits EOF.
fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (is_windows) {
        const DWORD = windows.DWORD;
        const OVERLAPPED = windows.OVERLAPPED;
        const buffer: []u8 = for (data) |buf| {
            if (buf.len != 0) break buf;
        } else unreachable; // Same precondition as the `assert` on the POSIX path.
        const want_read_count: DWORD = @min(std.math.maxInt(DWORD), buffer.len);
        while (true) {
            try t.checkCancel();
            var overlapped: OVERLAPPED = .{
                .Internal = 0,
                .InternalHigh = 0,
                .DUMMYUNIONNAME = .{
                    .DUMMYSTRUCTNAME = .{
                        .Offset = @as(u32, @truncate(offset)),
                        .OffsetHigh = @as(u32, @truncate(offset >> 32)),
                    },
                },
                .hEvent = null,
            };
            var n: DWORD = undefined;
            if (windows.kernel32.ReadFile(file.handle, buffer.ptr, want_read_count, &n, &overlapped) != 0) return n;
            switch (windows.GetLastError()) {
                .IO_PENDING => unreachable,
                .OPERATION_ABORTED => continue,
                .BROKEN_PIPE => return 0,
                .HANDLE_EOF => return 0,
                .NETNAME_DELETED => return error.ConnectionResetByPeer,
                .LOCK_VIOLATION => return error.LockViolation,
                .ACCESS_DENIED => return error.AccessDenied,
                .INVALID_HANDLE => return error.NotOpenForReading,
                else => |err| return windows.unexpectedError(err),
            }
        }
    }
    const have_pread_but_not_preadv = switch (native_os) {
        .windows, .haiku, .serenity => true,
        else => false,
    };
    if (have_pread_but_not_preadv) {
        @compileError("TODO");
    }
    // Gather up to `max_iovecs_len` non-empty buffers; empty ones are skipped.
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (iovecs_buffer.len - i == 0) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    assert(dest[0].len > 0);
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var nread: usize = undefined;
        switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
            .SUCCESS => return nread,
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    while (true) {
        try t.checkCancel();
        const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
        switch (posix.errno(rc)) {
            .SUCCESS => return @bitCast(rc),
            .INTR => continue,
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => return error.NotOpenForReading, // can be a race condition
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Moves the file position of `file` by `offset` bytes relative to the
/// current position (`SEEK_CUR`). It uses the same per-target dispatch and
/// errno mapping as `fileSeekTo`. Handles that cannot seek, and moves the OS
/// rejects (e.g. a resulting negative position), report `error.Unseekable`.
/// Interrupted calls are retried, and cancelation is checked before every
/// attempt.
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = file.handle;
    // 32-bit Linux without libc: use _llseek to get a 64-bit offset.
    if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) {
        try t.checkCancel();
        var result: u64 = undefined;
        switch (posix.errno(posix.system.llseek(fd, @bitCast(offset), &result, posix.SEEK.CUR))) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    if (native_os == .windows) {
        try t.checkCancel();
        return windows.SetFilePointerEx_CURRENT(fd, offset);
    }
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var new_offset: std.os.wasi.filesize_t = undefined;
        switch (std.os.wasi.fd_seek(fd, offset, .CUR, &new_offset)) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    if (posix.SEEK == void) return error.Unseekable;
    while (true) {
        try t.checkCancel();
        switch (posix.errno(lseek_sym(fd, offset, posix.SEEK.CUR))) {
            .SUCCESS => return,
            .INTR => continue,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL => return error.Unseekable,
            .OVERFLOW => return error.Unseekable,
            .SPIPE => return error.Unseekable,
            .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Repositions `file` to absolute byte `offset` from the start (`SEEK_SET`).
/// Handles that cannot seek, and offsets the OS rejects, report
/// `error.Unseekable`. Interrupted calls are retried, and cancelation is
/// checked before every attempt.
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const handle = file.handle;
    if (native_os == .windows) {
        try threaded.checkCancel();
        return windows.SetFilePointerEx_BEGIN(handle, offset);
    }
    // 32-bit Linux without libc: _llseek takes the full 64-bit offset.
    if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) {
        try threaded.checkCancel();
        var new_pos: u64 = undefined;
        const e = posix.errno(posix.system.llseek(handle, offset, &new_pos, posix.SEEK.SET));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .BADF => return errnoBug(e), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            else => return posix.unexpectedErrno(e),
        }
    };
    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try threaded.checkCancel();
        var new_pos: std.os.wasi.filesize_t = undefined;
        const e = std.os.wasi.fd_seek(handle, @bitCast(offset), .SET, &new_pos);
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .BADF => return errnoBug(e), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            .NOTCAPABLE => return error.AccessDenied,
            else => return posix.unexpectedErrno(e),
        }
    };
    if (posix.SEEK == void) return error.Unseekable;
    while (true) {
        try threaded.checkCancel();
        const e = posix.errno(lseek_sym(handle, @bitCast(offset), posix.SEEK.SET));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .BADF => return errnoBug(e), // Always a race condition.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Positional (pwrite-style) write: not implemented yet. Honors a pending
/// cancelation request first, then panics.
fn fileWritePositional(
    userdata: ?*anyopaque,
    file: Io.File,
    buffer: [][]const u8,
    offset: u64,
) Io.File.WritePositionalError!usize {
    _ = file;
    _ = buffer;
    _ = offset;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    try threaded.checkCancel();
    @panic("TODO");
}
/// Streaming write at the current file position: not implemented yet.
/// Honors a pending cancelation request first, then panics.
fn fileWriteStreaming(userdata: ?*anyopaque, file: Io.File, buffer: [][]const u8) Io.File.WriteStreamingError!usize {
    _ = file;
    _ = buffer;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    try threaded.checkCancel();
    @panic("TODO");
}
/// Reads `clock` with `clock_gettime`. A clock the OS rejects (`EINVAL`)
/// reports `error.UnsupportedClock`.
fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    _ = threaded; // Reading a clock needs no per-instance state.
    const id: posix.clockid_t = clockToPosix(clock);
    var ts: posix.timespec = undefined;
    const e = posix.errno(posix.system.clock_gettime(id, &ts));
    switch (e) {
        .SUCCESS => return timestampFromPosix(&ts),
        .INVAL => return error.UnsupportedClock,
        else => return posix.unexpectedErrno(e),
    }
}
/// Reads `clock` on Windows and returns a timestamp in nanoseconds.
///
/// * `.real`: `RtlGetSystemTimePrecise` counts 100 ns ticks since the
///   Windows epoch (1601-01-01). It is rescaled to nanoseconds and shifted to
///   the Unix epoch, matching what `.real` reports on other targets.
/// * `.awake`, `.boot`: `QueryPerformanceCounter` ticks, converted to
///   nanoseconds using `QueryPerformanceFrequency`. The product is computed
///   in `i96`, so even a full `u64` tick count times 10^9 cannot overflow.
/// * CPU-time clocks are not supported here.
fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    switch (clock) {
        .real => {
            // `std.time.epoch.windows` is the Windows epoch in Unix seconds (negative).
            const epoch_ns: i96 = std.time.epoch.windows * std.time.ns_per_s;
            return .{ .nanoseconds = @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100 + epoch_ns };
        },
        .awake, .boot => {
            // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
            const qpc: i96 = windows.QueryPerformanceCounter();
            const qpf: i96 = windows.QueryPerformanceFrequency();
            return .{ .nanoseconds = @divTrunc(qpc * std.time.ns_per_s, qpf) };
        },
        .cpu_process,
        .cpu_thread,
        => return error.UnsupportedClock,
    }
}
/// Reads `clock` via WASI `clock_time_get` (precision hint 1 ns) and wraps
/// the nanosecond count in an `Io.Timestamp`. Like `nowPosix`, a clock the
/// host rejects with `EINVAL` reports `error.UnsupportedClock`; any other
/// failure is reported as unexpected.
fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    var ns: std.os.wasi.timestamp_t = undefined;
    switch (std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns)) {
        .SUCCESS => return .{ .nanoseconds = ns },
        .INVAL => return error.UnsupportedClock,
        else => |err| return posix.unexpectedErrno(err),
    }
}
/// Sleeps with Linux `clock_nanosleep`.
///
/// * `.none` sleeps on `.awake` for `maxInt(i96)` nanoseconds.
/// * `.duration` sleeps relatively.
/// * `.deadline` sleeps until an absolute time (`TIMER_ABSTIME`).
///
/// On `EINTR` the loop retries. The remaining time is written back into the
/// same timespec, so relative sleeps resume rather than restart. A clock the
/// kernel rejects reports `error.UnsupportedClock`.
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const clock: Io.Clock = switch (timeout) {
        .none => .awake,
        .duration => |d| d.clock,
        .deadline => |d| d.clock,
    };
    const clock_id: posix.clockid_t = clockToPosix(clock);
    const nanoseconds: i96 = switch (timeout) {
        .none => std.math.maxInt(i96),
        .duration => |d| d.raw.nanoseconds,
        .deadline => |d| d.raw.nanoseconds,
    };
    const is_absolute = timeout == .deadline;
    var ts: posix.timespec = timestampToPosix(nanoseconds);
    while (true) {
        try threaded.checkCancel();
        const rc = std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = is_absolute }, &ts, &ts);
        const e = std.os.linux.E.init(rc);
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; resume with the remaining time.
            .INVAL => return error.UnsupportedClock,
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Sleeps the calling thread on Windows until `timeout` elapses.
///
/// `Timeout.none` sleeps for `maxInt(DWORD)` milliseconds, which is the
/// Win32 `INFINITE` value. Finite durations are rounded *up* to whole
/// milliseconds so the thread never wakes before the requested time. A
/// negative duration (a deadline already past) clamps to 0. Cancelation is
/// only checked before sleeping.
fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    const ms = ms: {
        const d = (try timeout.toDurationFromNow(t.io())) orelse
            break :ms std.math.maxInt(windows.DWORD);
        const ns = d.raw.nanoseconds;
        // Ceiling division without overflow: add one when a positive
        // remainder exists. Negative values and exact multiples truncate.
        const whole_ms = @divTrunc(ns, std.time.ns_per_ms) + @intFromBool(@rem(ns, std.time.ns_per_ms) > 0);
        break :ms std.math.lossyCast(windows.DWORD, whole_ms);
    };
    windows.kernel32.Sleep(ms);
}
/// Sleeps on WASI by polling a single relative clock subscription with
/// `poll_oneoff`. `Timeout.none` waits on the monotonic clock for
/// `maxInt(u64)` nanoseconds. Negative durations clamp to 0 via `lossyCast`.
/// The `poll_oneoff` result is deliberately ignored: a failed or early
/// wakeup just ends the sleep. Cancelation is only checked before sleeping.
fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    const w = std.os.wasi;
    const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{
        .id = clockToWasi(d.clock),
        .timeout = std.math.lossyCast(u64, d.raw.nanoseconds),
        .precision = 0,
        .flags = 0,
    } else .{
        .id = .MONOTONIC,
        .timeout = std.math.maxInt(u64),
        .precision = 0,
        .flags = 0,
    };
    const in: w.subscription_t = .{
        .userdata = 0,
        .u = .{
            .tag = .CLOCK,
            .u = .{ .clock = clock },
        },
    };
    var event: w.event_t = undefined;
    var nevents: usize = undefined;
    _ = w.poll_oneoff(&in, &event, 1, &nevents);
}
/// Sleeps with POSIX `nanosleep`, resuming with the remaining time after
/// `EINTR`. Any other result, success or error, ends the sleep.
///
/// `Timeout.none` is approximated by the largest representable relative
/// timespec. `tv_nsec` must stay within [0, 999_999_999]: a larger value
/// makes `nanosleep` fail with `EINVAL`, which the loop below treats as
/// "done". That would turn an unbounded sleep into an immediate return.
fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
    var timespec: posix.timespec = ts: {
        const d = (try timeout.toDurationFromNow(t.io())) orelse break :ts .{
            .sec = std.math.maxInt(sec_type),
            .nsec = std.time.ns_per_s - 1,
        };
        break :ts timestampToPosix(d.raw.toNanoseconds());
    };
    while (true) {
        try t.checkCancel();
        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
            .INTR => continue,
            else => return, // This prong handles success as well as unexpected errors.
        }
    }
}
/// Blocks until at least one of `futures` has completed, then returns the
/// index of the first completed future it observes (lowest index first).
///
/// How it works:
/// 1. Each future's `select_condition` is atomically pointed at a stack-local
///    `ResetEvent`.
/// 2. If a future is already marked `done_reset_event` during registration,
///    the earlier registrations are withdrawn and that index is returned at
///    once.
/// 3. Otherwise this waits on the event, then withdraws every registration.
///
/// When withdrawing finds a future marked done, it waits on that future's
/// own `reset_event`. This ensures the future has finished with our
/// stack-allocated event.
///
/// Registrations are withdrawn on *every* exit, including cancelation.
/// `reset_event` dies with this stack frame, and (as the `waitUncancelable`
/// handshake implies) a future completing later would otherwise signal a
/// dangling pointer.
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var reset_event: ResetEvent = .unset;
    for (futures, 0..) |future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
            for (futures[0..i]) |cleanup_future| {
                const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future));
                if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
                    cleanup_closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
                }
            }
            return i;
        }
    }
    // Do not `try` yet: on cancelation the registrations below must still be
    // withdrawn before `reset_event` goes out of scope.
    const wait_result = reset_event.wait(t);
    var result: ?usize = null;
    for (futures, 0..) |future, i| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
            closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
            if (result == null) result = i; // In case multiple are ready, return first.
        }
    }
    try wait_result;
    return result.?;
}
/// Opens an IP socket for `address` and optionally enables `SO_REUSEADDR`
/// (plus `SO_REUSEPORT` where the target defines it). It then binds, starts
/// listening with `options.kernel_backlog`, and returns a server whose
/// address is the local one reported by `getsockname`. The socket is closed
/// if any step after creation fails.
fn netListenIpPosix(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = try openSocketPosix(threaded, posixAddressFamily(&address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(fd);

    if (options.reuse_address) {
        try setSocketOption(threaded, fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
        if (@hasDecl(posix.SO, "REUSEPORT")) {
            try setSocketOption(threaded, fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
        }
    }

    var storage: PosixAddress = undefined;
    var storage_len = addressToPosix(&address, &storage);
    try posixBind(threaded, fd, &storage.any, storage_len);

    while (true) {
        try threaded.checkCancel();
        const e = posix.errno(posix.system.listen(fd, options.kernel_backlog));
        switch (e) {
            .SUCCESS => break,
            .BADF => return errnoBug(e),
            .ADDRINUSE => return error.AddressInUse,
            else => return posix.unexpectedErrno(e),
        }
    }

    // Overwrite `storage` with the address the socket is actually bound to.
    try posixGetSockName(threaded, fd, &storage.any, &storage_len);
    return .{ .socket = .{ .handle = fd, .address = addressFromPosix(&storage) } };
}
/// Creates a Unix-domain stream socket, binds it to `address`, and starts
/// listening with `options.kernel_backlog`. Socket-creation failures that
/// mean "Unix sockets are not usable here" collapse to
/// `error.AddressFamilyUnsupported`. The socket is closed if binding or
/// listening fails.
fn netListenUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = openSocketPosix(threaded, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.ProtocolUnsupportedBySystem,
        error.ProtocolUnsupportedByAddressFamily,
        error.SocketModeUnsupported,
        => return error.AddressFamilyUnsupported,
        error.OptionUnsupported => return error.Unexpected,
        else => |e| return e,
    };
    errdefer posix.close(fd);

    var storage: UnixAddress = undefined;
    const storage_len = addressUnixToPosix(address, &storage);
    try posixBindUnix(threaded, fd, &storage.any, storage_len);

    while (true) {
        try threaded.checkCancel();
        const e = posix.errno(posix.system.listen(fd, options.kernel_backlog));
        switch (e) {
            .SUCCESS => break,
            .BADF => return errnoBug(e),
            .ADDRINUSE => return error.AddressInUse,
            else => return posix.unexpectedErrno(e),
        }
    }
    return fd;
}
/// Binds a Unix-domain socket, retrying on `EINTR`. Filesystem-path errors
/// (loops, missing or non-directory components, read-only FS, permissions)
/// are reported distinctly. Argument/descriptor errors are treated as bugs.
fn posixBindUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.bind(fd, addr, addr_len));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .ADDRINUSE => return error.AddressInUse,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .NOMEM => return error.SystemResources,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            // BADF is always a race; INVAL/NOTSOCK/FAULT/NAMETOOLONG mean bad arguments.
            .BADF, .INVAL, .NOTSOCK, .FAULT, .NAMETOOLONG => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Binds an IP socket to `addr`, retrying on `EINTR`. Invalid descriptors
/// and arguments are treated as bugs.
fn posixBind(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.bind(socket_fd, addr, addr_len));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .ADDRINUSE => return error.AddressInUse,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .NOMEM => return error.SystemResources,
            // BADF is always a race; INVAL/NOTSOCK/FAULT mean bad arguments.
            .BADF, .INVAL, .NOTSOCK, .FAULT => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Connects an IP socket to `addr`, retrying on `EINTR`. A connection still
/// in progress on a non-blocking socket reports `error.WouldBlock`.
fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.connect(socket_fd, addr, addr_len));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ALREADY => return error.ConnectionPending,
            .ACCES => return error.AccessDenied,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .CONNREFUSED => return error.ConnectionRefused,
            .CONNRESET => return error.ConnectionResetByPeer,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .TIMEDOUT => return error.Timeout,
            .BADF, .FAULT, .ISCONN, .NOTSOCK, .PROTOTYPE, .CONNABORTED, .PERM, .NOENT => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Connects a Unix-domain socket to the path in `addr`, retrying on `EINTR`.
/// Path-resolution failures are reported distinctly. A pending non-blocking
/// connect reports `error.WouldBlock`.
fn posixConnectUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.connect(fd, addr, addr_len));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .BADF, .CONNABORTED, .FAULT, .ISCONN, .NOTSOCK, .PROTOTYPE => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Writes the socket's local address into `addr`/`addr_len` via
/// `getsockname`, retrying on `EINTR`.
fn posixGetSockName(t: *Threaded, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.getsockname(socket_fd, addr, addr_len));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            .NOBUFS => return error.SystemResources,
            // BADF/NOTSOCK are always a race; FAULT/INVAL mean bad arguments.
            .BADF, .FAULT, .INVAL, .NOTSOCK => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Sets a 32-bit integer socket option (`level`/`opt_name`) to `option` via
/// `setsockopt`, retrying on `EINTR`.
fn setSocketOption(t: *Threaded, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
    const bytes: []const u8 = @ptrCast(&option);
    while (true) {
        try t.checkCancel();
        const e = posix.errno(posix.system.setsockopt(fd, level, opt_name, bytes.ptr, @intCast(bytes.len)));
        switch (e) {
            .SUCCESS => return,
            .INTR => {}, // Interrupted; go around again.
            // BADF/NOTSOCK are always a race; INVAL/FAULT mean bad arguments.
            .BADF, .NOTSOCK, .INVAL, .FAULT => return errnoBug(e),
            else => return posix.unexpectedErrno(e),
        }
    }
}
/// Opens an IP socket and connects it to `address`. The returned stream
/// carries the local address reported by `getsockname`. Connect timeouts are
/// not implemented yet, and requesting one panics. The socket is closed if
/// connecting fails.
fn netConnectIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    if (options.timeout != .none) @panic("TODO");
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = try openSocketPosix(threaded, posixAddressFamily(address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(fd);
    var storage: PosixAddress = undefined;
    var storage_len = addressToPosix(address, &storage);
    try posixConnect(threaded, fd, &storage.any, storage_len);
    // Reuse `storage` to receive the local endpoint of the connection.
    try posixGetSockName(threaded, fd, &storage.any, &storage_len);
    return .{ .socket = .{ .handle = fd, .address = addressFromPosix(&storage) } };
}
/// Opens a Unix-domain stream socket and connects it to `address`, returning
/// the raw handle. The socket is closed if connecting fails.
fn netConnectUnix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = openSocketPosix(threaded, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.OptionUnsupported => return error.Unexpected,
        else => |e| return e,
    };
    errdefer posix.close(fd);
    var storage: UnixAddress = undefined;
    const storage_len = addressUnixToPosix(address, &storage);
    try posixConnectUnix(threaded, fd, &storage.any, storage_len);
    return fd;
}
/// Opens a socket for `address`'s family and binds it to `address`.
///
/// The returned socket's address is read back with getsockname, so it
/// reflects the address actually bound (e.g. a kernel-assigned port).
fn netBindIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const fd = try openSocketPosix(t, posixAddressFamily(address), options);
    errdefer posix.close(fd);
    var sockaddr: PosixAddress = undefined;
    var sockaddr_len = addressToPosix(address, &sockaddr);
    try posixBind(t, fd, &sockaddr.any, sockaddr_len);
    try posixGetSockName(t, fd, &sockaddr.any, &sockaddr_len);
    return .{ .handle = fd, .address = addressFromPosix(&sockaddr) };
}
/// Creates a socket of `family` using the mode and protocol from `options`,
/// with close-on-exec set.
///
/// On targets where socket() does not accept SOCK_CLOEXEC
/// (`socket_flags_unsupported`), the flag is applied afterwards with fcntl.
/// When `options.ip6_only` is set, the IPV6_V6ONLY option is enabled.
/// Returns `error.OptionUnsupported` if the target has no IPV6 options.
/// The descriptor is closed on any error after creation.
fn openSocketPosix(
    t: *Threaded,
    family: posix.sa_family_t,
    options: IpAddress.BindOptions,
) error{
    AddressFamilyUnsupported,
    ProtocolUnsupportedBySystem,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    ProtocolUnsupportedByAddressFamily,
    SocketModeUnsupported,
    OptionUnsupported,
    Unexpected,
    Canceled,
}!posix.socket_t {
    const mode = posixSocketMode(options.mode);
    const protocol = posixProtocol(options.protocol);
    const socket_fd = while (true) {
        try t.checkCancel();
        const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
        const socket_rc = posix.system.socket(family, flags, protocol);
        switch (posix.errno(socket_rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(socket_rc);
                errdefer posix.close(fd);
                // Without SOCK_CLOEXEC support, set FD_CLOEXEC separately.
                if (socket_flags_unsupported) while (true) {
                    try t.checkCancel();
                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                        .SUCCESS => break,
                        .INTR => continue,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break fd;
            },
            .INTR => continue,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .INVAL => return error.ProtocolUnsupportedBySystem,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
            .PROTOTYPE => return error.SocketModeUnsupported,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(socket_fd);
    if (options.ip6_only) {
        if (posix.IPV6 == void) return error.OptionUnsupported;
        // A nonzero value *enables* IPV6_V6ONLY. Passing 0 would explicitly
        // allow IPv4-mapped traffic, the opposite of what was requested.
        try setSocketOption(t, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 1);
    }
    return socket_fd;
}
// Targets whose socket()/accept() cannot take SOCK_CLOEXEC in the type flags;
// on these, close-on-exec is applied with a separate fcntl() call instead.
const socket_flags_unsupported = switch (native_os) {
    .haiku => true,
    else => native_os.isDarwin(),
};
const have_accept4 = socket_flags_unsupported == false;
/// Accepts one connection on `listen_fd`, returning a stream whose address
/// is the peer address reported by accept. The new descriptor has
/// close-on-exec set, via accept4 where available and fcntl otherwise.
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var peer: PosixAddress = undefined;
    var peer_len: posix.socklen_t = @sizeOf(PosixAddress);
    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = if (have_accept4)
            posix.system.accept4(listen_fd, &peer.any, &peer_len, posix.SOCK.CLOEXEC)
        else
            posix.system.accept(listen_fd, &peer.any, &peer_len);
        switch (posix.errno(rc)) {
            .SUCCESS => {},
            .INTR => continue,
            .CONNABORTED => return error.ConnectionAborted,
            .INVAL => return error.SocketNotListening,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .PROTO => return error.ProtocolFailure,
            .PERM => return error.BlockedByFirewall,
            .BADF => |err| return errnoBug(err), // always a race condition
            .AGAIN, .FAULT, .NOTSOCK, .OPNOTSUPP => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
        const accepted: posix.fd_t = @intCast(rc);
        errdefer posix.close(accepted);
        // Plain accept() cannot set close-on-exec atomically; do it now.
        if (!have_accept4) while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(accepted, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                .SUCCESS => break,
                .INTR => continue,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        break accepted;
    };
    return .{ .socket = .{
        .handle = fd,
        .address = addressFromPosix(&peer),
    } };
}
/// Reads from socket `fd` into the buffers in `data` with one vectored read,
/// retrying on EINTR. At most `max_iovecs_len` non-empty buffers are used.
/// Asserts that at least one buffer is non-empty. Returns the number of bytes read.
fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var iovecs_len: usize = 0;
    for (data) |buf| {
        if (iovecs_len == iovecs_buffer.len) break;
        // Empty buffers are skipped rather than passed to the OS.
        if (buf.len == 0) continue;
        iovecs_buffer[iovecs_len] = .{ .base = buf.ptr, .len = buf.len };
        iovecs_len += 1;
    }
    const dest = iovecs_buffer[0..iovecs_len];
    assert(dest[0].len > 0);

    // WASI without libc has no readv; use fd_read directly.
    if (native_os == .wasi and !builtin.link_libc) {
        while (true) {
            try t.checkCancel();
            var n: usize = undefined;
            switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) {
                .SUCCESS => return n,
                .INTR => continue,
                .NOBUFS, .NOMEM => return error.SystemResources,
                .NOTCONN => return error.SocketUnconnected,
                .CONNRESET => return error.ConnectionResetByPeer,
                .TIMEDOUT => return error.Timeout,
                .NOTCAPABLE => return error.AccessDenied,
                .INVAL, .FAULT, .AGAIN, .BADF => |err| return errnoBug(err),
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    while (true) {
        try t.checkCancel();
        const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCONN, .PIPE => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NETDOWN => return error.NetworkDown,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .INVAL, .FAULT, .AGAIN => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Only Linux offers `sendmmsg`; elsewhere messages are sent one per syscall.
const have_sendmmsg = native_os == .linux;
/// Sends `messages` on `handle`, batching them with sendmmsg where
/// available.
///
/// Returns the error that stopped sending (or null) together with the
/// number of messages fully handed to the OS before that point.
fn netSend(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    // Translate portable flags into MSG_* bits. CONFIRM and FASTOPEN are
    // not declared on every target, so they are only consulted where present.
    // NOSIGNAL is always set.
    var posix_flags: u32 = posix.MSG.NOSIGNAL;
    if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix_flags |= @as(u32, posix.MSG.CONFIRM);
    if (flags.dont_route) posix_flags |= @as(u32, posix.MSG.DONTROUTE);
    if (flags.eor) posix_flags |= @as(u32, posix.MSG.EOR);
    if (flags.oob) posix_flags |= @as(u32, posix.MSG.OOB);
    if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix_flags |= @as(u32, posix.MSG.FASTOPEN);

    var sent: usize = 0;
    while (sent < messages.len) {
        if (have_sendmmsg) {
            sent += netSendMany(t, handle, messages[sent..], posix_flags) catch |err| return .{ err, sent };
        } else {
            netSendOne(t, handle, &messages[sent], posix_flags) catch |err| return .{ err, sent };
            sent += 1;
        }
    }
    return .{ null, sent };
}
/// Sends a single message with one `sendmsg` call, retrying on EINTR.
///
/// `flags` are raw MSG_* bits. On success, `message.data_len` is overwritten
/// with the byte count returned by the OS.
fn netSendOne(
    t: *Threaded,
    handle: net.Socket.Handle,
    message: *net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!void {
    // Stack storage for the destination address; `msg.name` points into it.
    var addr: PosixAddress = undefined;
    var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
    const msg: posix.msghdr_const = .{
        .name = &addr.any,
        .namelen = addressToPosix(message.address, &addr),
        .iov = (&iovec)[0..1],
        .iovlen = 1,
        .control = @constCast(message.control.ptr),
        .controllen = @intCast(message.control.len),
        .flags = 0,
    };
    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmsg(handle, &msg, flags);
        // On Windows, failure is signaled by SOCKET_ERROR and the cause comes
        // from WSAGetLastError rather than errno.
        if (is_windows) {
            if (rc == windows.ws2_32.SOCKET_ERROR) {
                switch (windows.ws2_32.WSAGetLastError()) {
                    .WSAEACCES => return error.AccessDenied,
                    .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
                    .WSAECONNRESET => return error.ConnectionResetByPeer,
                    .WSAEMSGSIZE => return error.MessageOversize,
                    .WSAENOBUFS => return error.SystemResources,
                    .WSAENOTSOCK => return error.FileDescriptorNotASocket,
                    .WSAEAFNOSUPPORT => return error.AddressFamilyUnsupported,
                    .WSAEDESTADDRREQ => unreachable, // A destination address is required.
                    .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
                    .WSAEHOSTUNREACH => return error.NetworkUnreachable,
                    .WSAEINVAL => unreachable,
                    .WSAENETDOWN => return error.NetworkDown,
                    .WSAENETRESET => return error.ConnectionResetByPeer,
                    .WSAENETUNREACH => return error.NetworkUnreachable,
                    .WSAENOTCONN => return error.SocketUnconnected,
                    .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
                    .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
                    else => |err| return windows.unexpectedWSAError(err),
                }
            } else {
                // Any other return value is the number of bytes sent.
                message.data_len = @intCast(rc);
                return;
            }
        }
        switch (posix.errno(rc)) {
            .SUCCESS => {
                message.data_len = @intCast(rc);
                return;
            },
            .INTR => continue,
            .ACCES => return error.AccessDenied,
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err),
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err),
            .OPNOTSUPP => |err| return errnoBug(err),
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Sends up to 64 of `messages` with one Linux `sendmmsg` call, retrying on
/// EINTR.
///
/// Returns how many messages the kernel accepted. For each accepted message,
/// `data_len` is overwritten with the bytes sent for it; the remaining
/// messages are left for the caller to resubmit.
fn netSendMany(
    t: *Threaded,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!usize {
    // Per-call scratch: one mmsghdr, address, and iovec per message sent.
    var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
    var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
    var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
    const min_len: usize = @min(messages.len, msg_buffer.len);
    const clamped_messages = messages[0..min_len];
    const clamped_msgs = (&msg_buffer)[0..min_len];
    const clamped_addrs = (&addr_buffer)[0..min_len];
    const clamped_iovecs = (&iovecs_buffer)[0..min_len];
    for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| {
        iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
        msg.* = .{
            .hdr = .{
                .name = &addr.any,
                .namelen = addressToPosix(message.address, addr),
                .iov = iovec[0..1],
                .iovlen = 1,
                .control = @constCast(message.control.ptr),
                .controllen = message.control.len,
                .flags = 0,
            },
            .len = undefined, // Populated by calling sendmmsg below.
        };
    }
    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                // rc is the number of messages sent; copy back per-message byte counts.
                const n: usize = @intCast(rc);
                for (clamped_messages[0..n], clamped_msgs[0..n]) |*message, *msg| {
                    message.data_len = msg.len;
                }
                return n;
            },
            .AGAIN => |err| return errnoBug(err),
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // Always a race condition.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
            .INTR => continue,
            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            // NOTE(review): netSendOne maps HOSTUNREACH to error.HostUnreachable,
            // while this path reports NetworkUnreachable; confirm which is intended.
            .HOSTUNREACH => return error.NetworkUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
/// Receives up to `message_buffer.len` messages from `handle`, packing their
/// payloads contiguously into `data_buffer`.
///
/// Returns the error that stopped reception (or null) paired with the number
/// of messages filled in. When nothing is available yet and no message has
/// been received, waits with poll() until data arrives or the deadline
/// derived from `timeout` passes. Once at least one message has been
/// received, a would-block condition (or an exhausted `data_buffer`) ends the
/// call successfully.
fn netReceive(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    // recvmmsg is useless, here's why:
    // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
    // * it wants iovecs for each message but we have a better API: one data
    //   buffer to handle all the messages. The better API cannot be lowered to
    //   the split vectors though because reducing the buffer size might make
    //   some messages unreceivable.
    // So the strategy instead is to use non-blocking recvmsg calls, calling
    // poll() with timeout if the first one returns EAGAIN.
    const posix_flags: u32 =
        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
        @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
        posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;
    var poll_fds: [1]posix.pollfd = .{
        .{
            .fd = handle,
            .events = posix.POLL.IN,
            .revents = undefined,
        },
    };
    var message_i: usize = 0;
    var data_i: usize = 0;
    const deadline = timeout.toDeadline(t.io()) catch |err| return .{ err, message_i };
    recv: while (true) {
        t.checkCancel() catch |err| return .{ err, message_i };
        if (message_buffer.len - message_i == 0) return .{ null, message_i };
        const message = &message_buffer[message_i];
        const remaining_data_buffer = data_buffer[data_i..];
        // With no payload space left, another recvmsg would dequeue the next
        // datagram and discard its contents. Stop and report what was received.
        if (message_i != 0 and remaining_data_buffer.len == 0) return .{ null, message_i };
        var storage: PosixAddress = undefined;
        var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
        var msg: posix.msghdr = .{
            .name = &storage.any,
            .namelen = @sizeOf(PosixAddress),
            .iov = (&iov)[0..1],
            .iovlen = 1,
            .control = message.control.ptr,
            .controllen = @intCast(message.control.len),
            .flags = undefined,
        };
        const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
        switch (posix.errno(recv_rc)) {
            .SUCCESS => {
                // With MSG_TRUNC the kernel returns the real datagram length,
                // which may exceed the space provided. Clamp to the bytes
                // actually stored; truncation is still reported via `flags.trunc`.
                const received_len: usize = @intCast(recv_rc);
                const data = remaining_data_buffer[0..@min(received_len, remaining_data_buffer.len)];
                data_i += data.len;
                message.* = .{
                    .from = addressFromPosix(&storage),
                    .data = data,
                    .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
                    .flags = .{
                        .eor = (msg.flags & posix.MSG.EOR) != 0,
                        .trunc = (msg.flags & posix.MSG.TRUNC) != 0,
                        .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
                        .oob = (msg.flags & posix.MSG.OOB) != 0,
                        .errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false,
                    },
                };
                message_i += 1;
                continue;
            },
            .AGAIN => while (true) {
                t.checkCancel() catch |err| return .{ err, message_i };
                // Only block while nothing has been received yet.
                if (message_i != 0) return .{ null, message_i };
                const max_poll_ms = std.math.maxInt(u31);
                const timeout_ms: u31 = if (deadline) |d| t: {
                    const duration = d.durationFromNow(t.io()) catch |err| return .{ err, message_i };
                    if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
                    break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
                } else max_poll_ms;
                const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
                switch (posix.errno(poll_rc)) {
                    .SUCCESS => {
                        if (poll_rc == 0) {
                            // Although spurious timeouts are OK, when no deadline
                            // is passed we must not return `error.Timeout`.
                            if (deadline == null) continue;
                            return .{ error.Timeout, message_i };
                        }
                        continue :recv;
                    },
                    .INTR => continue,
                    .FAULT => |err| return .{ errnoBug(err), message_i },
                    .INVAL => |err| return .{ errnoBug(err), message_i },
                    .NOMEM => return .{ error.SystemResources, message_i },
                    else => |err| return .{ posix.unexpectedErrno(err), message_i },
                }
            },
            .INTR => continue,
            .BADF => |err| return .{ errnoBug(err), message_i },
            .NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
            .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
            .FAULT => |err| return .{ errnoBug(err), message_i },
            .INVAL => |err| return .{ errnoBug(err), message_i },
            .NOBUFS => return .{ error.SystemResources, message_i },
            .NOMEM => return .{ error.SystemResources, message_i },
            .NOTCONN => return .{ error.SocketUnconnected, message_i },
            .NOTSOCK => |err| return .{ errnoBug(err), message_i },
            .MSGSIZE => return .{ error.MessageOversize, message_i },
            .PIPE => return .{ error.SocketUnconnected, message_i },
            .OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
            .CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
            .NETDOWN => return .{ error.NetworkDown, message_i },
            else => |err| return .{ posix.unexpectedErrno(err), message_i },
        }
    }
}
/// Writes `header`, then every slice of `data` except the last, then the
/// last slice of `data` repeated `splat` times, with one `sendmsg` call.
///
/// At most `max_iovecs_len` vectors are submitted, so fewer bytes than
/// requested may be written; returns the byte count from `posix.sendmsg`.
/// Asserts `data` is non-empty.
fn netWritePosix(
    userdata: ?*anyopaque,
    fd: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    assert(data.len != 0);
    var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
    var msg: posix.msghdr_const = .{
        .name = null,
        .namelen = 0,
        .iov = &iovecs,
        .iovlen = 0,
        .control = null,
        .controllen = 0,
        .flags = 0,
    };
    addBuf(&iovecs, &msg.iovlen, header);
    for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
    const pattern = data[data.len - 1];
    if (iovecs.len - msg.iovlen != 0) switch (splat) {
        0 => {},
        1 => addBuf(&iovecs, &msg.iovlen, pattern),
        else => switch (pattern.len) {
            0 => {},
            1 => {
                // Single-byte pattern: fill a small stack buffer once and
                // reference it from as many vectors as needed.
                var backup_buffer: [splat_buffer_size]u8 = undefined;
                const splat_buffer = &backup_buffer;
                const memset_len = @min(splat_buffer.len, splat);
                const buf = splat_buffer[0..memset_len];
                @memset(buf, pattern[0]);
                addBuf(&iovecs, &msg.iovlen, buf);
                var remaining_splat = splat - buf.len;
                while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
                    assert(buf.len == splat_buffer.len);
                    addBuf(&iovecs, &msg.iovlen, splat_buffer);
                    remaining_splat -= splat_buffer.len;
                }
                // The loop can stop because the vectors ran out while more
                // than a full buffer remains. Clamp so the slice stays in
                // bounds; addBuf ignores it when no vector is free.
                addBuf(&iovecs, &msg.iovlen, splat_buffer[0..@min(remaining_splat, splat_buffer.len)]);
            },
            else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
                addBuf(&iovecs, &msg.iovlen, pattern);
            },
        },
    };
    const flags = posix.MSG.NOSIGNAL;
    return posix.sendmsg(fd, &msg, flags);
}
/// Appends `bytes` as a new vector at index `i.*` of `v` and bumps `i.*`.
/// Empty slices are dropped because the OS checks the pointer before the
/// length. Nothing is appended once `v` is full.
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
    if (bytes.len != 0 and v.len - i.* != 0) {
        v[i.*] = .{ .base = bytes.ptr, .len = bytes.len };
        i.* += 1;
    }
}
/// Closes a socket handle. On Windows, a closesocket failure is treated as
/// an OS bug via `recoverableOsBugDetected`.
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    // The Threaded instance is not needed to close a socket.
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));
    if (is_windows) {
        windows.closesocket(handle) catch recoverableOsBugDetected();
    } else {
        posix.close(handle);
    }
}
/// Resolves a network interface name to its numeric index.
///
/// On Linux this issues the SIOCGIFINDEX ioctl on a temporary AF_UNIX
/// datagram socket. On Windows and on other libc targets it calls
/// `if_nametoindex`. Returns `error.InterfaceNotFound` when no interface has
/// that name. Panics on targets that have none of these paths.
fn netInterfaceNameResolve(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (native_os == .linux) {
        // Any socket works as the ioctl target; socket creation failures are
        // folded into the narrower ResolveError set.
        const sock_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
            error.ProcessFdQuotaExceeded => return error.SystemResources,
            error.SystemFdQuotaExceeded => return error.SystemResources,
            error.AddressFamilyUnsupported => return error.Unexpected,
            error.ProtocolUnsupportedBySystem => return error.Unexpected,
            error.ProtocolUnsupportedByAddressFamily => return error.Unexpected,
            error.SocketModeUnsupported => return error.Unexpected,
            error.OptionUnsupported => return error.Unexpected,
            else => |e| return e,
        };
        defer posix.close(sock_fd);
        var ifr: posix.ifreq = .{
            .ifrn = .{ .name = @bitCast(name.bytes) },
            .ifru = undefined,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) {
                .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) },
                .INTR => continue,
                .INVAL => |err| return errnoBug(err), // Bad parameters.
                .NOTTY => |err| return errnoBug(err),
                .NXIO => |err| return errnoBug(err),
                .BADF => |err| return errnoBug(err), // Always a race condition.
                .FAULT => |err| return errnoBug(err), // Bad pointer parameter.
                .IO => |err| return errnoBug(err), // sock_fd is not a file descriptor
                .NODEV => return error.InterfaceNotFound,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }
    if (native_os == .windows) {
        try t.checkCancel();
        // if_nametoindex reports "not found" by returning 0.
        const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = index };
    }
    if (builtin.link_libc) {
        try t.checkCancel();
        const index = std.c.if_nametoindex(&name.bytes);
        if (index == 0) return error.InterfaceNotFound;
        return .{ .index = @bitCast(index) };
    }
    @panic("unimplemented");
}
/// Looks up the name of `interface`. Not implemented yet on any target:
/// after honoring cancellation, every path panics.
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    _ = interface;
    switch (native_os) {
        .linux, .windows => @panic("TODO"),
        else => if (builtin.link_libc) @panic("TODO") else @panic("unimplemented"),
    }
}
/// Runs a host-name lookup, streaming results into `resolved`. The outcome
/// (success or error) is always delivered as a final `.end` result, pushed
/// uncancelably.
fn netLookup(
    userdata: ?*anyopaque,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = t.io();
    const outcome = netLookupFallible(t, host_name, resolved, options);
    resolved.putOneUncancelable(t_io, .{ .end = outcome });
}
/// Resolves `host_name`, pushing addresses and a canonical name into
/// `resolved`.
///
/// On Linux: numeric IPv6/IPv4 literals are returned directly, then the hosts
/// file is consulted, then "localhost" and "*.localhost" names resolve to
/// loopback (RFC 6761), and finally DNS with the resolv.conf search list is
/// used. On other libc targets this falls back to blocking getaddrinfo.
/// Returns `error.OptionUnsupported` when no method is available.
fn netLookupFallible(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const t_io = t.io();
    const name = host_name.bytes;
    assert(name.len <= HostName.max_len);
    if (is_windows) {
        // TODO use GetAddrInfoExW / GetAddrInfoExCancel
        @compileError("TODO");
    }
    // On Linux, glibc provides getaddrinfo_a which is capable of supporting our semantics.
    // However, musl's POSIX-compliant getaddrinfo is not, so we bypass it.
    if (builtin.target.isGnuLibC()) {
        // TODO use getaddrinfo_a / gai_cancel
    }
    if (native_os == .linux) {
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }
        if (options.family != .ip6) {
            if (IpAddress.parseIp4(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }
        lookupHosts(t, host_name, resolved, options) catch |err| switch (err) {
            error.UnknownHostName => {},
            else => |e| return e,
        };
        // RFC 6761 Section 6.3.3
        // Name resolution APIs and libraries SHOULD recognize
        // localhost names as special and SHOULD always return the IP
        // loopback address for address queries and negative responses
        // for all other query types.
        // Check for equal to "localhost(.)" or ends in ".localhost(.)"
        const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost";
        // The label separator precedes the suffix, i.e. it sits at index
        // `name.len - localhost.len - 1`; the `==` check short-circuits first
        // so this index is only computed when it is in bounds.
        if (std.mem.endsWith(u8, name, localhost) and
            (name.len == localhost.len or name[name.len - localhost.len - 1] == '.'))
        {
            var results_buffer: [3]HostName.LookupResult = undefined;
            var results_index: usize = 0;
            if (options.family != .ip4) {
                results_buffer[results_index] = .{ .address = .{ .ip6 = .loopback(options.port) } };
                results_index += 1;
            }
            if (options.family != .ip6) {
                results_buffer[results_index] = .{ .address = .{ .ip4 = .loopback(options.port) } };
                results_index += 1;
            }
            const canon_name = "localhost";
            const canon_name_dest = options.canonical_name_buffer[0..canon_name.len];
            canon_name_dest.* = canon_name.*;
            results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } };
            results_index += 1;
            try resolved.putAll(t_io, results_buffer[0..results_index]);
            return;
        }
        return lookupDnsSearch(t, host_name, resolved, options);
    }
    if (native_os == .openbsd) {
        // TODO use getaddrinfo_async / asr_abort
    }
    if (native_os == .freebsd) {
        // TODO use dnsres_getaddrinfo
    }
    if (native_os.isDarwin()) {
        // TODO use CFHostStartInfoResolution / CFHostCancelInfoResolution
    }
    if (builtin.link_libc) {
        // This operating system lacks a way to resolve asynchronously. We are
        // stuck with getaddrinfo.
        var name_buffer: [HostName.max_len + 1]u8 = undefined;
        @memcpy(name_buffer[0..host_name.bytes.len], host_name.bytes);
        name_buffer[host_name.bytes.len] = 0;
        const name_c = name_buffer[0..host_name.bytes.len :0];
        var port_buffer: [8]u8 = undefined;
        const port_c = std.fmt.bufPrintZ(&port_buffer, "{d}", .{options.port}) catch unreachable;
        const hints: posix.addrinfo = .{
            .flags = .{ .NUMERICSERV = true },
            .family = posix.AF.UNSPEC,
            .socktype = posix.SOCK.STREAM,
            .protocol = posix.IPPROTO.TCP,
            .canonname = null,
            .addr = null,
            .addrlen = 0,
            .next = null,
        };
        var res: ?*posix.addrinfo = null;
        while (true) {
            try t.checkCancel();
            switch (posix.system.getaddrinfo(name_c.ptr, port_c.ptr, &hints, &res)) {
                @as(posix.system.EAI, @enumFromInt(0)) => break,
                .ADDRFAMILY => return error.AddressFamilyUnsupported,
                .AGAIN => return error.NameServerFailure,
                .FAIL => return error.NameServerFailure,
                .FAMILY => return error.AddressFamilyUnsupported,
                .MEMORY => return error.SystemResources,
                .NODATA => return error.UnknownHostName,
                .NONAME => return error.UnknownHostName,
                .SYSTEM => switch (posix.errno(-1)) {
                    .INTR => continue,
                    else => |e| return posix.unexpectedErrno(e),
                },
                else => return error.Unexpected,
            }
        }
        defer if (res) |some| posix.system.freeaddrinfo(some);
        var it = res;
        var canon_name: ?[*:0]const u8 = null;
        while (it) |info| : (it = info.next) {
            const addr = info.addr orelse continue;
            const storage: PosixAddress = .{ .any = addr.* };
            try resolved.putOne(t_io, .{ .address = addressFromPosix(&storage) });
            if (info.canonname) |n| {
                if (canon_name == null) {
                    canon_name = n;
                }
            }
        }
        if (canon_name) |n| {
            try resolved.putOne(t_io, .{
                .canonical_name = copyCanon(options.canonical_name_buffer, std.mem.sliceTo(n, 0)),
            });
        }
        return;
    }
    return error.OptionUnsupported;
}
/// Socket-address storage for either IPv4 (`in`) or IPv6 (`in6`); `any`
/// views the same bytes as the generic `sockaddr` passed to syscalls.
const PosixAddress = extern union {
    any: posix.sockaddr,
    in: posix.sockaddr.in,
    in6: posix.sockaddr.in6,
};
/// Socket-address storage for a Unix-domain address (`un`); `any` views the
/// same bytes as the generic `sockaddr` passed to syscalls.
const UnixAddress = extern union {
    any: posix.sockaddr,
    un: posix.sockaddr.un,
};
/// Returns the AF_* constant matching the IP version of `a`.
fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
    switch (a.*) {
        .ip4 => return posix.AF.INET,
        .ip6 => return posix.AF.INET6,
    }
}
/// Converts a kernel-filled socket address to an `IpAddress`, dispatching on
/// its family field.
fn addressFromPosix(posix_address: *const PosixAddress) IpAddress {
    const family = posix_address.any.family;
    if (family == posix.AF.INET) return .{ .ip4 = address4FromPosix(&posix_address.in) };
    if (family == posix.AF.INET6) return .{ .ip6 = address6FromPosix(&posix_address.in6) };
    // Any other family yields the IPv4 loopback address with port 0.
    return .{ .ip4 = .loopback(0) };
}
/// Writes `a` into `storage` in its OS representation and returns the
/// number of meaningful bytes (the size of the family-specific sockaddr).
fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
    const len: posix.socklen_t = switch (a.*) {
        .ip4 => |ip4| blk: {
            storage.in = address4ToPosix(ip4);
            break :blk @sizeOf(posix.sockaddr.in);
        },
        .ip6 => |*ip6| blk: {
            storage.in6 = address6ToPosix(ip6);
            break :blk @sizeOf(posix.sockaddr.in6);
        },
    };
    return len;
}
/// Fills `storage` with a NUL-terminated AF_UNIX address for `a.path` and
/// returns the full size of `sockaddr.un`.
fn addressUnixToPosix(a: *const net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
    const sun = &storage.un;
    sun.family = posix.AF.UNIX;
    @memcpy(sun.path[0..a.path.len], a.path);
    sun.path[a.path.len] = 0;
    return @sizeOf(posix.sockaddr.un);
}
/// Converts an OS IPv4 socket address to `Ip4Address`, turning the
/// big-endian port into native byte order.
fn address4FromPosix(in: *const posix.sockaddr.in) net.Ip4Address {
    const native_port = std.mem.bigToNative(u16, in.port);
    return .{ .bytes = @bitCast(in.addr), .port = native_port };
}
/// Converts an OS IPv6 socket address to `Ip6Address`. The port is turned
/// into native byte order; flow info and scope id are copied unchanged.
fn address6FromPosix(in6: *const posix.sockaddr.in6) net.Ip6Address {
    const native_port = std.mem.bigToNative(u16, in6.port);
    return .{
        .bytes = in6.addr,
        .port = native_port,
        .interface = .{ .index = in6.scope_id },
        .flow = in6.flowinfo,
    };
}
/// Converts `Ip4Address` to the OS IPv4 socket address, storing the port in
/// network byte order.
fn address4ToPosix(a: net.Ip4Address) posix.sockaddr.in {
    const port_be = std.mem.nativeToBig(u16, a.port);
    return .{ .addr = @bitCast(a.bytes), .port = port_be };
}
/// Converts `Ip6Address` to the OS IPv6 socket address. The port is stored
/// in network byte order; flow and interface index are copied unchanged.
fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
    return .{
        .addr = a.bytes,
        .scope_id = a.interface.index,
        .flowinfo = a.flow,
        .port = std.mem.nativeToBig(u16, a.port),
    };
}
/// Handles an errno that can only result from a programming mistake: panics
/// in Debug builds and returns `error.Unexpected` in release builds.
fn errnoBug(err: posix.E) Io.UnexpectedError {
    if (builtin.mode == .Debug) std.debug.panic("programmer bug caused syscall error: {t}", .{err});
    return error.Unexpected;
}
/// Maps a portable socket mode to the corresponding SOCK_* type constant.
fn posixSocketMode(mode: net.Socket.Mode) u32 {
    const sock = posix.SOCK;
    return switch (mode) {
        .raw => sock.RAW,
        .rdm => sock.RDM,
        .seqpacket => sock.SEQPACKET,
        .dgram => sock.DGRAM,
        .stream => sock.STREAM,
    };
}
/// Returns the numeric protocol value, or 0 (let the OS choose) when unset.
fn posixProtocol(protocol: ?net.Protocol) u32 {
    if (protocol) |p| return @intFromEnum(p);
    return 0;
}
/// Called when an OS operation fails in a way that should be impossible:
/// trips `unreachable` in Debug builds and does nothing otherwise.
fn recoverableOsBugDetected() void {
    switch (builtin.mode) {
        .Debug => unreachable,
        else => {},
    }
}
/// Maps a portable clock to the POSIX clock id used for it on this target.
///
/// Darwin has no CLOCK_BOOTTIME and its CLOCK_MONOTONIC counts during sleep,
/// so `.awake` uses UPTIME_RAW and `.boot` uses MONOTONIC_RAW there. The
/// check covers every Darwin target via `isDarwin()` (matching the rest of
/// this file) rather than a hand-written list that missed e.g. visionos.
fn clockToPosix(clock: Io.Clock) posix.clockid_t {
    const is_darwin = comptime native_os.isDarwin();
    return switch (clock) {
        .real => posix.CLOCK.REALTIME,
        .awake => if (is_darwin) posix.CLOCK.UPTIME_RAW else posix.CLOCK.MONOTONIC,
        .boot => if (is_darwin) posix.CLOCK.MONOTONIC_RAW else posix.CLOCK.BOOTTIME,
        .cpu_process => posix.CLOCK.PROCESS_CPUTIME_ID,
        .cpu_thread => posix.CLOCK.THREAD_CPUTIME_ID,
    };
}
/// Maps a portable clock to a WASI clock id.
///
/// The variant is `.real` (not `.realtime`), matching `Io.Clock` as used by
/// `clockToPosix`. Both `.awake` and `.boot` map to MONOTONIC here.
fn clockToWasi(clock: Io.Clock) std.os.wasi.clockid_t {
    return switch (clock) {
        .real => .REALTIME,
        .awake, .boot => .MONOTONIC,
        .cpu_process => .PROCESS_CPUTIME_ID,
        .cpu_thread => .THREAD_CPUTIME_ID,
    };
}
/// Converts a Linux `statx` result into the portable `Io.File.Stat`.
fn statFromLinux(stx: *const std.os.linux.Statx) Io.File.Stat {
    const S = std.os.linux.S;
    // Combines a seconds/nanoseconds pair into a single nanosecond timestamp.
    const toTimestamp = struct {
        fn f(ts: anytype) Io.Timestamp {
            return .{ .nanoseconds = @intCast(@as(i128, ts.sec) * std.time.ns_per_s + ts.nsec) };
        }
    }.f;
    const kind: @FieldType(Io.File.Stat, "kind") = switch (stx.mode & S.IFMT) {
        S.IFREG => .file,
        S.IFDIR => .directory,
        S.IFLNK => .sym_link,
        S.IFCHR => .character_device,
        S.IFBLK => .block_device,
        S.IFIFO => .named_pipe,
        S.IFSOCK => .unix_domain_socket,
        else => .unknown,
    };
    return .{
        .inode = stx.ino,
        .size = stx.size,
        .mode = stx.mode,
        .kind = kind,
        .atime = toTimestamp(stx.atime),
        .mtime = toTimestamp(stx.mtime),
        .ctime = toTimestamp(stx.ctime),
    };
}
/// Converts a POSIX `stat` result into the portable `Io.File.Stat`. On
/// illumos, doors and event ports are recognized as well.
fn statFromPosix(st: *const std.posix.Stat) Io.File.Stat {
    const S = std.posix.S;
    const atime = st.atime();
    const mtime = st.mtime();
    const ctime = st.ctime();
    const file_type = st.mode & S.IFMT;
    const kind: @FieldType(Io.File.Stat, "kind") = switch (file_type) {
        S.IFREG => .file,
        S.IFDIR => .directory,
        S.IFLNK => .sym_link,
        S.IFCHR => .character_device,
        S.IFBLK => .block_device,
        S.IFIFO => .named_pipe,
        S.IFSOCK => .unix_domain_socket,
        else => other: {
            // These file types only exist on illumos.
            if (native_os == .illumos) {
                if (file_type == S.IFDOOR) break :other .door;
                if (file_type == S.IFPORT) break :other .event_port;
            }
            break :other .unknown;
        },
    };
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = st.mode,
        .kind = kind,
        .atime = timestampFromPosix(&atime),
        .mtime = timestampFromPosix(&mtime),
        .ctime = timestampFromPosix(&ctime),
    };
}
/// Converts a WASI `filestat` into the portable `Io.File.Stat`.
///
/// WASI reports no permission bits, so `mode` is 0. Both socket kinds are
/// reported as `.unix_domain_socket`. WASI timestamps are plain nanosecond
/// counts and must be wrapped in `Io.Timestamp`; assigning the raw integers
/// to the struct-typed fields does not type-check.
fn statFromWasi(st: *const std.os.wasi.filestat_t) Io.File.Stat {
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = 0,
        .kind = switch (st.filetype) {
            .BLOCK_DEVICE => .block_device,
            .CHARACTER_DEVICE => .character_device,
            .DIRECTORY => .directory,
            .SYMBOLIC_LINK => .sym_link,
            .REGULAR_FILE => .file,
            .SOCKET_STREAM, .SOCKET_DGRAM => .unix_domain_socket,
            else => .unknown,
        },
        .atime = .{ .nanoseconds = st.atim },
        .mtime = .{ .nanoseconds = st.mtim },
        .ctime = .{ .nanoseconds = st.ctim },
    };
}
/// Converts a POSIX `timespec` to a nanosecond `Io.Timestamp`.
fn timestampFromPosix(timespec: *const std.posix.timespec) Io.Timestamp {
    const whole_seconds_ns = @as(i128, timespec.sec) * std.time.ns_per_s;
    return .{ .nanoseconds = @intCast(whole_seconds_ns + timespec.nsec) };
}
/// Splits a nanosecond count into a POSIX `timespec`. Floor division keeps
/// `nsec` in `[0, ns_per_s)` even for negative inputs.
fn timestampToPosix(nanoseconds: i96) std.posix.timespec {
    const sec = @divFloor(nanoseconds, std.time.ns_per_s);
    const nsec = @mod(nanoseconds, std.time.ns_per_s);
    return .{ .sec = @intCast(sec), .nsec = @intCast(nsec) };
}
/// Copies `file_path` into `buffer` as a NUL-terminated string.
/// Returns `error.BadPathName` if the path contains a NUL byte and
/// `error.NameTooLong` if it does not fit alongside the terminator.
fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNameError![:0]u8 {
    if (std.mem.indexOfScalar(u8, file_path, 0) != null) return error.BadPathName;
    // One byte of `buffer` is reserved for the terminator.
    if (file_path.len > buffer.len - 1) return error.NameTooLong;
    const len = file_path.len;
    @memcpy(buffer[0..len], file_path);
    buffer[len] = 0;
    return buffer[0..len :0];
}
/// Resolves `host_name` via DNS, applying the resolv.conf search list.
///
/// The search list is skipped when the name has at least `ndots` dots or ends
/// in a dot (an explicit request for global scope). Each search domain is
/// tried in order; `error.UnknownHostName` moves on to the next and any other
/// outcome is returned. Finally the bare name is queried.
/// `options.canonical_name_buffer` is used as scratch space for the name
/// being queried. Search domains that would not fit are skipped rather than
/// written past the end of that buffer.
fn lookupDnsSearch(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = t.io();
    const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;
    // Count dots, suppress search when >=ndots or name ends in
    // a dot, which is an explicit request for global scope.
    const dots = std.mem.countScalar(u8, host_name.bytes, '.');
    const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len;
    const search = rc.search_buffer[0..search_len];
    var canon_name = host_name.bytes;
    // Strip final dot for canon, fail if multiple trailing dots.
    if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1;
    if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName;
    // Name with search domain appended is set up in the canonical name
    // buffer. This both provides the desired default canonical name (if the
    // requested name is not a CNAME record) and serves as a buffer for
    // passing the full requested name to `lookupDns`.
    const buffer = options.canonical_name_buffer;
    @memcpy(buffer[0..canon_name.len], canon_name);
    var it = std.mem.tokenizeAny(u8, search, " \t");
    while (it.next()) |token| {
        const lookup_len = canon_name.len + 1 + token.len;
        // "<name>.<domain>" must fit in the buffer; skip domains that do not.
        if (lookup_len > buffer.len) continue;
        buffer[canon_name.len] = '.';
        @memcpy(buffer[canon_name.len + 1 ..][0..token.len], token);
        const lookup_canon_name = buffer[0..lookup_len];
        if (lookupDns(t, lookup_canon_name, &rc, resolved, options)) |result| {
            return result;
        } else |err| switch (err) {
            error.UnknownHostName => continue,
            else => |e| return e,
        }
    }
    const lookup_canon_name = buffer[0..canon_name.len];
    return lookupDns(t, lookup_canon_name, &rc, resolved, options);
}
/// Sends DNS queries for `lookup_canon_name` to every nameserver in `rc` and
/// pushes the results onto `resolved`. It pushes one `.address` per A/AAAA
/// record, followed by one `.canonical_name`: the last CNAME target seen, or
/// else `lookup_canon_name`. Ported from musl's `__res_msend_rc` and
/// `name_from_dns`.
///
/// An A query is sent unless `options.family` is `.ip6`, and an AAAA query
/// unless it is `.ip4`. Unanswered queries are resent every
/// `rc.timeout_seconds / rc.attempts` until `rc.timeout_seconds` elapses.
///
/// Errors:
/// * `error.UnknownHostName` if a nameserver answered NXDOMAIN (rcode 3),
///   so that a search-domain loop can try its next candidate.
/// * `error.NameServerFailure` if the overall timeout expired first, or if
///   the answers contained no addresses.
/// * `error.InvalidDnsARecord`, `error.InvalidDnsAAAARecord`, or
///   `error.InvalidDnsCnameRecord` for malformed records.
fn lookupDns(
    t: *Threaded,
    lookup_canon_name: []const u8,
    rc: *const HostName.ResolvConf,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = t.io();
    // `af` is the family that *excludes* the record. The A query is skipped
    // when only IPv6 is wanted, and the AAAA query when only IPv4 is wanted.
    const family_records: [2]struct { af: IpAddress.Family, rr: HostName.DnsRecord } = .{
        .{ .af = .ip6, .rr = .A },
        .{ .af = .ip4, .rr = .AAAA },
    };
    var query_buffers: [2][280]u8 = undefined;
    var answer_buffer: [2 * 512]u8 = undefined;
    var queries_buffer: [2][]const u8 = undefined;
    var answers_buffer: [2][]const u8 = undefined;
    var nq: usize = 0;
    var answer_buffer_i: usize = 0;

    for (family_records) |fr| {
        if (options.family != fr.af) {
            const entropy = std.crypto.random.array(u8, 2);
            const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy);
            queries_buffer[nq] = query_buffers[nq][0..len];
            nq += 1;
        }
    }

    // Every nameserver as an IPv6 address (IPv4 ones in IPv4-mapped form),
    // used when a single IPv6 socket must reach nameservers of both families.
    var ip4_mapped_buffer: [HostName.ResolvConf.max_nameservers]IpAddress = undefined;
    const ip4_mapped = ip4_mapped_buffer[0..rc.nameservers_len];
    var any_ip6 = false;
    for (rc.nameservers(), ip4_mapped) |*ns, *m| {
        m.* = .{ .ip6 = .fromAny(ns.*) };
        any_ip6 = any_ip6 or ns.* == .ip6;
    }

    var socket = s: {
        if (any_ip6) ip6: {
            const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
            // The socket must be dual-stack (not IPv6-only). IPv4
            // nameservers are addressed through their IPv4-mapped form, and
            // an IPv6-only socket cannot reach those. musl clears
            // IPV6_V6ONLY for the same reason.
            const socket = ip6_addr.bind(t_io, .{ .ip6_only = false, .mode = .dgram }) catch |err| switch (err) {
                error.AddressFamilyUnsupported => break :ip6,
                else => |e| return e,
            };
            break :s socket;
        }
        // No IPv6 nameservers, or no IPv6 support: use plain IPv4.
        any_ip6 = false;
        const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
        const socket = try ip4_addr.bind(t_io, .{ .mode = .dgram });
        break :s socket;
    };
    defer socket.close(t_io);

    const mapped_nameservers = if (any_ip6) ip4_mapped else rc.nameservers();
    const queries = queries_buffer[0..nq];
    const answers = answers_buffer[0..queries.len];
    var answers_remaining = answers.len;
    for (answers) |*answer| answer.len = 0;

    // boot clock is chosen because time the computer is suspended should count
    // against time spent waiting for external messages to arrive.
    const clock: Io.Clock = .boot;
    var now_ts = try clock.now(t_io);
    const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
    const attempt_duration: Io.Duration = .{
        .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
    };

    send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(t_io)) {
        const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers;
        {
            // (Re)send every still-unanswered query to every nameserver.
            var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
            var message_i: usize = 0;
            for (queries, answers) |query, *answer| {
                if (answer.len != 0) continue;
                for (mapped_nameservers) |*ns| {
                    message_buffer[message_i] = .{
                        .address = ns,
                        .data_ptr = query.ptr,
                        .data_len = query.len,
                    };
                    message_i += 1;
                }
            }
            _ = netSend(t, socket.handle, message_buffer[0..message_i], .{});
        }

        const timeout: Io.Timeout = .{ .deadline = .{
            .raw = now_ts.addDuration(attempt_duration),
            .clock = clock,
        } };
        while (true) {
            var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
            const buf = answer_buffer[answer_buffer_i..];
            const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout);
            for (message_buffer[0..recv_n]) |*received_message| {
                const reply = received_message.data;
                // Ignore non-identifiable packets.
                if (reply.len < 4) continue;
                // Ignore replies from addresses we didn't send to.
                const ns = for (mapped_nameservers) |*ns| {
                    if (received_message.from.eql(ns)) break ns;
                } else {
                    continue;
                };
                // Find which query this answer goes with, if any.
                const query, const answer = for (queries, answers) |query, *answer| {
                    if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer };
                } else {
                    continue;
                };
                if (answer.len != 0) continue;
                // Only accept positive or negative responses; retry immediately on
                // server failure, and ignore all other codes such as refusal.
                switch (reply[3] & 15) {
                    0, 3 => {
                        answer.* = reply;
                        // Advance past the *end* of this reply within
                        // `answer_buffer`, not merely by its length. Datagrams
                        // ignored earlier in this batch occupy space before
                        // it, and the next receive must not overwrite any
                        // accepted answer.
                        const reply_end = (@intFromPtr(reply.ptr) - @intFromPtr(&answer_buffer)) + reply.len;
                        answer_buffer_i = @max(answer_buffer_i, reply_end);
                        answers_remaining -= 1;
                        if (answer_buffer_i == answer_buffer.len) break :send;
                        if (answers_remaining == 0) break :send;
                    },
                    2 => {
                        var retry_message: Io.net.OutgoingMessage = .{
                            .address = ns,
                            .data_ptr = query.ptr,
                            .data_len = query.len,
                        };
                        _ = netSend(t, socket.handle, (&retry_message)[0..1], .{});
                        continue;
                    },
                    else => continue,
                }
            }
            if (recv_err) |err| switch (err) {
                error.Canceled => return error.Canceled,
                error.Timeout => continue :send,
                else => continue,
            };
        }
    } else {
        return error.NameServerFailure;
    }

    // As in musl, NXDOMAIN means the name does not exist. Report it as
    // `UnknownHostName` without pushing anything, rather than falling through
    // to the generic "no addresses" failure below. This lets a search-domain
    // loop continue with its next candidate.
    for (answers) |answer| {
        if (answer.len >= 4 and (answer[3] & 15) == 3) return error.UnknownHostName;
    }

    var addresses_len: usize = 0;
    var canonical_name: ?HostName = null;
    for (answers) |answer| {
        var it = HostName.DnsResponse.init(answer) catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        };
        while (it.next() catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        }) |record| switch (record.rr) {
            .A => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 4) return error.InvalidDnsARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip4 = .{
                    .bytes = data[0..4].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            .AAAA => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 16) return error.InvalidDnsAAAARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip6 = .{
                    .bytes = data[0..16].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            .CNAME => {
                _, canonical_name = HostName.expand(record.packet, record.data_off, options.canonical_name_buffer) catch
                    return error.InvalidDnsCnameRecord;
            },
            _ => continue,
        };
    }

    try resolved.putOne(t_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
    if (addresses_len == 0) return error.NameServerFailure;
}
/// Looks `host_name` up in `/etc/hosts`, pushing matches onto `resolved`.
///
/// Returns `error.UnknownHostName` if the hosts file does not exist, is not
/// reachable (`NotDir`), or is not accessible. Any other failure to open or
/// read the file becomes `error.DetectingNetworkConfigurationFailed`.
/// Cancellation is propagated unchanged.
fn lookupHosts(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const io = t.io();
    const hosts_file = Io.File.openAbsolute(io, "/etc/hosts", .{}) catch |open_err| return switch (open_err) {
        error.FileNotFound, error.NotDir, error.AccessDenied => error.UnknownHostName,
        error.Canceled => error.Canceled,
        // Here we could add more detailed diagnostics to the results queue.
        else => error.DetectingNetworkConfigurationFailed,
    };
    defer hosts_file.close(io);

    var read_buffer: [512]u8 = undefined;
    var hosts_reader = hosts_file.reader(io, &read_buffer);
    lookupHostsReader(t, host_name, resolved, options, &hosts_reader.interface) catch |err| switch (err) {
        // The generic reader only says "ReadFailed"; the file reader records
        // the underlying cause.
        error.ReadFailed => switch (hosts_reader.err.?) {
            error.Canceled => return error.Canceled,
            // Here we could add more detailed diagnostics to the results queue.
            else => return error.DetectingNetworkConfigurationFailed,
        },
        error.Canceled, error.UnknownHostName => |e| return e,
    };
}
/// Scans hosts-file formatted text from `reader` for entries naming
/// `host_name`. Each matching address allowed by `options.family` is pushed
/// onto `resolved`, followed by at most one canonical name.
///
/// Each line has the form `<address> <canonical name> <aliases...>`, and `#`
/// starts a comment. Lines longer than the reader's buffer are skipped. As in
/// musl's `name_from_hosts`, the canonical name is the *first* name of the
/// first matching line that produced an address. It is copied into
/// `options.canonical_name_buffer` because `line` points into the reader's
/// reusable buffer.
///
/// Returns `error.UnknownHostName` if no address matched.
fn lookupHostsReader(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
    reader: *Io.Reader,
) error{ ReadFailed, Canceled, UnknownHostName }!void {
    const t_io = t.io();
    var addresses_len: usize = 0;
    var canonical_name: ?HostName = null;
    while (true) {
        const line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) {
            error.StreamTooLong => {
                // Skip lines that are too long.
                _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) {
                    error.EndOfStream => break,
                    error.ReadFailed => return error.ReadFailed,
                };
                continue;
            },
            error.ReadFailed => return error.ReadFailed,
            error.EndOfStream => break,
        };
        reader.toss(1);

        var split_it = std.mem.splitScalar(u8, line, '#');
        const no_comment_line = split_it.first();
        // '\r' is treated as whitespace so CRLF-terminated files still match.
        var line_it = std.mem.tokenizeAny(u8, no_comment_line, " \t\r");
        const ip_text = line_it.next() orelse continue;
        // The first name after the address is the entry's canonical name.
        // `host_name` may match it or any later alias.
        const first_name_text = line_it.peek() orelse continue;
        while (line_it.next()) |name_text| {
            if (std.mem.eql(u8, name_text, host_name.bytes)) break;
        } else continue;

        const line_start_len = addresses_len;
        if (options.family != .ip6) {
            if (IpAddress.parseIp4(ip_text, options.port)) |addr| {
                try resolved.putOne(t_io, .{ .address = addr });
                addresses_len += 1;
            } else |_| {}
        }
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(ip_text, options.port)) |addr| {
                try resolved.putOne(t_io, .{ .address = addr });
                addresses_len += 1;
            } else |_| {}
        }

        // Only a line that produced an address of a requested family may
        // supply the canonical name.
        if (canonical_name == null and addresses_len != line_start_len) {
            if (HostName.init(first_name_text)) |name| {
                if (name.bytes.len <= options.canonical_name_buffer.len) {
                    const canonical_name_dest = options.canonical_name_buffer[0..name.bytes.len];
                    @memcpy(canonical_name_dest, name.bytes);
                    canonical_name = .{ .bytes = canonical_name_dest };
                }
            } else |_| {}
        }
    }
    if (canonical_name) |canon_name| try resolved.putOne(t_io, .{ .canonical_name = canon_name });
    if (addresses_len == 0) return error.UnknownHostName;
}
/// Writes a DNS query packet for `dname` into `q` and returns its length,
/// which is at most 280 bytes. Ported from musl's `__res_mkquery`.
///
/// `entropy` becomes the 16-bit query ID, `op` the opcode, `ty` the QTYPE,
/// and `class` the QCLASS. The recursion-desired bit is set. One trailing dot
/// in `dname` is ignored.
///
/// Asserts that the name (without that dot) is at most 253 bytes and that
/// every label is 1 to 63 bytes long. Names with empty labels, such as
/// `a..b` or `.a`, must not be passed.
fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: HostName.DnsRecord, entropy: [2]u8) usize {
    // This implementation is ported from musl libc.
    // A more idiomatic "ziggy" implementation would be welcome.
    var name = dname;
    if (std.mem.endsWith(u8, name, ".")) name.len -= 1;
    assert(name.len <= 253);
    // Packet length breakdown:
    // - header: 12 bytes
    // - encoded name: name.len + 2, i.e. the first label's length byte,
    //   the labels and dots, and the root byte
    // - QTYPE and QCLASS: 2 bytes each
    // An empty name is just the root byte.
    const n = 17 + name.len + @intFromBool(name.len != 0);

    // The query ID is taken directly from the caller-supplied entropy.
    q[0..2].* = entropy;
    @memset(q[2..n], 0);
    // Flags byte: opcode in bits 3-6, plus RD (recursion desired).
    q[2] = @as(u8, op) * 8 + 1;
    // QDCOUNT = 1.
    q[5] = 1;
    // Copy the name one byte past the first length slot. Index 12 and each
    // '.' are then overwritten with the length of the label that follows.
    @memcpy(q[13..][0..name.len], name);
    var i: usize = 13;
    var j: usize = undefined;
    while (q[i] != 0) : (i = j + 1) {
        j = i;
        while (q[j] != 0 and q[j] != '.') : (j += 1) {}
        // TODO determine the circumstances for this and whether or
        // not this should be an error.
        // Labels must be 1..63 bytes. Checking `j > i` first avoids the
        // integer underflow an empty label would otherwise cause.
        assert(j > i and j - i <= 63);
        q[i - 1] = @intCast(j - i);
    }
    q[i + 1] = @intFromEnum(ty);
    q[i + 3] = class;
    return n;
}
/// Copies `name` into the front of `canonical_name_buffer` and returns a
/// `HostName` that refers to the copy rather than to `name`.
fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) HostName {
    const len = name.len;
    @memcpy(canonical_name_buffer[0..len], name);
    return .{ .bytes = canonical_name_buffer[0..len] };
}
/// Whether `__ulock_wait2` is available on the minimum targeted Darwin
/// version (major version 11 or later).
///
/// Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
/// https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
///
/// This XNU version appears to correspond to 11.0.1:
/// https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
///
/// ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
/// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
///
/// Only meaningful on Darwin targets, whose version range is semver-based.
const darwin_supports_ulock_wait2 = 11 <= builtin.os.version_range.semver.min.major;
/// Blocks until `ptr` is woken or no longer holds `expect`. It first checks
/// whether the current task has been canceled.
///
/// Wakeups may be spurious, so callers must re-check their condition in a
/// loop. Only Linux and Darwin are implemented; other targets fail to
/// compile.
fn futexWait(t: *Threaded, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void {
    @branchHint(.cold);

    if (native_os == .linux) {
        try t.checkCancel();
        const rc = std.os.linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
        if (builtin.mode != .Debug) return;
        switch (std.os.linux.E.init(rc)) {
            // SUCCESS: notified by `wake()`.
            // INTR: gives caller a chance to check cancellation.
            // AGAIN: ptr.* != expect.
            // INVAL: possibly timeout overflow.
            .SUCCESS, .INTR, .AGAIN, .INVAL => {},
            .TIMEDOUT => unreachable, // no timeout was supplied
            .FAULT => unreachable, // ptr was invalid
            else => unreachable,
        }
        return;
    }

    if (native_os.isDarwin()) {
        try t.checkCancel();
        const flags: std.c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true };
        // A timeout of 0 means "no timeout" for both entry points.
        const status = if (darwin_supports_ulock_wait2)
            std.c.__ulock_wait2(flags, ptr, expect, 0, 0)
        else
            std.c.__ulock_wait(flags, ptr, expect, 0);
        if (builtin.mode != .Debug or status >= 0) return;
        switch (@as(std.c.E, @enumFromInt(-status))) {
            // Wait was interrupted by the OS or other spurious signalling.
            .INTR => {},
            // Address of the futex was paged out. This is unlikely, but possible in theory, and
            // pthread/libdispatch on darwin bother to handle it. In this case we'll return
            // without waiting, but the caller should retry anyway.
            .FAULT => {},
            .TIMEDOUT => unreachable,
            else => unreachable,
        }
        return;
    }

    @compileError("TODO");
}
/// Blocks until `ptr` is woken or no longer holds `expect`, without
/// checking for task cancellation.
///
/// Wakeups may be spurious, so callers must re-check their condition in a
/// loop. Only Linux and Darwin are implemented; other targets fail to
/// compile.
pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void {
    @branchHint(.cold);

    if (native_os == .linux) {
        const rc = std.os.linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
        if (builtin.mode != .Debug) return;
        switch (std.os.linux.E.init(rc)) {
            // SUCCESS: notified by `wake()`.
            // INTR: gives caller a chance to check cancellation.
            // AGAIN: ptr.* != expect.
            // INVAL: possibly timeout overflow.
            .SUCCESS, .INTR, .AGAIN, .INVAL => {},
            .TIMEDOUT => unreachable, // no timeout was supplied
            .FAULT => unreachable, // ptr was invalid
            else => unreachable,
        }
        return;
    }

    if (native_os.isDarwin()) {
        const flags: std.c.UL = .{ .op = .COMPARE_AND_WAIT, .NO_ERRNO = true };
        // A timeout of 0 means "no timeout" for both entry points.
        const status = if (darwin_supports_ulock_wait2)
            std.c.__ulock_wait2(flags, ptr, expect, 0, 0)
        else
            std.c.__ulock_wait(flags, ptr, expect, 0);
        if (builtin.mode != .Debug or status >= 0) return;
        switch (@as(std.c.E, @enumFromInt(-status))) {
            // Wait was interrupted by the OS or other spurious signalling.
            .INTR => {},
            // Address of the futex was paged out. This is unlikely, but possible in theory, and
            // pthread/libdispatch on darwin bother to handle it. In this case we'll return
            // without waiting, but the caller should retry anyway.
            .FAULT => {},
            .TIMEDOUT => unreachable,
            else => unreachable,
        }
        return;
    }

    @compileError("TODO");
}
/// Blocks until `ptr` is woken, no longer holds `expect`, or roughly
/// `timeout` has elapsed, without checking for task cancellation.
///
/// Wakeups may be spurious, so callers must re-check their condition.
/// Only implemented on Linux; other targets fail to compile.
pub fn futexWaitDurationUncancelable(ptr: *const std.atomic.Value(u32), expect: u32, timeout: Io.Duration) void {
    @branchHint(.cold);
    if (native_os != .linux) @compileError("TODO");

    const linux = std.os.linux;
    // FUTEX_WAIT interprets its timespec as a relative duration.
    const relative_timeout = timestampToPosix(timeout.toNanoseconds());
    const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, &relative_timeout);
    if (builtin.mode != .Debug) return;
    switch (linux.E.init(rc)) {
        // SUCCESS: notified by `wake()`.
        // INTR: gives caller a chance to check cancellation.
        // AGAIN: ptr.* != expect.
        // TIMEDOUT: the duration elapsed.
        // INVAL: possibly timeout overflow.
        .SUCCESS, .INTR, .AGAIN, .TIMEDOUT, .INVAL => {},
        .FAULT => unreachable, // ptr was invalid
        else => unreachable,
    }
}
/// Wakes up to `max_waiters` threads blocked in a futex wait on `ptr`.
///
/// On Linux the count is clamped to `maxInt(i32)`. On Darwin, any
/// `max_waiters` greater than 1 wakes every waiter. Errors that should be
/// impossible trip an assertion in Debug builds and are otherwise ignored.
/// Only Linux and Darwin are implemented; other targets fail to compile.
pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
    @branchHint(.cold);
    if (native_os == .linux) {
        const linux = std.os.linux;
        const rc = linux.futex_3arg(
            &ptr.raw,
            .{ .cmd = .WAKE, .private = true },
            // The kernel treats the wake count as a signed int.
            @min(max_waiters, std.math.maxInt(i32)),
        );
        if (builtin.mode == .Debug) switch (linux.E.init(rc)) {
            .SUCCESS => {}, // successful wake up
            .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
            .FAULT => {}, // pointer became invalid while doing the wake
            else => unreachable,
        };
        return;
    }
    if (native_os.isDarwin()) {
        const c = std.c;
        const flags: c.UL = .{
            .op = .COMPARE_AND_WAIT,
            .NO_ERRNO = true,
            .WAKE_ALL = max_waiters > 1,
        };
        const is_debug = builtin.mode == .Debug;
        while (true) {
            const status = c.__ulock_wake(flags, ptr, 0);
            if (status >= 0) return;
            switch (@as(c.E, @enumFromInt(-status))) {
                .INTR => continue, // spurious wake()
                .NOENT => return, // nothing was woken up
                // __ulock_wake doesn't generate EFAULT according to darwin
                // pthread_cond_t, and ALREADY is only for UL.Op.WAKE_THREAD.
                // Anything else is unexpected. Assert in Debug, and in other
                // builds give up instead of retrying the same failing call
                // forever.
                else => {
                    assert(!is_debug);
                    return;
                },
            }
        }
    }
    @compileError("TODO");
}
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
/// Threads can block until the value is set. `wait` returns early with an
/// error if cancellation of the current task is observed, while
/// `waitUncancelable` does not check for cancellation.
/// Statically initializable; four bytes on all targets.
pub const ResetEvent = enum(u32) {
    /// Not set, and no thread has announced that it is waiting.
    unset = 0,
    /// Not set, and at least one thread has announced that it may be blocked
    /// in a futex wait, so `set` must issue a wake.
    waiting = 1,
    /// Set.
    is_set = 2,

    /// Returns whether the logical boolean is `set`.
    ///
    /// Once `reset` is called, this returns false until the next `set`.
    ///
    /// The memory accesses before the `set` can be said to happen before
    /// `isSet` returns true.
    pub fn isSet(re: *const ResetEvent) bool {
        if (builtin.single_threaded) return switch (re.*) {
            .unset => false,
            .waiting => unreachable,
            .is_set => true,
        };
        // Acquire barrier ensures memory accesses before `set` happen before
        // returning true.
        return @atomicLoad(ResetEvent, re, .acquire) == .is_set;
    }

    /// Blocks the calling thread until `set` is called.
    ///
    /// This is effectively a more efficient version of `while (!isSet()) {}`.
    /// Returns an error if `futexWait` observes cancellation of the current
    /// task while waiting.
    ///
    /// The memory accesses before the `set` can be said to happen before `wait` returns.
    pub fn wait(re: *ResetEvent, t: *Threaded) Io.Cancelable!void {
        if (builtin.single_threaded) switch (re.*) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        if (re.isSet()) {
            @branchHint(.likely);
            return;
        }
        // Try to set the state from `unset` to `waiting` to indicate to the
        // `set` thread that others are blocked on the ResetEvent. Avoid using
        // any strict barriers until we know the ResetEvent is set.
        var state = @atomicLoad(ResetEvent, re, .acquire);
        if (state == .unset) {
            // On success the new state is `.waiting`; on failure the
            // observed value is used instead.
            state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting;
        }
        // Futex waits can return spuriously, so re-check after every wakeup.
        while (state == .waiting) {
            try futexWait(t, @ptrCast(re), @intFromEnum(ResetEvent.waiting));
            state = @atomicLoad(ResetEvent, re, .acquire);
        }
        assert(state == .is_set);
    }

    /// Same as `wait` except that cancellation is not checked, so this only
    /// returns once the event is set.
    pub fn waitUncancelable(re: *ResetEvent) void {
        if (builtin.single_threaded) switch (re.*) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        if (re.isSet()) {
            @branchHint(.likely);
            return;
        }
        // Try to set the state from `unset` to `waiting` to indicate to the
        // `set` thread that others are blocked on the ResetEvent. Avoid using
        // any strict barriers until we know the ResetEvent is set.
        var state = @atomicLoad(ResetEvent, re, .acquire);
        if (state == .unset) {
            // On success the new state is `.waiting`; on failure the
            // observed value is used instead.
            state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting;
        }
        // Futex waits can return spuriously, so re-check after every wakeup.
        while (state == .waiting) {
            futexWaitUncancelable(@ptrCast(re), @intFromEnum(ResetEvent.waiting));
            state = @atomicLoad(ResetEvent, re, .acquire);
        }
        assert(state == .is_set);
    }

    /// Marks the logical boolean as `set` and unblocks any threads in `wait`
    /// or `waitUncancelable` to observe the new state.
    ///
    /// The logical boolean stays `set` until `reset` is called, making future
    /// `set` calls do nothing semantically.
    ///
    /// The memory accesses before `set` can be said to happen before `isSet`
    /// returns true or `wait`/`waitUncancelable` return successfully.
    pub fn set(re: *ResetEvent) void {
        if (builtin.single_threaded) {
            re.* = .is_set;
            return;
        }
        // Only issue a wake when some thread announced itself as waiting;
        // `maxInt(u32)` requests that every waiter be woken.
        if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) {
            futexWake(@ptrCast(re), std.math.maxInt(u32));
        }
    }

    /// Unmarks the ResetEvent as if `set` was never called.
    ///
    /// Assumes no threads are blocked in `wait` or `waitUncancelable`.
    /// Concurrent calls to `set`, `isSet` and `reset` are allowed.
    pub fn reset(re: *ResetEvent) void {
        if (builtin.single_threaded) {
            re.* = .unset;
            return;
        }
        @atomicStore(ResetEvent, re, .unset, .monotonic);
    }
};