zig/lib/std/Io/EventLoop.zig
Lukas Lalinsky 2b5306a94b Add missing clobbers to context switching
This only shows up in release mode: the compiler tries to keep a value
in rdi across the switch, but that register gets replaced inside the
fiber. This would not happen under the C calling convention, but it can
happen in these normal Zig functions.
2025-10-09 00:45:49 -07:00

const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Io = std.Io;
const EventLoop = @This();
const Alignment = std.mem.Alignment;
const IoUring = std.os.linux.IoUring;
/// Must be a thread-safe allocator.
gpa: Allocator,
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
threads: Thread.List,
detached: struct {
mutex: std.Io.Mutex,
list: std.DoublyLinkedList,
},
/// Empirically, the self-hosted backend has been seen using more than 128 KiB of stack just to panic.
const idle_stack_size = 256 * 1024;
const max_idle_search = 4;
const max_steal_ready_search = 4;
const io_uring_entries = 64;
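/// Per-worker-thread state. Each worker owns an io_uring instance, an idle
/// context to switch to when it has nothing to run, and an intrusive queue
/// of ready fibers linked through `Fiber.queue_next`.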
const Thread = struct {
thread: std.Thread,
idle_context: Context,
current_context: *Context,
ready_queue: ?*Fiber,
io_uring: IoUring,
idle_search_index: u32,
steal_ready_search_index: u32,
const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
threadlocal var self: *Thread = undefined;
fn current() *Thread {
return self;
}
fn currentFiber(thread: *Thread) *Fiber {
return @fieldParentPtr("context", thread.current_context);
}
const List = struct {
allocated: []Thread,
reserved: u32,
active: u32,
};
};
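/// A stackful coroutine. The `Fiber` header, its result buffer, its stack,
/// and the closure plus context bytes all live inside a single allocation of
/// `allocation_size` bytes (see `allocate`).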
const Fiber = struct {
required_align: void align(4),
context: Context,
awaiter: ?*Fiber,
queue_next: ?*Fiber,
cancel_thread: ?*Thread,
awaiting_completions: std.StaticBitSet(3),
const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
const max_result_align: Alignment = .@"16";
const max_result_size = max_result_align.forward(64);
/// This includes any stack realignments that need to happen, and also the
/// initial frame return address slot and argument frame, depending on target.
const min_stack_size = 4 * 1024 * 1024;
const max_context_align: Alignment = .@"16";
const max_context_size = max_context_align.forward(1024);
const max_closure_size: usize = @max(@sizeOf(AsyncClosure), @sizeOf(DetachedClosure));
const max_closure_align: Alignment = .max(.of(AsyncClosure), .of(DetachedClosure));
const allocation_size = std.mem.alignForward(
usize,
max_closure_align.max(max_context_align).forward(
max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
) + max_closure_size + max_context_size,
std.heap.page_size_max,
);
fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
return @ptrCast(try el.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
}
fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
}
fn allocatedEnd(f: *Fiber) [*]u8 {
const allocated_slice = f.allocatedSlice();
return allocated_slice[allocated_slice.len..].ptr;
}
fn resultPointer(f: *Fiber, comptime Result: type) *Result {
return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
}
fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
}
fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
if (@cmpxchgStrong(
?*Thread,
&fiber.cancel_thread,
null,
thread,
.acq_rel,
.acquire,
)) |cancel_thread| {
assert(cancel_thread == Thread.canceling);
return error.Canceled;
}
}
fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
if (@cmpxchgStrong(
?*Thread,
&fiber.cancel_thread,
thread,
null,
.acq_rel,
.acquire,
)) |cancel_thread| assert(cancel_thread == Thread.canceling);
}
const Queue = struct { head: *Fiber, tail: *Fiber };
};
fn recycle(el: *EventLoop, fiber: *Fiber) void {
std.log.debug("recycling {*}", .{fiber});
assert(fiber.queue_next == null);
el.gpa.free(fiber.allocatedSlice());
}
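/// Returns the `std.Io` interface implemented by this event loop.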
pub fn io(el: *EventLoop) Io {
return .{
.userdata = el,
.vtable = &.{
.async = async,
.concurrent = concurrent,
.await = await,
.asyncDetached = asyncDetached,
.select = select,
.cancel = cancel,
.cancelRequested = cancelRequested,
.mutexLock = mutexLock,
.mutexUnlock = mutexUnlock,
.conditionWait = conditionWait,
.conditionWake = conditionWake,
.createFile = createFile,
.fileOpen = fileOpen,
.fileClose = fileClose,
.pread = pread,
.pwrite = pwrite,
.now = now,
.sleep = sleep,
},
};
}
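/// Initializes the event loop in place: reserves one `Thread` slot per CPU
/// and an idle stack for the main thread in a single allocation, adopts the
/// calling thread as the first worker, and represents the caller as the main
/// fiber stored in `main_fiber_buffer`.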
pub fn init(el: *EventLoop, gpa: Allocator) !void {
const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
errdefer gpa.free(allocated_slice);
el.* = .{
.gpa = gpa,
.main_fiber_buffer = undefined,
.threads = .{
.allocated = @ptrCast(allocated_slice[0..threads_size]),
.reserved = 1,
.active = 1,
},
.detached = .{
.mutex = .init,
.list = .{},
},
};
const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
main_fiber.* = .{
.required_align = {},
.context = undefined,
.awaiter = null,
.queue_next = null,
.cancel_thread = null,
.awaiting_completions = .initEmpty(),
};
const main_thread = &el.threads.allocated[0];
Thread.self = main_thread;
const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
main_thread.* = .{
.thread = undefined,
.idle_context = switch (builtin.cpu.arch) {
.aarch64 => .{
.sp = @intFromPtr(idle_stack_end),
.fp = 0,
.pc = @intFromPtr(&mainIdleEntry),
},
.x86_64 => .{
.rsp = @intFromPtr(idle_stack_end - 1),
.rbp = 0,
.rip = @intFromPtr(&mainIdleEntry),
},
else => @compileError("unimplemented architecture"),
},
.current_context = &main_fiber.context,
.ready_queue = null,
.io_uring = try IoUring.init(io_uring_entries, 0),
.idle_search_index = 1,
.steal_ready_search_index = 1,
};
errdefer main_thread.io_uring.deinit();
std.log.debug("created main idle {*}", .{&main_thread.idle_context});
std.log.debug("created main {*}", .{main_fiber});
}
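/// Cancels any remaining detached fibers, asserts that no other async work is
/// pending, tells every worker's io_uring to exit, joins the worker threads,
/// and frees the thread array.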
pub fn deinit(el: *EventLoop) void {
while (true) cancel(el, detached_future: {
el.detached.mutex.lock(el.io()) catch |err| switch (err) {
error.Canceled => unreachable, // main fiber cannot be canceled
};
defer el.detached.mutex.unlock(el.io());
const detached: *DetachedClosure = @fieldParentPtr(
"detached_queue_node",
el.detached.list.pop() orelse break,
);
// notify the detached fiber that it is no longer allowed to recycle itself
detached.detached_queue_node = .{
.prev = &detached.detached_queue_node,
.next = &detached.detached_queue_node,
};
break :detached_future @ptrCast(detached.fiber);
}, &.{}, .@"1");
const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
for (el.threads.allocated[0..active_threads]) |*thread| {
const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
}
el.yield(null, .exit);
const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(el.threads.allocated.ptr));
const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
el.* = undefined;
}
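/// Pops the next ready fiber from this thread's queue, or tries to steal the
/// ready chain of another active thread. Returns null after marking this
/// thread's queue as empty, which advertises it as idle to `schedule`.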
fn findReadyFiber(el: *EventLoop, thread: *Thread) ?*Fiber {
if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
ready_fiber.queue_next = null;
return ready_fiber;
}
const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
for (0..@min(max_steal_ready_search, active_threads)) |_| {
defer thread.steal_ready_search_index += 1;
if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
const steal_ready_search_thread = &el.threads.allocated[0..active_threads][thread.steal_ready_search_index];
if (steal_ready_search_thread == thread) continue;
const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
if (ready_fiber == Fiber.finished) continue;
if (@cmpxchgWeak(
?*Fiber,
&steal_ready_search_thread.ready_queue,
ready_fiber,
null,
.acquire,
.monotonic,
)) |_| continue;
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
ready_fiber.queue_next = null;
return ready_fiber;
}
// couldn't find anything to do, so we are now open for business
@atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
return null;
}
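/// Switches away from the current context to `maybe_ready_fiber`, to some
/// other ready fiber, or to this thread's idle context, carrying
/// `pending_task` to be performed by `SwitchMessage.handle` on the other side
/// of the switch.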
fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
const thread: *Thread = .current();
const ready_context = if (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber|
&ready_fiber.context
else
&thread.idle_context;
const message: SwitchMessage = .{
.contexts = .{
.prev = thread.current_context,
.ready = ready_context,
},
.pending_task = pending_task,
};
std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
contextSwitch(&message).handle(el);
}
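/// Makes the fibers in `ready_queue` runnable: hands the chain to an idle
/// thread (waking it via MSG_RING), or spawns a new worker thread if there is
/// capacity, or else pushes the chain onto this thread's own ready queue.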
fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
{
var fiber = ready_queue.head;
while (true) {
std.log.debug("scheduling {*}", .{fiber});
fiber = fiber.queue_next orelse break;
}
assert(fiber == ready_queue.tail);
}
// shared fields of previous `Thread` must be initialized before later ones are marked as active
const new_thread_index = @atomicLoad(u32, &el.threads.active, .acquire);
for (0..@min(max_idle_search, new_thread_index)) |_| {
defer thread.idle_search_index += 1;
if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
const idle_search_thread = &el.threads.allocated[0..new_thread_index][thread.idle_search_index];
if (idle_search_thread == thread) continue;
if (@cmpxchgWeak(
?*Fiber,
&idle_search_thread.ready_queue,
null,
ready_queue.head,
.release,
.monotonic,
)) |_| continue;
getSqe(&thread.io_uring).* = .{
.opcode = .MSG_RING,
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = idle_search_thread.io_uring.fd,
.off = @intFromEnum(Completion.UserData.wakeup),
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = @intFromEnum(Completion.UserData.wakeup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
return;
}
spawn_thread: {
// previous failed reservations must have completed before retrying
if (new_thread_index == el.threads.allocated.len or @cmpxchgWeak(
u32,
&el.threads.reserved,
new_thread_index,
new_thread_index + 1,
.acquire,
.monotonic,
) != null) break :spawn_thread;
const new_thread = &el.threads.allocated[new_thread_index];
const next_thread_index = new_thread_index + 1;
new_thread.* = .{
.thread = undefined,
.idle_context = undefined,
.current_context = &new_thread.idle_context,
.ready_queue = ready_queue.head,
.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
@atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
// no more access to `thread` after giving up reservation
std.log.warn("unable to create worker thread due to io_uring init failure: {s}", .{@errorName(err)});
break :spawn_thread;
},
.idle_search_index = 0,
.steal_ready_search_index = 0,
};
new_thread.thread = std.Thread.spawn(.{
.stack_size = idle_stack_size,
.allocator = el.gpa,
}, threadEntry, .{ el, new_thread_index }) catch |err| {
new_thread.io_uring.deinit();
@atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
// no more access to `thread` after giving up reservation
std.log.warn("unable to create worker thread due to spawn failure: {s}", .{@errorName(err)});
break :spawn_thread;
};
// shared fields of `Thread` must be initialized before being marked active
@atomicStore(u32, &el.threads.active, next_thread_index, .release);
return;
}
// nobody wanted it, so just queue it on ourselves
while (@cmpxchgWeak(
?*Fiber,
&thread.ready_queue,
ready_queue.tail.queue_next,
ready_queue.head,
.acq_rel,
.acquire,
)) |old_head| ready_queue.tail.queue_next = old_head;
}
fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
message.handle(el);
el.idle(&el.threads.allocated[0]);
el.yield(@ptrCast(&el.main_fiber_buffer), .nothing);
unreachable; // switched to dead fiber
}
fn threadEntry(el: *EventLoop, index: u32) void {
const thread: *Thread = &el.threads.allocated[index];
Thread.self = thread;
std.log.debug("created thread idle {*}", .{&thread.idle_context});
el.idle(thread);
}
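/// An io_uring completion, copied into the waiting fiber's result buffer
/// before that fiber is rescheduled.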
const Completion = struct {
const UserData = enum(usize) {
unused,
wakeup,
cleanup,
exit,
/// *Fiber
_,
};
result: i32,
flags: u32,
};
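/// The per-thread scheduler loop: runs ready fibers until none remain, then
/// blocks on io_uring completions and reschedules the fibers they wake.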
fn idle(el: *EventLoop, thread: *Thread) void {
var maybe_ready_fiber: ?*Fiber = null;
while (true) {
while (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber| {
el.yield(ready_fiber, .nothing);
maybe_ready_fiber = null;
}
_ = thread.io_uring.submit_and_wait(1) catch |err| switch (err) {
error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
else => |e| @panic(@errorName(e)),
};
var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
var maybe_ready_queue: ?Fiber.Queue = null;
for (cqes_buffer[0 .. thread.io_uring.copy_cqes(&cqes_buffer, 0) catch |err| switch (err) {
error.SignalInterrupt => cqes_len: {
std.log.warn("copy_cqes failed with SignalInterrupt", .{});
break :cqes_len 0;
},
else => |e| @panic(@errorName(e)),
}]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
.unused => unreachable, // bad submission queued?
.wakeup => {},
.cleanup => @panic("failed to notify other threads that we are exiting"),
.exit => {
assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
return;
},
_ => switch (errno(cqe.res)) {
.INTR => getSqe(&thread.io_uring).* = .{
.opcode = .ASYNC_CANCEL,
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = 0,
.off = 0,
.addr = cqe.user_data,
.len = 0,
.rw_flags = 0,
.user_data = @intFromEnum(Completion.UserData.wakeup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
},
else => {
const fiber: *Fiber = @ptrFromInt(cqe.user_data);
assert(fiber.queue_next == null);
fiber.resultPointer(Completion).* = .{
.result = cqe.res,
.flags = cqe.flags,
};
if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
ready_queue.tail.queue_next = fiber;
ready_queue.tail = fiber;
} else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
},
},
};
if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
}
}
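/// Data handed across a context switch: the two contexts involved plus a task
/// that must run on the destination side, once the previous fiber's stack is
/// no longer in use.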
const SwitchMessage = struct {
contexts: extern struct {
prev: *Context,
ready: *Context,
},
pending_task: PendingTask,
const PendingTask = union(enum) {
nothing,
reschedule,
recycle,
register_awaiter: *?*Fiber,
register_select: []const *Io.AnyFuture,
mutex_lock: struct {
prev_state: Io.Mutex.State,
mutex: *Io.Mutex,
},
condition_wait: struct {
cond: *Io.Condition,
mutex: *Io.Mutex,
},
exit,
};
fn handle(message: *const SwitchMessage, el: *EventLoop) void {
const thread: *Thread = .current();
thread.current_context = message.contexts.ready;
switch (message.pending_task) {
.nothing => {},
.reschedule => if (message.contexts.prev != &thread.idle_context) {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
},
.recycle => {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
el.recycle(prev_fiber);
},
.register_awaiter => |awaiter| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
},
.register_select => |futures| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
for (futures) |any_future| {
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
const closure: *AsyncClosure = .fromFiber(future_fiber);
if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
}
}
}
},
.mutex_lock => |mutex_lock| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
var prev_state = mutex_lock.prev_state;
while (switch (prev_state) {
else => next_state: {
prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
break :next_state @cmpxchgWeak(
Io.Mutex.State,
&mutex_lock.mutex.state,
prev_state,
@enumFromInt(@intFromPtr(prev_fiber)),
.release,
.acquire,
);
},
.unlocked => @cmpxchgWeak(
Io.Mutex.State,
&mutex_lock.mutex.state,
.unlocked,
.locked_once,
.acquire,
.acquire,
) orelse {
prev_fiber.queue_next = null;
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
return;
},
}) |next_state| prev_state = next_state;
},
.condition_wait => |condition_wait| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
const cond_impl = prev_fiber.resultPointer(ConditionImpl);
cond_impl.* = .{
.tail = prev_fiber,
.event = .queued,
};
if (@cmpxchgStrong(
?*Fiber,
@as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
null,
prev_fiber,
.release,
.acquire,
)) |waiting_fiber| {
const waiting_cond_impl = waiting_fiber.?.resultPointer(ConditionImpl);
assert(waiting_cond_impl.tail.queue_next == null);
waiting_cond_impl.tail.queue_next = prev_fiber;
waiting_cond_impl.tail = prev_fiber;
}
condition_wait.mutex.unlock(el.io());
},
.exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
getSqe(&thread.io_uring).* = .{
.opcode = .MSG_RING,
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = each_thread.io_uring.fd,
.off = @intFromEnum(Completion.UserData.exit),
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = @intFromEnum(Completion.UserData.cleanup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
},
}
}
};
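/// The register state saved for a suspended context: stack pointer, frame
/// pointer, and resume address. Everything else is declared clobbered by
/// `contextSwitch`.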
const Context = switch (builtin.cpu.arch) {
.aarch64 => extern struct {
sp: u64,
fp: u64,
pc: u64,
},
.x86_64 => extern struct {
rsp: u64,
rbp: u64,
rip: u64,
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
};
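/// Saves the current stack pointer, frame pointer, and a resume address into
/// the previous context, then jumps to the ready context. All other registers
/// are listed as clobbers so the compiler never keeps a live value in a
/// register across the switch (see the commit message above regarding rdi).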
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
.aarch64 => asm volatile (
\\ ldp x0, x2, [x1]
\\ ldr x3, [x2, #16]
\\ mov x4, sp
\\ stp x4, fp, [x0]
\\ adr x5, 0f
\\ ldp x4, fp, [x2]
\\ str x5, [x0, #16]
\\ mov sp, x4
\\ br x3
\\0:
: [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")),
: [message_to_send] "{x1}" (&message.contexts),
: .{
.x0 = true,
.x1 = true,
.x2 = true,
.x3 = true,
.x4 = true,
.x5 = true,
.x6 = true,
.x7 = true,
.x8 = true,
.x9 = true,
.x10 = true,
.x11 = true,
.x12 = true,
.x13 = true,
.x14 = true,
.x15 = true,
.x16 = true,
.x17 = true,
.x18 = true,
.x19 = true,
.x20 = true,
.x21 = true,
.x22 = true,
.x23 = true,
.x24 = true,
.x25 = true,
.x26 = true,
.x27 = true,
.x28 = true,
.x30 = true,
.z0 = true,
.z1 = true,
.z2 = true,
.z3 = true,
.z4 = true,
.z5 = true,
.z6 = true,
.z7 = true,
.z8 = true,
.z9 = true,
.z10 = true,
.z11 = true,
.z12 = true,
.z13 = true,
.z14 = true,
.z15 = true,
.z16 = true,
.z17 = true,
.z18 = true,
.z19 = true,
.z20 = true,
.z21 = true,
.z22 = true,
.z23 = true,
.z24 = true,
.z25 = true,
.z26 = true,
.z27 = true,
.z28 = true,
.z29 = true,
.z30 = true,
.z31 = true,
.p0 = true,
.p1 = true,
.p2 = true,
.p3 = true,
.p4 = true,
.p5 = true,
.p6 = true,
.p7 = true,
.p8 = true,
.p9 = true,
.p10 = true,
.p11 = true,
.p12 = true,
.p13 = true,
.p14 = true,
.p15 = true,
.fpcr = true,
.fpsr = true,
.ffr = true,
.memory = true,
}),
.x86_64 => asm volatile (
\\ movq 0(%%rsi), %%rax
\\ movq 8(%%rsi), %%rcx
\\ leaq 0f(%%rip), %%rdx
\\ movq %%rsp, 0(%%rax)
\\ movq %%rbp, 8(%%rax)
\\ movq %%rdx, 16(%%rax)
\\ movq 0(%%rcx), %%rsp
\\ movq 8(%%rcx), %%rbp
\\ jmpq *16(%%rcx)
\\0:
: [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
: [message_to_send] "{rsi}" (&message.contexts),
: .{
.rax = true,
.rcx = true,
.rdx = true,
.rbx = true,
.rsi = true,
.rdi = true,
.r8 = true,
.r9 = true,
.r10 = true,
.r11 = true,
.r12 = true,
.r13 = true,
.r14 = true,
.r15 = true,
.mm0 = true,
.mm1 = true,
.mm2 = true,
.mm3 = true,
.mm4 = true,
.mm5 = true,
.mm6 = true,
.mm7 = true,
.zmm0 = true,
.zmm1 = true,
.zmm2 = true,
.zmm3 = true,
.zmm4 = true,
.zmm5 = true,
.zmm6 = true,
.zmm7 = true,
.zmm8 = true,
.zmm9 = true,
.zmm10 = true,
.zmm11 = true,
.zmm12 = true,
.zmm13 = true,
.zmm14 = true,
.zmm15 = true,
.zmm16 = true,
.zmm17 = true,
.zmm18 = true,
.zmm19 = true,
.zmm20 = true,
.zmm21 = true,
.zmm22 = true,
.zmm23 = true,
.zmm24 = true,
.zmm25 = true,
.zmm26 = true,
.zmm27 = true,
.zmm28 = true,
.zmm29 = true,
.zmm30 = true,
.zmm31 = true,
.fpsr = true,
.fpcr = true,
.mxcsr = true,
.rflags = true,
.dirflag = true,
.memory = true,
}),
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
});
}
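/// Naked entry point for the main thread's idle context: loads the
/// `*EventLoop` that `init` stored at the top of the idle stack and jumps to
/// `mainIdle`.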
fn mainIdleEntry() callconv(.naked) void {
switch (builtin.cpu.arch) {
.x86_64 => asm volatile (
\\ movq (%%rsp), %%rdi
\\ jmp %[mainIdle:P]
:
: [mainIdle] "X" (&mainIdle),
),
.aarch64 => asm volatile (
\\ ldr x0, [sp, #-8]
\\ b %[mainIdle]
:
: [mainIdle] "X" (&mainIdle),
),
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
}
}
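/// Naked entry point for newly created fibers: passes the closure pointer in
/// the first argument register and jumps to the closure's `call` function,
/// whose address was written at the top of the fiber's stack. The switch
/// message arrives in the second argument register, left there by
/// `contextSwitch`.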
fn fiberEntry() callconv(.naked) void {
switch (builtin.cpu.arch) {
.x86_64 => asm volatile (
\\ leaq 8(%%rsp), %%rdi
\\ jmpq *(%%rsp)
),
.aarch64 => asm volatile (
\\ mov x0, sp
\\ ldr x2, [sp, #-8]
\\ br x2
),
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
}
}
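/// Trampoline state for `async`/`concurrent` fibers, placed near the end of
/// the fiber's allocation, just below the context bytes. `already_awaited`
/// prevents an awaiter from being scheduled twice when `select` races with
/// the fiber finishing.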
const AsyncClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
result_align: Alignment,
already_awaited: bool,
fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
}
fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
message.handle(closure.event_loop);
const fiber = closure.fiber;
std.log.debug("{*} performing async", .{fiber});
closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
const ready_awaiter = r: {
const a = awaiter orelse break :r null;
if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
break :r a;
};
closure.event_loop.yield(ready_awaiter, .nothing);
unreachable; // switched to dead fiber
}
fn fromFiber(fiber: *Fiber) *AsyncClosure {
return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
) - @sizeOf(AsyncClosure));
}
};
fn async(
userdata: ?*anyopaque,
result: []u8,
result_alignment: Alignment,
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
start(context.ptr, result.ptr);
return null;
};
}
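/// Allocates a new fiber, copies `context` into it, arranges for its first
/// switch to land in `AsyncClosure.call` via `fiberEntry`, and schedules it.
/// The returned `AnyFuture` is the fiber pointer itself.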
fn concurrent(
userdata: ?*anyopaque,
result_len: usize,
result_alignment: Alignment,
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*std.Io.AnyFuture {
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const fiber = try Fiber.allocate(event_loop);
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber);
const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
(stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)};
fiber.* = .{
.required_align = {},
.context = switch (builtin.cpu.arch) {
.x86_64 => .{
.rsp = @intFromPtr(stack_end - 1),
.rbp = 0,
.rip = @intFromPtr(&fiberEntry),
},
.aarch64 => .{
.sp = @intFromPtr(stack_end),
.fp = 0,
.pc = @intFromPtr(&fiberEntry),
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
.awaiter = null,
.queue_next = null,
.cancel_thread = null,
.awaiting_completions = .initEmpty(),
};
closure.* = .{
.event_loop = event_loop,
.fiber = fiber,
.start = start,
.result_align = result_alignment,
.already_awaited = false,
};
@memcpy(closure.contextPointer(), context);
event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
return @ptrCast(fiber);
}
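/// Like `AsyncClosure`, but for fibers that no one will await: the closure
/// links itself into `el.detached` so that `deinit` can cancel it, and the
/// fiber recycles its own allocation when it finishes normally.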
const DetachedClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,
start: *const fn (context: *const anyopaque) void,
detached_queue_node: std.DoublyLinkedList.Node,
fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
}
fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
message.handle(closure.event_loop);
std.log.debug("{*} performing async detached", .{closure.fiber});
closure.start(closure.contextPointer());
const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
closure.event_loop.yield(awaiter, pending_task: {
closure.event_loop.detached.mutex.lock(closure.event_loop.io()) catch |err| switch (err) {
error.Canceled => break :pending_task .nothing,
};
defer closure.event_loop.detached.mutex.unlock(closure.event_loop.io());
if (closure.detached_queue_node.next == &closure.detached_queue_node) break :pending_task .nothing;
closure.event_loop.detached.list.remove(&closure.detached_queue_node);
break :pending_task .recycle;
});
unreachable; // switched to dead fiber
}
};
fn asyncDetached(
userdata: ?*anyopaque,
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
) void {
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const fiber = Fiber.allocate(event_loop) catch {
start(context.ptr);
return;
};
std.log.debug("allocated {*}", .{fiber});
const current_thread: *Thread = .current();
const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
) - @sizeOf(DetachedClosure));
const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
(stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)};
fiber.* = .{
.required_align = {},
.context = switch (builtin.cpu.arch) {
.x86_64 => .{
.rsp = @intFromPtr(stack_end - 1),
.rbp = 0,
.rip = @intFromPtr(&fiberEntry),
},
.aarch64 => .{
.sp = @intFromPtr(stack_end),
.fp = 0,
.pc = @intFromPtr(&fiberEntry),
},
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
.awaiter = null,
.queue_next = null,
.cancel_thread = null,
.awaiting_completions = .initEmpty(),
};
closure.* = .{
.event_loop = event_loop,
.fiber = fiber,
.start = start,
.detached_queue_node = .{},
};
{
event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) {
error.Canceled => {
event_loop.recycle(fiber);
start(context.ptr);
return;
},
};
defer event_loop.detached.mutex.unlock(event_loop.io());
event_loop.detached.list.append(&closure.detached_queue_node);
}
@memcpy(closure.contextPointer(), context);
event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
}
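/// Waits for the future's fiber to finish, suspending the caller if
/// necessary, then copies out its result and recycles its allocation.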
fn await(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,
result: []u8,
result_alignment: Alignment,
) void {
const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
@memcpy(result, future_fiber.resultBytes(result_alignment));
event_loop.recycle(future_fiber);
}
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
// Optimization to avoid the yield below.
for (futures, 0..) |any_future, i| {
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) == Fiber.finished)
return i;
}
el.yield(null, .{ .register_select = futures });
std.log.debug("back from select yield", .{});
const my_thread: *Thread = .current();
const my_fiber = my_thread.currentFiber();
var result: ?usize = null;
for (futures, 0..) |any_future, i| {
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@cmpxchgStrong(?*Fiber, &future_fiber.awaiter, my_fiber, null, .seq_cst, .seq_cst)) |awaiter| {
if (awaiter == Fiber.finished) {
if (result == null) result = i;
} else if (awaiter) |a| {
const closure: *AsyncClosure = .fromFiber(a);
closure.already_awaited = false;
}
} else {
const closure: *AsyncClosure = .fromFiber(my_fiber);
closure.already_awaited = false;
}
}
return result.?;
}
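/// Requests cancelation of the future's fiber: marks it as canceling and, if
/// it is parked in an io_uring operation on another thread, posts a MSG_RING
/// completion that makes that wait finish with EINTR. Then awaits the fiber
/// as usual.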
fn cancel(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,
result: []u8,
result_alignment: Alignment,
) void {
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
if (@atomicRmw(
?*Thread,
&future_fiber.cancel_thread,
.Xchg,
Thread.canceling,
.acq_rel,
)) |cancel_thread| if (cancel_thread != Thread.canceling) {
getSqe(&Thread.current().io_uring).* = .{
.opcode = .MSG_RING,
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = cancel_thread.io_uring.fd,
.off = @intFromPtr(future_fiber),
.addr = 0,
.len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
.rw_flags = 0,
.user_data = @intFromEnum(Completion.UserData.cleanup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
};
await(userdata, any_future, result, result_alignment);
}
fn cancelRequested(userdata: ?*anyopaque) bool {
_ = userdata;
return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
}
fn createFile(
userdata: ?*anyopaque,
dir: Io.Dir,
sub_path: []const u8,
flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
try fiber.enterCancelRegion(thread);
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
var os_flags: posix.O = .{
.ACCMODE = if (flags.read) .RDWR else .WRONLY,
.CREAT = true,
.TRUNC = flags.truncate,
.EXCL = flags.exclusive,
};
if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
// Use the O locking flags if the OS supports them to acquire the lock
// atomically. Note that the NONBLOCK flag is removed after the openat()
// call is successful.
const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
if (has_flock_open_flags) switch (flags.lock) {
.none => {},
.shared => {
os_flags.SHLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
.exclusive => {
os_flags.EXLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
};
const have_flock = @TypeOf(posix.system.flock) != void;
if (have_flock and !has_flock_open_flags and flags.lock != .none) {
@panic("TODO");
}
if (has_flock_open_flags and flags.lock_nonblocking) {
@panic("TODO");
}
getSqe(iou).* = .{
.opcode = .OPENAT,
.flags = 0,
.ioprio = 0,
.fd = dir.handle,
.off = 0,
.addr = @intFromPtr(&sub_path_c),
.len = @intCast(flags.mode),
.rw_flags = @bitCast(os_flags),
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return .{ .handle = completion.result },
.INTR => unreachable,
.CANCELED => return error.Canceled,
.FAULT => unreachable,
.INVAL => return error.BadPathName,
.BADF => unreachable,
.ACCES => return error.AccessDenied,
.FBIG => return error.FileTooBig,
.OVERFLOW => return error.FileTooBig,
.ISDIR => return error.IsDir,
.LOOP => return error.SymLinkLoop,
.MFILE => return error.ProcessFdQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NFILE => return error.SystemFdQuotaExceeded,
.NODEV => return error.NoDevice,
.NOENT => return error.FileNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.PERM => return error.PermissionDenied,
.EXIST => return error.PathAlreadyExists,
.BUSY => return error.DeviceBusy,
.OPNOTSUPP => return error.FileLocksNotSupported,
.AGAIN => return error.WouldBlock,
.TXTBSY => return error.FileBusy,
.NXIO => return error.NoDevice,
else => |err| return posix.unexpectedErrno(err),
}
}
fn fileOpen(
userdata: ?*anyopaque,
dir: Io.Dir,
sub_path: []const u8,
flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
try fiber.enterCancelRegion(thread);
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
var os_flags: posix.O = .{
.ACCMODE = switch (flags.mode) {
.read_only => .RDONLY,
.write_only => .WRONLY,
.read_write => .RDWR,
},
};
if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;
// Use the O locking flags if the OS supports them to acquire the lock
// atomically.
const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
if (has_flock_open_flags) {
// Note that the NONBLOCK flag is removed after the openat() call
// is successful.
switch (flags.lock) {
.none => {},
.shared => {
os_flags.SHLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
.exclusive => {
os_flags.EXLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
}
}
const have_flock = @TypeOf(posix.system.flock) != void;
if (have_flock and !has_flock_open_flags and flags.lock != .none) {
@panic("TODO");
}
if (has_flock_open_flags and flags.lock_nonblocking) {
@panic("TODO");
}
getSqe(iou).* = .{
.opcode = .OPENAT,
.flags = 0,
.ioprio = 0,
.fd = dir.handle,
.off = 0,
.addr = @intFromPtr(&sub_path_c),
.len = 0,
.rw_flags = @bitCast(os_flags),
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return .{ .handle = completion.result },
.INTR => unreachable,
.CANCELED => return error.Canceled,
.FAULT => unreachable,
.INVAL => return error.BadPathName,
.BADF => unreachable,
.ACCES => return error.AccessDenied,
.FBIG => return error.FileTooBig,
.OVERFLOW => return error.FileTooBig,
.ISDIR => return error.IsDir,
.LOOP => return error.SymLinkLoop,
.MFILE => return error.ProcessFdQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NFILE => return error.SystemFdQuotaExceeded,
.NODEV => return error.NoDevice,
.NOENT => return error.FileNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.PERM => return error.PermissionDenied,
.EXIST => return error.PathAlreadyExists,
.BUSY => return error.DeviceBusy,
.OPNOTSUPP => return error.FileLocksNotSupported,
.AGAIN => return error.WouldBlock,
.TXTBSY => return error.FileBusy,
.NXIO => return error.NoDevice,
else => |err| return posix.unexpectedErrno(err),
}
}
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
getSqe(iou).* = .{
.opcode = .CLOSE,
.flags = 0,
.ioprio = 0,
.fd = file.handle,
.off = 0,
.addr = 0,
.len = 0,
.rw_flags = 0,
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return,
.INTR => unreachable,
.CANCELED => return,
.BADF => unreachable, // Always a race condition.
else => return,
}
}
fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
try fiber.enterCancelRegion(thread);
getSqe(iou).* = .{
.opcode = .READ,
.flags = 0,
.ioprio = 0,
.fd = file.handle,
.off = @bitCast(offset),
.addr = @intFromPtr(buffer.ptr),
.len = @min(buffer.len, 0x7ffff000),
.rw_flags = 0,
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
.INTR => unreachable,
.CANCELED => return error.Canceled,
.INVAL => unreachable,
.FAULT => unreachable,
.NOENT => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForReading, // Can be a race condition.
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
.SPIPE => return error.Unseekable,
.OVERFLOW => return error.Unseekable,
else => |err| return std.posix.unexpectedErrno(err),
}
}
fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
try fiber.enterCancelRegion(thread);
getSqe(iou).* = .{
.opcode = .WRITE,
.flags = 0,
.ioprio = 0,
.fd = file.handle,
.off = @bitCast(offset),
.addr = @intFromPtr(buffer.ptr),
.len = @min(buffer.len, 0x7ffff000),
.rw_flags = 0,
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
.INTR => unreachable,
.CANCELED => return error.Canceled,
.INVAL => return error.InvalidArgument,
.FAULT => unreachable,
.NOENT => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForWriting, // can be a race condition.
.DESTADDRREQ => unreachable, // `connect` was never called.
.DQUOT => return error.DiskQuota,
.FBIG => return error.FileTooBig,
.IO => return error.InputOutput,
.NOSPC => return error.NoSpaceLeft,
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.PIPE => return error.BrokenPipe,
.NXIO => return error.Unseekable,
.SPIPE => return error.Unseekable,
.OVERFLOW => return error.Unseekable,
.BUSY => return error.DeviceBusy,
.CONNRESET => return error.ConnectionResetByPeer,
.MSGSIZE => return error.MessageTooBig,
else => |err| return std.posix.unexpectedErrno(err),
}
}
fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
_ = userdata;
const timespec = try std.posix.clock_gettime(clockid);
return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
}
fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
try fiber.enterCancelRegion(thread);
const deadline_nanoseconds: i96 = switch (deadline) {
.duration => |duration| duration.nanoseconds,
.timestamp => |timestamp| @intFromEnum(timestamp),
};
const timespec: std.os.linux.kernel_timespec = .{
.sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
.nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
};
getSqe(iou).* = .{
.opcode = .TIMEOUT,
.flags = 0,
.ioprio = 0,
.fd = 0,
.off = 0,
.addr = @intFromPtr(&timespec),
.len = 1,
.rw_flags = @as(u32, switch (deadline) {
.duration => 0,
.timestamp => std.os.linux.IORING_TIMEOUT_ABS,
}) | @as(u32, switch (clockid) {
.REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
.MONOTONIC => 0,
.BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
else => return error.UnsupportedClock,
}),
.user_data = @intFromPtr(fiber),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
el.yield(null, .nothing);
fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS, .TIME => return,
.INTR => unreachable,
.CANCELED => return error.Canceled,
else => |err| return std.posix.unexpectedErrno(err),
}
}
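/// Called on the contended path of `Io.Mutex.lock`. The actual waiting is
/// done in `SwitchMessage.handle`, which pushes the suspended fiber onto the
/// mutex's intrusive waiter list (the state value doubles as a fiber pointer).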
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } });
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
var maybe_waiting_fiber: ?*Fiber = @ptrFromInt(@intFromEnum(prev_state));
while (if (maybe_waiting_fiber) |waiting_fiber| @cmpxchgWeak(
Io.Mutex.State,
&mutex.state,
@enumFromInt(@intFromPtr(waiting_fiber)),
@enumFromInt(@intFromPtr(waiting_fiber.queue_next)),
.release,
.acquire,
) else @cmpxchgWeak(
Io.Mutex.State,
&mutex.state,
.locked_once,
.unlocked,
.release,
.acquire,
) orelse return) |next_state| maybe_waiting_fiber = @ptrFromInt(@intFromEnum(next_state));
maybe_waiting_fiber.?.queue_next = null;
const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(maybe_waiting_fiber.?, .reschedule);
}
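/// Per-waiter condition variable state, stored in the waiting fiber's result
/// buffer: the tail of the queued waiter list and how the waiter was woken.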
const ConditionImpl = struct {
tail: *Fiber,
event: union(enum) {
queued,
wake: Io.Condition.Wake,
},
};
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
const thread = Thread.current();
const fiber = thread.currentFiber();
const cond_impl = fiber.resultPointer(ConditionImpl);
try mutex.lock(el.io());
switch (cond_impl.event) {
.queued => {},
.wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
.one => if (@cmpxchgStrong(
?*Fiber,
@as(*?*Fiber, @ptrCast(&cond.state)),
null,
next_fiber,
.release,
.acquire,
)) |old_fiber| {
const old_cond_impl = old_fiber.?.resultPointer(ConditionImpl);
assert(old_cond_impl.tail.queue_next == null);
old_cond_impl.tail.queue_next = next_fiber;
old_cond_impl.tail = cond_impl.tail;
},
.all => el.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
},
}
fiber.queue_next = null;
}
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
const el: *EventLoop = @ptrCast(@alignCast(userdata));
const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake };
el.yield(waiting_fiber, .reschedule);
}
fn errno(signed: i32) std.os.linux.E {
return .init(@bitCast(@as(isize, signed)));
}
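/// Obtains a free submission queue entry, flushing the ring with
/// `submit_and_wait(0)` whenever the submission queue is full.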
fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
while (true) return iou.get_sqe() catch {
_ = iou.submit_and_wait(0) catch |err| switch (err) {
error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
else => |e| @panic(@errorName(e)),
};
continue;
};
}