EventLoop: get file operations working

Something is horribly wrong with scheduling, as can be seen in the
debug output, but at least it somehow manages to exit cleanly...
This commit is contained in:
Jacob Young 2025-03-29 02:31:27 -04:00 committed by Andrew Kelley
parent 50724cf1c3
commit 4f214b97ec

View file

@ -9,26 +9,25 @@ const IoUring = std.os.linux.IoUring;
gpa: Allocator, gpa: Allocator,
mutex: std.Thread.Mutex, mutex: std.Thread.Mutex,
cond: std.Thread.Condition,
queue: std.DoublyLinkedList(void), queue: std.DoublyLinkedList(void),
/// Atomic copy of queue.len
queue_len: usize,
free: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void),
main_context: Context, main_fiber: Fiber,
exit_awaiter: ?*Fiber, idle_count: usize,
threads: std.ArrayListUnmanaged(Thread), threads: std.ArrayListUnmanaged(Thread),
/// 1 bit per thread, same order as `thread_index`. exiting: bool,
idle_iourings: []usize,
threadlocal var thread_index: u32 = undefined; threadlocal var thread_index: u32 = undefined;
/// Empirically saw 10KB being used by the self-hosted backend for logging. /// Empirically saw 10KB being used by the self-hosted backend for logging.
const idle_stack_size = 32 * 1024; const idle_stack_size = 64 * 1024;
const io_uring_entries = 64; const io_uring_entries = 64;
const Thread = struct { const Thread = struct {
thread: std.Thread, thread: std.Thread,
idle_context: Context, idle_context: Context,
current_idle_context: *Context,
current_context: *Context, current_context: *Context,
io_uring: IoUring, io_uring: IoUring,
@ -103,98 +102,92 @@ pub fn io(el: *EventLoop) Io {
} }
pub fn init(el: *EventLoop, gpa: Allocator) !void { pub fn init(el: *EventLoop, gpa: Allocator) !void {
const n_threads: usize = @max((std.Thread.getCpuCount() catch 1), 1); const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
const threads_bytes = n_threads * @sizeOf(Thread); const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context)); const allocated_slice = try gpa.alignedAlloc(u8, @alignOf(Thread), idle_stack_end_offset);
const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset);
errdefer gpa.free(allocated_slice); errdefer gpa.free(allocated_slice);
const idle_iourings = try gpa.alloc(usize, (n_threads + @bitSizeOf(usize) - 1) / @bitSizeOf(usize));
errdefer gpa.free(idle_iourings);
@memset(idle_iourings, 0);
el.* = .{ el.* = .{
.gpa = gpa, .gpa = gpa,
.mutex = .{}, .mutex = .{},
.cond = .{},
.queue = .{}, .queue = .{},
.queue_len = 0,
.free = .{}, .free = .{},
.main_context = undefined, .main_fiber = undefined,
.exit_awaiter = null, .idle_count = 0,
.threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])), .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_size])),
.idle_iourings = idle_iourings, .exiting = false,
}; };
thread_index = 0;
const main_thread = el.threads.addOneAssumeCapacity(); const main_thread = el.threads.addOneAssumeCapacity();
main_thread.io_uring = try IoUring.init(io_uring_entries, 0); main_thread.io_uring = try IoUring.init(io_uring_entries, 0);
const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)])); const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
main_idle_context.* = .{ main_thread.idle_context = .{
.rsp = @intFromPtr(idle_stack_end - 1), .rsp = @intFromPtr(idle_stack_end - 1),
.rbp = 0, .rbp = 0,
.rip = @intFromPtr(&mainIdleEntry), .rip = @intFromPtr(&mainIdleEntry),
}; };
std.log.debug("created main idle {*}", .{main_idle_context}); std.log.debug("created main idle {*}", .{&main_thread.idle_context});
main_thread.current_idle_context = main_idle_context; std.log.debug("created main {*}", .{&el.main_fiber});
std.log.debug("created main {*}", .{&el.main_context}); main_thread.current_context = &el.main_fiber.context;
main_thread.current_context = &el.main_context;
} }
pub fn deinit(el: *EventLoop) void { pub fn deinit(el: *EventLoop) void {
assert(el.queue.len == 0); // pending async assert(el.queue.len == 0); // pending async
el.yield(null, &el.exit_awaiter); el.yield(null, .exit);
while (el.free.pop()) |free_node| { while (el.free.pop()) |free_node| {
const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node)); const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
el.gpa.free(free_fiber.allocatedSlice()); el.gpa.free(free_fiber.allocatedSlice());
} }
const idle_context_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread), @alignOf(Context)); const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max); const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
for (el.threads.items[1..]) |*thread| thread.thread.join(); for (el.threads.items[1..]) |*thread| thread.thread.join();
el.gpa.free(allocated_ptr[0..idle_stack_end]); el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
} }
const PendingTask = union(enum) { fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
none,
register_awaiter: *?*Fiber,
io_uring_submit: *IoUring,
};
fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: PendingTask) void {
const thread: *Thread = &el.threads.items[thread_index]; const thread: *Thread = &el.threads.items[thread_index];
const ready_context: *Context = ready_context: { const ready_context: *Context = ready_context: {
const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: { const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
el.mutex.lock(); el.mutex.lock();
defer el.mutex.unlock(); defer el.mutex.unlock();
break :ready_node el.queue.pop(); const ready_node = el.queue.pop();
@atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
break :ready_node ready_node;
}) |ready_node| }) |ready_node|
@alignCast(@fieldParentPtr("queue_node", ready_node)) @alignCast(@fieldParentPtr("queue_node", ready_node))
else else
break :ready_context thread.current_idle_context; break :ready_context &thread.idle_context;
break :ready_context &ready_fiber.context; break :ready_context &ready_fiber.context;
}; };
const message: SwitchMessage = .{ const message: SwitchMessage = .{
.prev_context = thread.current_context, .contexts = .{
.ready_context = ready_context, .prev = thread.current_context,
.ready = ready_context,
},
.pending_task = pending_task, .pending_task = pending_task,
}; };
std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context }); std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
contextSwitch(&message).handle(el); contextSwitch(&message).handle(el);
} }
fn schedule(el: *EventLoop, fiber: *Fiber) void { fn schedule(el: *EventLoop, fiber: *Fiber) void {
el.mutex.lock(); if (idle_count: {
el.queue.append(&fiber.queue_node); el.mutex.lock();
//for (el.idle_iourings) |*int| { defer el.mutex.unlock();
// const idler_subset = @atomicLoad(usize, int, .unordered); el.queue.append(&fiber.queue_node);
// if (idler_subset == 0) continue; @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
// break :idle_count el.idle_count;
//} } > 0) {
if (el.idle_count > 0) { _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), 1, switch (@bitSizeOf(usize)) {
el.mutex.unlock(); 8 => std.os.linux.FUTEX2.SIZE_U8,
el.cond.signal(); 16 => std.os.linux.FUTEX2.SIZE_U16,
32 => std.os.linux.FUTEX2.SIZE_U32,
64 => std.os.linux.FUTEX2.SIZE_U64,
else => @compileError("unsupported @sizeOf(usize)"),
} | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
return; return;
} }
defer el.mutex.unlock();
if (el.threads.items.len == el.threads.capacity) return; if (el.threads.items.len == el.threads.capacity) return;
const thread = el.threads.addOneAssumeCapacity(); const thread = el.threads.addOneAssumeCapacity();
thread.thread = std.Thread.spawn(.{ thread.thread = std.Thread.spawn(.{
@ -216,64 +209,101 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void {
fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
message.handle(el); message.handle(el);
el.yield(el.idle(), null); el.idle();
el.yield(&el.main_fiber, .nothing);
unreachable; // switched to dead fiber unreachable; // switched to dead fiber
} }
fn threadEntry(el: *EventLoop, index: usize) void { fn threadEntry(el: *EventLoop, index: usize) void {
thread_index = index; thread_index = @intCast(index);
const thread: *Thread = &el.threads.items[index]; const thread: *Thread = &el.threads.items[index];
std.log.debug("created thread idle {*}", .{&thread.idle_context}); std.log.debug("created thread idle {*}", .{&thread.idle_context});
thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| { thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)}); std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)});
return; return;
}; };
thread.current_idle_context = &thread.idle_context;
thread.current_context = &thread.idle_context; thread.current_context = &thread.idle_context;
_ = el.idle(); el.idle();
} }
fn idle(el: *EventLoop) *Fiber { const UserData = enum(u64) {
queue_len_futex_wait,
_,
};
fn idle(el: *EventLoop) void {
const thread: *Thread = &el.threads.items[thread_index]; const thread: *Thread = &el.threads.items[thread_index];
// The idle fiber only runs on one thread.
const iou = &thread.io_uring; const iou = &thread.io_uring;
var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined; var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
var futex_is_scheduled: bool = false;
while (true) { while (true) {
el.yield(null, null); el.yield(null, .nothing);
if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| { if (@atomicLoad(bool, &el.exiting, .acquire)) return;
el.cond.broadcast(); if (!futex_is_scheduled) {
return exit_awaiter; const sqe = getSqe(&thread.io_uring);
} sqe.prep_rw(.FUTEX_WAIT, switch (@bitSizeOf(usize)) {
// TODO add uring to bit set 8 => std.os.linux.FUTEX2.SIZE_U8,
const n = iou.copy_cqes(&cqes_buffer, 1) catch @panic("TODO handle copy_cqes error"); 16 => std.os.linux.FUTEX2.SIZE_U16,
const cqes = cqes_buffer[0..n]; 32 => std.os.linux.FUTEX2.SIZE_U32,
for (cqes) |cqe| { 64 => std.os.linux.FUTEX2.SIZE_U64,
const fiber: *Fiber = @ptrFromInt(cqe.user_data); else => @compileError("unsupported @sizeOf(usize)"),
const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer())); } | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
res.* = cqe.res; sqe.addr3 = std.math.maxInt(u64);
el.schedule(fiber); sqe.user_data = @intFromEnum(UserData.queue_len_futex_wait);
futex_is_scheduled = true;
} }
_ = iou.submit_and_wait(1) catch |err| switch (err) {
error.SignalInterrupt => 0,
else => @panic(@errorName(err)),
};
for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
error.SignalInterrupt => 0,
else => @panic(@errorName(err)),
}]) |cqe| switch (@as(UserData, @enumFromInt(cqe.user_data))) {
.queue_len_futex_wait => futex_is_scheduled = false,
_ => {
const fiber: *Fiber = @ptrFromInt(cqe.user_data);
const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
res.* = cqe.res;
el.schedule(fiber);
},
};
} }
} }
const SwitchMessage = extern struct { const SwitchMessage = struct {
prev_context: *Context, contexts: extern struct {
ready_context: *Context, prev: *Context,
ready: *Context,
},
pending_task: PendingTask, pending_task: PendingTask,
const PendingTask = union(enum) {
nothing,
register_awaiter: *?*Fiber,
exit,
};
fn handle(message: *const SwitchMessage, el: *EventLoop) void { fn handle(message: *const SwitchMessage, el: *EventLoop) void {
const thread: *Thread = &el.threads.items[thread_index]; const thread: *Thread = &el.threads.items[thread_index];
thread.current_context = message.ready_context; thread.current_context = message.contexts.ready;
switch (message.pending_task) { switch (message.pending_task) {
.none => {}, .nothing => {},
.register_awaiter => |awaiter| { .register_awaiter => |awaiter| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context)); const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
}, },
.io_uring_submit => |iou| { .exit => {
_ = iou.flush_sq(); @atomicStore(bool, &el.exiting, true, .unordered);
// TODO: determine whether this return value should be used @atomicStore(usize, &el.queue_len, std.math.maxInt(usize), .release);
_ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), std.math.maxInt(i32), switch (@bitSizeOf(usize)) {
8 => std.os.linux.FUTEX2.SIZE_U8,
16 => std.os.linux.FUTEX2.SIZE_U16,
32 => std.os.linux.FUTEX2.SIZE_U32,
64 => std.os.linux.FUTEX2.SIZE_U64,
else => @compileError("unsupported @sizeOf(usize)"),
} | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
}, },
} }
} }
@ -289,7 +319,7 @@ const Context = switch (builtin.cpu.arch) {
}; };
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
return switch (builtin.cpu.arch) { return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
.x86_64 => asm volatile ( .x86_64 => asm volatile (
\\ movq 0(%%rsi), %%rax \\ movq 0(%%rsi), %%rax
\\ movq 8(%%rsi), %%rcx \\ movq 8(%%rsi), %%rcx
@ -301,8 +331,8 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
\\ movq 8(%%rcx), %%rbp \\ movq 8(%%rcx), %%rbp
\\ jmpq *16(%%rcx) \\ jmpq *16(%%rcx)
\\0: \\0:
: [received_message] "={rsi}" (-> *const SwitchMessage), : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
: [message_to_send] "{rsi}" (message), : [message_to_send] "{rsi}" (&message.contexts),
: "rax", "rcx", "rdx", "rbx", "rdi", // : "rax", "rcx", "rdx", "rbx", "rdi", //
"r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", // "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", //
"mm0", "mm1", "mm2", "mm3", "mm4", "mm5", "mm6", "mm7", // "mm0", "mm1", "mm2", "mm3", "mm4", "mm5", "mm6", "mm7", //
@ -313,7 +343,7 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
"fpsr", "fpcr", "mxcsr", "rflags", "dirflag", "memory" "fpsr", "fpcr", "mxcsr", "rflags", "dirflag", "memory"
), ),
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
}; });
} }
fn mainIdleEntry() callconv(.naked) void { fn mainIdleEntry() callconv(.naked) void {
@ -401,7 +431,7 @@ const AsyncClosure = struct {
std.log.debug("{*} performing async", .{closure.fiber}); std.log.debug("{*} performing async", .{closure.fiber});
closure.start(closure.contextPointer(), closure.fiber.resultPointer()); closure.start(closure.contextPointer(), closure.fiber.resultPointer());
const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
closure.event_loop.yield(awaiter, null); closure.event_loop.yield(awaiter, .nothing);
unreachable; // switched to dead fiber unreachable; // switched to dead fiber
} }
}; };
@ -409,17 +439,93 @@ const AsyncClosure = struct {
pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter); if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
@memcpy(result, future_fiber.resultPointer()); @memcpy(result, future_fiber.resultPointer());
event_loop.recycle(future_fiber); event_loop.recycle(future_fiber);
} }
pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File { pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File {
_ = userdata; const el: *EventLoop = @ptrCast(@alignCast(userdata));
_ = dir;
_ = sub_path; const posix = std.posix;
_ = flags; const sub_path_c = try posix.toPosixPath(sub_path);
@panic("TODO");
var os_flags: posix.O = .{
.ACCMODE = if (flags.read) .RDWR else .WRONLY,
.CREAT = true,
.TRUNC = flags.truncate,
.EXCL = flags.exclusive,
};
if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
// Use the O locking flags if the os supports them to acquire the lock
// atomically. Note that the NONBLOCK flag is removed after the openat()
// call is successful.
const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
if (has_flock_open_flags) switch (flags.lock) {
.none => {},
.shared => {
os_flags.SHLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
.exclusive => {
os_flags.EXLOCK = true;
os_flags.NONBLOCK = flags.lock_nonblocking;
},
};
const have_flock = @TypeOf(posix.system.flock) != void;
if (have_flock and !has_flock_open_flags and flags.lock != .none) {
@panic("TODO");
}
if (has_flock_open_flags and flags.lock_nonblocking) {
@panic("TODO");
}
const thread: *Thread = &el.threads.items[thread_index];
const iou = &thread.io_uring;
const sqe = getSqe(iou);
const fiber = thread.currentFiber();
sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
sqe.user_data = @intFromPtr(fiber);
el.yield(null, .nothing);
const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
const rc = result.*;
switch (errno(rc)) {
.SUCCESS => return .{ .handle = rc },
.INTR => @panic("TODO is this reachable?"),
.CANCELED => @panic("TODO figure out how this error code fits into things"),
.FAULT => unreachable,
.INVAL => return error.BadPathName,
.BADF => unreachable,
.ACCES => return error.AccessDenied,
.FBIG => return error.FileTooBig,
.OVERFLOW => return error.FileTooBig,
.ISDIR => return error.IsDir,
.LOOP => return error.SymLinkLoop,
.MFILE => return error.ProcessFdQuotaExceeded,
.NAMETOOLONG => return error.NameTooLong,
.NFILE => return error.SystemFdQuotaExceeded,
.NODEV => return error.NoDevice,
.NOENT => return error.FileNotFound,
.NOMEM => return error.SystemResources,
.NOSPC => return error.NoSpaceLeft,
.NOTDIR => return error.NotDir,
.PERM => return error.PermissionDenied,
.EXIST => return error.PathAlreadyExists,
.BUSY => return error.DeviceBusy,
.OPNOTSUPP => return error.FileLocksNotSupported,
.AGAIN => return error.WouldBlock,
.TXTBSY => return error.FileBusy,
.NXIO => return error.NoDevice,
else => |err| return posix.unexpectedErrno(err),
}
} }
pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File { pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File {
@ -476,7 +582,7 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0); sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
sqe.user_data = @intFromPtr(fiber); sqe.user_data = @intFromPtr(fiber);
el.yield(null, .{ .io_uring_submit = iou }); el.yield(null, .nothing);
const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)])); const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
const rc = result.*; const rc = result.*;
@ -510,8 +616,6 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
.NXIO => return error.NoDevice, .NXIO => return error.NoDevice,
else => |err| return posix.unexpectedErrno(err), else => |err| return posix.unexpectedErrno(err),
} }
return .{ .handle = result.* };
} }
fn errno(signed: i32) std.posix.E { fn errno(signed: i32) std.posix.E {
@ -524,21 +628,109 @@ fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
} }
pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
_ = userdata; const el: *EventLoop = @ptrCast(@alignCast(userdata));
_ = file;
@panic("TODO"); const posix = std.posix;
const thread: *Thread = &el.threads.items[thread_index];
const iou = &thread.io_uring;
const sqe = getSqe(iou);
const fiber = thread.currentFiber();
sqe.prep_close(file.handle);
sqe.user_data = @intFromPtr(fiber);
el.yield(null, .nothing);
const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
const rc = result.*;
switch (errno(rc)) {
.SUCCESS => return,
.INTR => @panic("TODO is this reachable?"),
.CANCELED => @panic("TODO figure out how this error code fits into things"),
.BADF => unreachable, // Always a race condition.
else => return,
}
} }
pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize { pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize {
_ = userdata; const el: *EventLoop = @ptrCast(@alignCast(userdata));
_ = file;
_ = buffer; const posix = std.posix;
@panic("TODO");
const thread: *Thread = &el.threads.items[thread_index];
const iou = &thread.io_uring;
const sqe = getSqe(iou);
const fiber = thread.currentFiber();
sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
sqe.user_data = @intFromPtr(fiber);
el.yield(null, .nothing);
const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
const rc = result.*;
switch (errno(rc)) {
.SUCCESS => return @as(u32, @bitCast(rc)),
.INTR => @panic("TODO is this reachable?"),
.CANCELED => @panic("TODO figure out how this error code fits into things"),
.INVAL => unreachable,
.FAULT => unreachable,
.NOENT => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForReading, // Can be a race condition.
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
else => |err| return posix.unexpectedErrno(err),
}
} }
pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize { pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize {
_ = userdata; const el: *EventLoop = @ptrCast(@alignCast(userdata));
_ = file;
_ = buffer; const posix = std.posix;
@panic("TODO");
const thread: *Thread = &el.threads.items[thread_index];
const iou = &thread.io_uring;
const sqe = getSqe(iou);
const fiber = thread.currentFiber();
sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
sqe.user_data = @intFromPtr(fiber);
el.yield(null, .nothing);
const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
const rc = result.*;
switch (errno(rc)) {
.SUCCESS => return @as(u32, @bitCast(rc)),
.INTR => @panic("TODO is this reachable?"),
.CANCELED => @panic("TODO figure out how this error code fits into things"),
.INVAL => return error.InvalidArgument,
.FAULT => unreachable,
.NOENT => return error.ProcessNotFound,
.AGAIN => return error.WouldBlock,
.BADF => return error.NotOpenForWriting, // can be a race condition.
.DESTADDRREQ => unreachable, // `connect` was never called.
.DQUOT => return error.DiskQuota,
.FBIG => return error.FileTooBig,
.IO => return error.InputOutput,
.NOSPC => return error.NoSpaceLeft,
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.PIPE => return error.BrokenPipe,
.CONNRESET => return error.ConnectionResetByPeer,
.BUSY => return error.DeviceBusy,
.NXIO => return error.NoDevice,
.MSGSIZE => return error.MessageTooBig,
else => |err| return posix.unexpectedErrno(err),
}
} }