EventLoop: let the allocator do its job

to bucket and free fiber allocations
Andrew Kelley 2025-03-31 23:51:51 -07:00
parent 9a652cc42a
commit f2b24479ee


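The change: drop the per-thread free_queue of recycled fibers and hand every
fiber allocation straight back to gpa. Each fiber is one fixed-size block, so a
bucketing general-purpose allocator already does the pooling the free list was
hand-rolling. A minimal before/after sketch of the pattern (the Node type and
its size are hypothetical stand-ins for Fiber):

    const std = @import("std");

    // Hypothetical stand-in for Fiber: fixed-size and intrusively linkable.
    const Node = struct {
        queue_next: ?*Node = null,
        payload: [64]u8 = undefined,
    };

    pub fn main() !void {
        var gpa_state: std.heap.GeneralPurposeAllocator(.{}) = .{};
        defer _ = gpa_state.deinit();
        const gpa = gpa_state.allocator();

        // Before: allocate() consulted a hand-rolled per-thread free list first.
        var free_queue: ?*Node = null;
        const recycled: *Node = if (free_queue) |n| blk: {
            free_queue = n.queue_next;
            n.queue_next = null;
            break :blk n;
        } else try gpa.create(Node);
        // "Recycling" pushed the block back onto the list instead of freeing it:
        recycled.queue_next = free_queue;
        free_queue = recycled;

        // After: allocation and free both go straight to the allocator, which
        // buckets same-size allocations on its own.
        const direct = try gpa.create(Node);
        gpa.destroy(direct);

        // The old deinit() had to drain whatever the lists still held:
        while (free_queue) |n| {
            free_queue = n.queue_next;
            gpa.destroy(n);
        }
    }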
@@ -26,7 +26,6 @@ const Thread = struct {
     idle_context: Context,
     current_context: *Context,
     ready_queue: ?*Fiber,
-    free_queue: ?*Fiber,
     io_uring: IoUring,
     idle_search_index: u32,
     steal_ready_search_index: u32,
@@ -78,12 +77,6 @@ const Fiber = struct {
     );
     fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
-        const thread: *Thread = .current();
-        if (thread.free_queue) |free_fiber| {
-            thread.free_queue = free_fiber.queue_next;
-            free_fiber.queue_next = null;
-            return free_fiber;
-        }
         return @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
     }
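With the free-list fast path gone, allocate is a single allocator call. A
self-contained sketch of that call shape (allocation_size here is a made-up
value; the real constant, covering the fiber header plus its stack, is defined
elsewhere in the file):

    const std = @import("std");

    const Fiber = struct {
        queue_next: ?*Fiber = null,
        // ... context, stack, and result storage follow in the real layout.
    };

    // Hypothetical; the real allocation_size is computed from the stack size.
    const allocation_size = 64 * 1024;

    fn allocate(gpa: std.mem.Allocator) error{OutOfMemory}!*Fiber {
        // One aligned, fixed-size block, reinterpreted as the fiber header:
        // the same call shape as the patched allocate() above.
        return @ptrCast(try gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
    }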
@@ -129,18 +122,15 @@ const Fiber = struct {
         )) |cancel_thread| assert(cancel_thread == Thread.canceling);
     }
-    fn recycle(fiber: *Fiber) void {
-        const thread: *Thread = .current();
-        std.log.debug("recyling {*}", .{fiber});
-        assert(fiber.queue_next == null);
-        //@memset(fiber.allocatedSlice(), undefined); // (race)
-        fiber.queue_next = thread.free_queue;
-        thread.free_queue = fiber;
-    }
     const Queue = struct { head: *Fiber, tail: *Fiber };
 };
+fn recycle(el: *EventLoop, fiber: *Fiber) void {
+    std.log.debug("recyling {*}", .{fiber});
+    assert(fiber.queue_next == null);
+    el.gpa.free(fiber.allocatedSlice());
+}
 pub fn io(el: *EventLoop) Io {
     return .{
         .userdata = el,
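recycle moves from Fiber to EventLoop because freeing now needs el.gpa; it
returns the entire fixed-size block to the allocator. allocatedSlice() is
defined outside this diff; a hedged reconstruction of what it must return,
reusing the hypothetical Fiber and allocation_size from the sketch above:

    const std = @import("std");

    const Fiber = struct { queue_next: ?*Fiber = null };
    const allocation_size = 64 * 1024; // hypothetical, as above

    // Recover exactly the slice that alignedAlloc() returned from the header
    // pointer, so gpa.free() releases the whole block.
    fn allocatedSlice(fiber: *Fiber) []align(@alignOf(Fiber)) u8 {
        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(fiber);
        return base[0..allocation_size];
    }

    fn recycle(gpa: std.mem.Allocator, fiber: *Fiber) void {
        gpa.free(allocatedSlice(fiber));
    }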
@@ -207,7 +197,6 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
         },
         .current_context = &main_fiber.context,
         .ready_queue = null,
-        .free_queue = null,
         .io_uring = try IoUring.init(io_uring_entries, 0),
         .idle_search_index = 1,
         .steal_ready_search_index = 1,
@@ -227,11 +216,6 @@ pub fn deinit(el: *EventLoop) void {
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
     const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
     for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
-    for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| {
-        thread.free_queue = free_fiber.queue_next;
-        free_fiber.queue_next = null;
-        el.gpa.free(free_fiber.allocatedSlice());
-    };
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     el.* = undefined;
 }
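With fibers freed as soon as they are recycled, deinit no longer walks
per-thread free lists. Under a debug allocator the built-in leak check now
covers anything still outstanding; a minimal sketch, assuming
std.heap.GeneralPurposeAllocator:

    const std = @import("std");

    pub fn main() !void {
        var gpa_state: std.heap.GeneralPurposeAllocator(.{}) = .{};
        // The allocator's own leak check stands in for the manual
        // per-thread drain that deinit() used to perform.
        defer std.debug.assert(gpa_state.deinit() == .ok);
        const gpa = gpa_state.allocator();

        const block = try gpa.alloc(u8, 123);
        gpa.free(block); // forget this and deinit() reports a leak
    }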
@@ -343,7 +327,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
         .idle_context = undefined,
         .current_context = &new_thread.idle_context,
         .ready_queue = ready_queue.head,
-        .free_queue = null,
         .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
             @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
             // no more access to `thread` after giving up reservation
@@ -501,7 +484,7 @@ const SwitchMessage = struct {
             el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
         },
         .recycle => |fiber| {
-            fiber.recycle();
+            el.recycle(fiber);
         },
         .register_awaiter => |awaiter| {
             const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
@@ -795,7 +778,7 @@ fn @"await"(
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
         event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultBytes(result_alignment));
-    future_fiber.recycle();
+    event_loop.recycle(future_fiber);
 }
 fn cancel(