EventLoop: prepare for threading

This commit is contained in:
Jacob Young 2025-03-27 01:49:01 -04:00 committed by Andrew Kelley
parent 07ee4977da
commit 93054125fe

View file

@ -5,6 +5,7 @@ const Io = std.Io;
const EventLoop = @This(); const EventLoop = @This();
gpa: Allocator, gpa: Allocator,
mutex: std.Thread.Mutex,
queue: std.DoublyLinkedList(void), queue: std.DoublyLinkedList(void),
free: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void),
main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)), main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
@ -39,6 +40,7 @@ const Fiber = struct {
pub fn init(el: *EventLoop, gpa: Allocator) void { pub fn init(el: *EventLoop, gpa: Allocator) void {
el.* = .{ el.* = .{
.gpa = gpa, .gpa = gpa,
.mutex = .{},
.queue = .{}, .queue = .{},
.free = .{}, .free = .{},
.main_fiber_buffer = undefined, .main_fiber_buffer = undefined,
@ -48,7 +50,11 @@ pub fn init(el: *EventLoop, gpa: Allocator) void {
fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
assert(result_len <= max_result_len); assert(result_len <= max_result_len);
const free_node = el.free.pop() orelse { const free_node = free_node: {
el.mutex.lock();
defer el.mutex.unlock();
break :free_node el.free.pop();
} orelse {
const n = std.mem.alignForward( const n = std.mem.alignForward(
usize, usize,
@sizeOf(Fiber) + max_result_len + min_stack_size, @sizeOf(Fiber) + max_result_len + min_stack_size,
@ -59,36 +65,48 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
return @fieldParentPtr("queue_node", free_node); return @fieldParentPtr("queue_node", free_node);
} }
fn yield(el: *EventLoop, optional_fiber: ?*Fiber) void { fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
if (optional_fiber) |fiber| { const message: SwitchMessage = .{
const old = &current_fiber.regs; .ready_fiber = optional_fiber orelse if (ready_node: {
current_fiber = fiber; el.mutex.lock();
contextSwitch(old, &fiber.regs); defer el.mutex.unlock();
return; break :ready_node el.queue.pop();
} }) |ready_node|
if (el.queue.pop()) |node| { @fieldParentPtr("queue_node", ready_node)
const fiber: *Fiber = @fieldParentPtr("queue_node", node); else if (register_awaiter) |_|
const old = &current_fiber.regs; @panic("no other fiber to switch to in order to be able to register this fiber as an awaiter") // time to switch to an idle fiber?
current_fiber = fiber; else
contextSwitch(old, &fiber.regs); return, // nothing to do
return; .register_awaiter = register_awaiter,
} };
@panic("everything is done"); std.log.debug("switching from {*} to {*}", .{ current_fiber, message.ready_fiber });
SwitchMessage.handle(@ptrFromInt(contextSwitch(&current_fiber.regs, &message.ready_fiber.regs, @intFromPtr(&message))), el);
} }
/// Equivalent to calling `yield` and then giving the fiber back to the event loop. const SwitchMessage = struct {
fn exit(el: *EventLoop, optional_fiber: ?*Fiber) noreturn { ready_fiber: *Fiber,
yield(el, optional_fiber); register_awaiter: ?*?*Fiber,
@panic("TODO recycle the fiber");
} fn handle(message: *const SwitchMessage, el: *EventLoop) void {
const prev_fiber = current_fiber;
current_fiber = message.ready_fiber;
if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
}
};
fn schedule(el: *EventLoop, fiber: *Fiber) void { fn schedule(el: *EventLoop, fiber: *Fiber) void {
el.mutex.lock();
defer el.mutex.unlock();
el.queue.append(&fiber.queue_node); el.queue.append(&fiber.queue_node);
} }
fn myFiber(el: *EventLoop) *Fiber { fn recycle(el: *EventLoop, fiber: *Fiber) void {
_ = el; std.log.debug("recyling {*}", .{fiber});
return current_fiber; fiber.awaiter = undefined;
@memset(fiber.resultPointer()[0..max_result_len], undefined);
el.mutex.lock();
defer el.mutex.unlock();
el.free.append(&fiber.queue_node);
} }
const Regs = extern struct { const Regs = extern struct {
@ -101,7 +119,7 @@ const Regs = extern struct {
rbp: usize, rbp: usize,
}; };
const contextSwitch: *const fn (old: *Regs, new: *Regs) callconv(.c) void = @ptrCast(&contextSwitch_naked); const contextSwitch: *const fn (old: *Regs, new: *Regs, message: usize) callconv(.c) usize = @ptrCast(&contextSwitch_naked);
noinline fn contextSwitch_naked() callconv(.naked) void { noinline fn contextSwitch_naked() callconv(.naked) void {
asm volatile ( asm volatile (
@ -121,6 +139,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void {
\\movq 0x28(%%rsi), %%rbx \\movq 0x28(%%rsi), %%rbx
\\movq 0x30(%%rsi), %%rbp \\movq 0x30(%%rsi), %%rbp
\\ \\
\\movq %%rdx, %%rax
\\ret \\ret
); );
} }
@ -128,6 +147,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void {
fn popRet() callconv(.naked) void { fn popRet() callconv(.naked) void {
asm volatile ( asm volatile (
\\pop %%rdi \\pop %%rdi
\\movq %%rax, %%rsi
\\ret \\ret
); );
} }
@ -145,6 +165,7 @@ pub fn @"async"(
}; };
fiber.awaiter = null; fiber.awaiter = null;
fiber.queue_node = .{ .data = {} }; fiber.queue_node = .{ .data = {} };
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward( const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
usize, usize,
@ -157,14 +178,16 @@ pub fn @"async"(
.fiber = fiber, .fiber = fiber,
.start = start, .start = start,
}; };
const stack_end_ptr: [*]align(16) usize = @alignCast(@ptrCast(closure)); const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure));
(stack_end_ptr - 1)[0] = 0; const stack_top = (stack_end - 4)[0..4];
(stack_end_ptr - 2)[0] = @intFromPtr(&AsyncClosure.call); stack_top.* = .{
(stack_end_ptr - 3)[0] = @intFromPtr(closure); @intFromPtr(&popRet),
(stack_end_ptr - 4)[0] = @intFromPtr(&popRet); @intFromPtr(closure),
@intFromPtr(&AsyncClosure.call),
0,
};
fiber.regs = .{ fiber.regs = .{
.rsp = @intFromPtr(stack_end_ptr - 4), .rsp = @intFromPtr(stack_top),
.r15 = 0, .r15 = 0,
.r14 = 0, .r14 = 0,
.r13 = 0, .r13 = 0,
@ -181,30 +204,24 @@ const AsyncClosure = struct {
_: void align(16) = {}, _: void align(16) = {},
event_loop: *EventLoop, event_loop: *EventLoop,
context: ?*anyopaque, context: ?*anyopaque,
fiber: *EventLoop.Fiber, fiber: *Fiber,
start: *const fn (context: ?*anyopaque, result: *anyopaque) void, start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
fn call(closure: *AsyncClosure) callconv(.c) void { fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn {
std.log.debug("wrap called in async", .{}); message.handle(closure.event_loop);
std.log.debug("{*} performing async", .{closure.fiber});
closure.start(closure.context, closure.fiber.resultPointer()); closure.start(closure.context, closure.fiber.resultPointer());
const awaiter = @atomicRmw(?*EventLoop.Fiber, &closure.fiber.awaiter, .Xchg, EventLoop.Fiber.finished, .seq_cst); const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
closure.event_loop.exit(awaiter); closure.event_loop.yield(awaiter, null);
unreachable; // switched to dead fiber
} }
}; };
pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *EventLoop.Fiber = @alignCast(@ptrCast(any_future)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
const result_src = future_fiber.resultPointer()[0..result.len]; const result_src = future_fiber.resultPointer()[0..result.len];
const my_fiber = event_loop.myFiber(); if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter);
const prev = @atomicRmw(?*EventLoop.Fiber, &future_fiber.awaiter, .Xchg, my_fiber, .seq_cst);
if (prev == EventLoop.Fiber.finished) {
@memcpy(result, result_src);
return;
}
event_loop.yield(prev);
// Resumed when the value is available.
std.log.debug("yield returned in await", .{});
@memcpy(result, result_src); @memcpy(result, result_src);
event_loop.recycle(future_fiber);
} }