From c8950b5dd549ffe2ead1979528e33c7a94540b8c Mon Sep 17 00:00:00 2001 From: Jacob Young Date: Tue, 1 Apr 2025 02:23:41 -0400 Subject: [PATCH] EventLoop: fix `std.Io.Condition` implementation 1. a fiber can't put itself on a queue that allows it to be rescheduled 2. allow the idle fiber to unlock a mutex held by another fiber by ignoring reschedule requests originating from the idle fiber --- lib/std/Io/EventLoop.zig | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 11a05bef48..37ad088b6d 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -380,8 +380,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void { fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { message.handle(el); - const thread: *Thread = &el.threads.allocated[0]; - el.idle(thread); + el.idle(&el.threads.allocated[0]); el.yield(@ptrCast(&el.main_fiber_buffer), .nothing); unreachable; // switched to dead fiber } @@ -480,10 +479,14 @@ const SwitchMessage = struct { reschedule, recycle: *Fiber, register_awaiter: *?*Fiber, - lock_mutex: struct { + mutex_lock: struct { prev_state: Io.Mutex.State, mutex: *Io.Mutex, }, + condition_wait: struct { + cond: *Io.Condition, + mutex: *Io.Mutex, + }, exit, }; @@ -492,7 +495,7 @@ const SwitchMessage = struct { thread.current_context = message.contexts.ready; switch (message.pending_task) { .nothing => {}, - .reschedule => { + .reschedule => if (message.contexts.prev != &thread.idle_context) { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); assert(prev_fiber.queue_next == null); el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); @@ -511,16 +514,16 @@ const SwitchMessage = struct { .acq_rel, ) == Fiber.finished) el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, - .lock_mutex => |lock_mutex| { + .mutex_lock => |mutex_lock| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); assert(prev_fiber.queue_next == null); - var prev_state = lock_mutex.prev_state; + var prev_state = mutex_lock.prev_state; while (switch (prev_state) { else => next_state: { prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state)); break :next_state @cmpxchgWeak( Io.Mutex.State, - &lock_mutex.mutex.state, + &mutex_lock.mutex.state, prev_state, @enumFromInt(@intFromPtr(prev_fiber)), .release, @@ -529,7 +532,7 @@ const SwitchMessage = struct { }, .unlocked => @cmpxchgWeak( Io.Mutex.State, - &lock_mutex.mutex.state, + &mutex_lock.mutex.state, .unlocked, .locked_once, .acquire, @@ -541,6 +544,13 @@ const SwitchMessage = struct { }, }) |next_state| prev_state = next_state; }, + .condition_wait => |condition_wait| { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + const cond_state: *?*Fiber = @ptrCast(&condition_wait.cond.state); + assert(@atomicRmw(?*Fiber, cond_state, .Xchg, prev_fiber, .release) == null); // More than one wait on same Condition is illegal. + condition_wait.mutex.unlock(el.io()); + }, .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| { getSqe(&thread.io_uring).* = .{ .opcode = .MSG_RING, @@ -1242,7 +1252,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - el.yield(null, .{ .lock_mutex = .{ + el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex, } }); @@ -1271,13 +1281,10 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - const cond_state: *?*Fiber = @ptrCast(&cond.state); - const thread: *Thread = .current(); - const fiber = thread.currentFiber(); - const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire); - assert(prev == null); // More than one wait on same Condition is illegal. - mutex.unlock(el.io()); - el.yield(null, .nothing); + el.yield(null, .{ .condition_wait = .{ + .cond = cond, + .mutex = mutex, + } }); try mutex.lock(el.io()); }