EventLoop: implement detached fibers
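
A fiber spawned through `go` is detached: the caller gets no handle to await, so the
fiber has to recycle itself when it finishes. To make that safe against `deinit`, each
`DetachedClosure` now carries an intrusive `std.DoublyLinkedList(void)` node:

- `go` registers the node in the new `EventLoop.detached` list (guarded by a
  `std.Io.Mutex`) before scheduling the fiber; if the lock is canceled, the fiber is
  recycled immediately and `start` runs inline.
- when a detached fiber completes, it removes its node and yields with a `.recycle`
  pending task, unless `deinit` has already claimed it.
- `deinit` pops and cancels every still-registered detached fiber, marking each popped
  node self-referential so that fiber knows it is no longer allowed to recycle itself.

The `.recycle` pending task no longer carries a fiber pointer; the fiber to recycle is
recovered from the previous context. `fiberEntryDetached` is also gone: the closure's
`call` function pointer is written to the top of the new fiber's stack, and the single
`fiberEntry` trampoline enters it via `jmpq *(%rsp)`.

For illustration, a minimal standalone sketch of the claim/recycle handshake on the
intrusive node (the `Task` type and `main` below are illustrative only, not part of
this change):

    const std = @import("std");

    const Task = struct {
        node: std.DoublyLinkedList(void).Node,
    };

    pub fn main() void {
        var detached: std.DoublyLinkedList(void) = .{};
        var task: Task = .{ .node = .{ .data = {} } };
        detached.append(&task.node);

        // Shutdown side: claim the task, then point its node at itself so the
        // task can tell it has been claimed and must not recycle itself.
        const claimed: *Task = @fieldParentPtr("node", detached.pop().?);
        claimed.node = .{ .prev = &claimed.node, .next = &claimed.node, .data = {} };

        // Task side: a self-referential node means shutdown already owns this
        // task, so it skips asking to be recycled.
        std.debug.print("claimed: {any}\n", .{task.node.next == &task.node});
    }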

Jacob Young 2025-04-01 03:45:31 -04:00 committed by Andrew Kelley
parent 5c4ddb8d35
commit b174777437


@@ -9,9 +9,12 @@ const IoUring = std.os.linux.IoUring;
 
 /// Must be a thread-safe allocator.
 gpa: Allocator,
-mutex: std.Thread.Mutex,
 main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
 threads: Thread.List,
+detached: struct {
+    mutex: std.Io.Mutex,
+    list: std.DoublyLinkedList(void),
+},
 
 /// Empirically saw >128KB being used by the self-hosted backend to panic.
 const idle_stack_size = 256 * 1024;
@@ -167,13 +170,16 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
     errdefer gpa.free(allocated_slice);
     el.* = .{
         .gpa = gpa,
-        .mutex = .{},
         .main_fiber_buffer = undefined,
         .threads = .{
            .allocated = @ptrCast(allocated_slice[0..threads_size]),
            .reserved = 1,
            .active = 1,
        },
+        .detached = .{
+            .mutex = .init,
+            .list = .{},
+        },
     };
     const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
     main_fiber.* = .{
@@ -207,6 +213,23 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
 }
 
 pub fn deinit(el: *EventLoop) void {
+    while (true) cancel(el, detached_future: {
+        el.detached.mutex.lock(el.io()) catch |err| switch (err) {
+            error.Canceled => unreachable, // main fiber cannot be canceled
+        };
+        defer el.detached.mutex.unlock(el.io());
+        const detached: *DetachedClosure = @fieldParentPtr(
+            "detached_queue_node",
+            el.detached.list.pop() orelse break,
+        );
+        // notify the detached fiber that it is no longer allowed to recycle itself
+        detached.detached_queue_node = .{
+            .prev = &detached.detached_queue_node,
+            .next = &detached.detached_queue_node,
+            .data = {},
+        };
+        break :detached_future @ptrCast(detached.fiber);
+    }, &.{}, .@"1");
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
     for (el.threads.allocated[0..active_threads]) |*thread| {
         const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
@@ -460,7 +483,7 @@ const SwitchMessage = struct {
     const PendingTask = union(enum) {
         nothing,
         reschedule,
-        recycle: *Fiber,
+        recycle,
         register_awaiter: *?*Fiber,
         mutex_lock: struct {
             prev_state: Io.Mutex.State,
@@ -483,8 +506,10 @@
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
-            .recycle => |fiber| {
-                el.recycle(fiber);
+            .recycle => {
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
+                assert(prev_fiber.queue_next == null);
+                el.recycle(prev_fiber);
             },
             .register_awaiter => |awaiter| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
@@ -609,21 +634,7 @@ fn fiberEntry() callconv(.naked) void {
     switch (builtin.cpu.arch) {
         .x86_64 => asm volatile (
             \\ leaq 8(%%rsp), %%rdi
-            \\ jmp %[AsyncClosure_call:P]
-            :
-            : [AsyncClosure_call] "X" (&AsyncClosure.call),
-        ),
-        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
-    }
-}
-
-fn fiberEntryDetached() callconv(.naked) void {
-    switch (builtin.cpu.arch) {
-        .x86_64 => asm volatile (
-            \\ leaq 8(%%rsp), %%rdi
-            \\ jmp %[DetachedClosure_call:P]
-            :
-            : [DetachedClosure_call] "X" (&DetachedClosure.call),
+            \\ jmpq *(%%rsp)
         ),
         else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
     }
@@ -649,29 +660,6 @@ const AsyncClosure = struct {
     }
 };
 
-const DetachedClosure = struct {
-    event_loop: *EventLoop,
-    fiber: *Fiber,
-    start: *const fn (context: *const anyopaque) void,
-
-    fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
-        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
-    }
-
-    fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
-        message.handle(closure.event_loop);
-        std.log.debug("{*} performing async detached", .{closure.fiber});
-        closure.start(closure.contextPointer());
-        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
-        if (awaiter) |a| {
-            closure.event_loop.yield(a, .nothing);
-        } else {
-            closure.event_loop.yield(null, .{ .recycle = closure.fiber });
-        }
-        unreachable; // switched to dead fiber
-    }
-};
-
 fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
@ -695,11 +683,13 @@ fn @"async"(
const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward( const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
) - @sizeOf(AsyncClosure)); ) - @sizeOf(AsyncClosure));
const stack_end: [*]usize = @alignCast(@ptrCast(closure));
(stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)};
fiber.* = .{ fiber.* = .{
.required_align = {}, .required_align = {},
.context = switch (builtin.cpu.arch) { .context = switch (builtin.cpu.arch) {
.x86_64 => .{ .x86_64 => .{
.rsp = @intFromPtr(closure) - @sizeOf(usize), .rsp = @intFromPtr(stack_end - 1),
.rbp = 0, .rbp = 0,
.rip = @intFromPtr(&fiberEntry), .rip = @intFromPtr(&fiberEntry),
}, },
@ -722,6 +712,34 @@ fn @"async"(
return @ptrCast(fiber); return @ptrCast(fiber);
} }
const DetachedClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,
start: *const fn (context: *const anyopaque) void,
detached_queue_node: std.DoublyLinkedList(void).Node,
fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
}
fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
message.handle(closure.event_loop);
std.log.debug("{*} performing async detached", .{closure.fiber});
closure.start(closure.contextPointer());
const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
closure.event_loop.yield(awaiter, pending_task: {
closure.event_loop.detached.mutex.lock(closure.event_loop.io()) catch |err| switch (err) {
error.Canceled => break :pending_task .nothing,
};
defer closure.event_loop.detached.mutex.unlock(closure.event_loop.io());
if (closure.detached_queue_node.next == &closure.detached_queue_node) break :pending_task .nothing;
closure.event_loop.detached.list.remove(&closure.detached_queue_node);
break :pending_task .recycle;
});
unreachable; // switched to dead fiber
}
};
fn go( fn go(
userdata: ?*anyopaque, userdata: ?*anyopaque,
context: []const u8, context: []const u8,
@@ -742,13 +760,15 @@ fn go(
     const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
         @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
     ) - @sizeOf(DetachedClosure));
+    const stack_end: [*]usize = @alignCast(@ptrCast(closure));
+    (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)};
     fiber.* = .{
         .required_align = {},
         .context = switch (builtin.cpu.arch) {
             .x86_64 => .{
-                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rsp = @intFromPtr(stack_end - 1),
                 .rbp = 0,
-                .rip = @intFromPtr(&fiberEntryDetached),
+                .rip = @intFromPtr(&fiberEntry),
             },
             else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
         },
@@ -761,7 +781,19 @@ fn go(
         .event_loop = event_loop,
         .fiber = fiber,
         .start = start,
+        .detached_queue_node = .{ .data = {} },
     };
+    {
+        event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) {
+            error.Canceled => {
+                event_loop.recycle(fiber);
+                start(context.ptr);
+                return;
+            },
+        };
+        defer event_loop.detached.mutex.unlock(event_loop.io());
+        event_loop.detached.list.append(&closure.detached_queue_node);
+    }
     @memcpy(closure.contextPointer(), context);
     event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });