EventLoop: fix std.Io.Condition implementation

1. a fiber can't put itself on a queue that allows it to be rescheduled
   while it is still running (sketched below)
2. allow the idle fiber to unlock a mutex held by another fiber by
   ignoring reschedule requests originating from the idle fiber
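
Io.Condition here uses a single-waiter protocol: cond.state holds at most one waiting fiber (the assert in the new handler enforces this), and the waiter registers itself with an atomic exchange. Below is a minimal sketch of that ordering, modeled with plain OS threads and a ResetEvent instead of fibers and context switches. The names waitOn/signalOne and the ready flag are illustrative only and not part of std.Io; the wake side shown here is a guess at the matching exchange, not part of this diff.

    const std = @import("std");

    // Stand-ins for Io.Condition.state and the mutex protecting the predicate.
    var cond_state: ?*std.Thread.ResetEvent = null; // at most one registered waiter
    var mutex: std.Thread.Mutex = .{};
    var ready: bool = false;

    fn waitOn() void {
        var wake: std.Thread.ResetEvent = .{};
        mutex.lock();
        if (!ready) {
            // The ordering this commit moves into SwitchMessage.handle: register
            // the waiter first, then release the mutex, so any signaler that
            // acquires the mutex afterwards is guaranteed to find the waiter.
            const prev = @atomicRmw(?*std.Thread.ResetEvent, &cond_state, .Xchg, &wake, .release);
            std.debug.assert(prev == null); // more than one wait on the same condition is illegal
            mutex.unlock();
            wake.wait(); // "suspend" until signaled
            mutex.lock();
        }
        defer mutex.unlock();
        std.debug.assert(ready);
    }

    fn signalOne() void {
        mutex.lock();
        ready = true;
        mutex.unlock();
        // Consume the single registered waiter, if any, and wake it.
        if (@atomicRmw(?*std.Thread.ResetEvent, &cond_state, .Xchg, null, .acquire)) |w| w.set();
    }

    pub fn main() !void {
        const waiter = try std.Thread.spawn(.{}, waitOn, .{});
        signalOne();
        waiter.join();
    }

With threads a ResetEvent makes this ordering forgiving, but with fibers "waking" the waiter means switching to its saved context, so a fiber must not register itself in cond.state (or release the mutex) while it is still running. Hence the new condition_wait pending task below, which performs both steps in handle() only after the switch away from the waiting fiber.
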
Jacob Young, 2025-04-01 02:23:41 -04:00 (committed by Andrew Kelley)
parent 5952fc2c73
commit c8950b5dd5

@@ -380,8 +380,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
-    const thread: *Thread = &el.threads.allocated[0];
-    el.idle(thread);
+    el.idle(&el.threads.allocated[0]);
     el.yield(@ptrCast(&el.main_fiber_buffer), .nothing);
     unreachable; // switched to dead fiber
 }
@@ -480,10 +479,14 @@ const SwitchMessage = struct {
         reschedule,
         recycle: *Fiber,
         register_awaiter: *?*Fiber,
-        lock_mutex: struct {
+        mutex_lock: struct {
             prev_state: Io.Mutex.State,
             mutex: *Io.Mutex,
         },
+        condition_wait: struct {
+            cond: *Io.Condition,
+            mutex: *Io.Mutex,
+        },
         exit,
     };
@@ -492,7 +495,7 @@ const SwitchMessage = struct {
         thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
             .nothing => {},
-            .reschedule => {
+            .reschedule => if (message.contexts.prev != &thread.idle_context) {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
@@ -511,16 +514,16 @@ const SwitchMessage = struct {
                     .acq_rel,
                 ) == Fiber.finished) el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
-            .lock_mutex => |lock_mutex| {
+            .mutex_lock => |mutex_lock| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
-                var prev_state = lock_mutex.prev_state;
+                var prev_state = mutex_lock.prev_state;
                 while (switch (prev_state) {
                     else => next_state: {
                         prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
                         break :next_state @cmpxchgWeak(
                             Io.Mutex.State,
-                            &lock_mutex.mutex.state,
+                            &mutex_lock.mutex.state,
                             prev_state,
                             @enumFromInt(@intFromPtr(prev_fiber)),
                             .release,
@@ -529,7 +532,7 @@ const SwitchMessage = struct {
                     },
                     .unlocked => @cmpxchgWeak(
                         Io.Mutex.State,
-                        &lock_mutex.mutex.state,
+                        &mutex_lock.mutex.state,
                         .unlocked,
                         .locked_once,
                         .acquire,
@@ -541,6 +544,13 @@ const SwitchMessage = struct {
                     },
                 }) |next_state| prev_state = next_state;
             },
+            .condition_wait => |condition_wait| {
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
+                assert(prev_fiber.queue_next == null);
+                const cond_state: *?*Fiber = @ptrCast(&condition_wait.cond.state);
+                assert(@atomicRmw(?*Fiber, cond_state, .Xchg, prev_fiber, .release) == null); // More than one wait on same Condition is illegal.
+                condition_wait.mutex.unlock(el.io());
+            },
             .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
                 getSqe(&thread.io_uring).* = .{
                     .opcode = .MSG_RING,
@@ -1242,7 +1252,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl
 fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    el.yield(null, .{ .lock_mutex = .{
+    el.yield(null, .{ .mutex_lock = .{
         .prev_state = prev_state,
         .mutex = mutex,
     } });
@@ -1271,13 +1281,10 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
 fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const cond_state: *?*Fiber = @ptrCast(&cond.state);
-    const thread: *Thread = .current();
-    const fiber = thread.currentFiber();
-    const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire);
-    assert(prev == null); // More than one wait on same Condition is illegal.
-    mutex.unlock(el.io());
-    el.yield(null, .nothing);
+    el.yield(null, .{ .condition_wait = .{
+        .cond = cond,
+        .mutex = mutex,
+    } });
     try mutex.lock(el.io());
 }