From f2b24479eed677677cc7615fa120d0d0cb87b389 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 31 Mar 2025 23:51:51 -0700 Subject: [PATCH] EventLoop: let the allocator do its job to bucket and free fiber allocations --- lib/std/Io/EventLoop.zig | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 37ad088b6d..067ed04c3d 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -26,7 +26,6 @@ const Thread = struct { idle_context: Context, current_context: *Context, ready_queue: ?*Fiber, - free_queue: ?*Fiber, io_uring: IoUring, idle_search_index: u32, steal_ready_search_index: u32, @@ -78,12 +77,6 @@ const Fiber = struct { ); fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber { - const thread: *Thread = .current(); - if (thread.free_queue) |free_fiber| { - thread.free_queue = free_fiber.queue_next; - free_fiber.queue_next = null; - return free_fiber; - } return @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size)); } @@ -129,18 +122,15 @@ const Fiber = struct { )) |cancel_thread| assert(cancel_thread == Thread.canceling); } - fn recycle(fiber: *Fiber) void { - const thread: *Thread = .current(); - std.log.debug("recyling {*}", .{fiber}); - assert(fiber.queue_next == null); - //@memset(fiber.allocatedSlice(), undefined); // (race) - fiber.queue_next = thread.free_queue; - thread.free_queue = fiber; - } - const Queue = struct { head: *Fiber, tail: *Fiber }; }; +fn recycle(el: *EventLoop, fiber: *Fiber) void { + std.log.debug("recyling {*}", .{fiber}); + assert(fiber.queue_next == null); + el.gpa.free(fiber.allocatedSlice()); +} + pub fn io(el: *EventLoop) Io { return .{ .userdata = el, @@ -207,7 +197,6 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { }, .current_context = &main_fiber.context, .ready_queue = null, - .free_queue = null, .io_uring = try IoUring.init(io_uring_entries, 0), .idle_search_index = 1, .steal_ready_search_index = 1, @@ -227,11 +216,6 @@ pub fn deinit(el: *EventLoop) void { const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr)); const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max); for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join(); - for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| { - thread.free_queue = free_fiber.queue_next; - free_fiber.queue_next = null; - el.gpa.free(free_fiber.allocatedSlice()); - }; el.gpa.free(allocated_ptr[0..idle_stack_end_offset]); el.* = undefined; } @@ -343,7 +327,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void { .idle_context = undefined, .current_context = &new_thread.idle_context, .ready_queue = ready_queue.head, - .free_queue = null, .io_uring = IoUring.init(io_uring_entries, 0) catch |err| { @atomicStore(u32, &el.threads.reserved, new_thread_index, .release); // no more access to `thread` after giving up reservation @@ -501,7 +484,7 @@ const SwitchMessage = struct { el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, .recycle => |fiber| { - fiber.recycle(); + el.recycle(fiber); }, .register_awaiter => |awaiter| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); @@ -795,7 +778,7 @@ fn @"await"( if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter }); @memcpy(result, future_fiber.resultBytes(result_alignment)); - future_fiber.recycle(); + event_loop.recycle(future_fiber); } fn cancel(