EventLoop: implement detached async

Known issue: data races on deinit.
Andrew Kelley 2025-03-31 14:36:20 -07:00
parent f3553049cb
commit 5aa3d1425e
2 changed files with 133 additions and 40 deletions
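For orientation, a minimal caller-side sketch of what the detached spawn path looks like, assuming the go implementation added in the second file of this diff is called directly; the Job type and the printJob/spawnDetached helpers are hypothetical and not part of the commit:

const std = @import("std");

// Hypothetical payload carried by the detached fiber.
const Job = struct { id: u32 };

fn printJob(context: *const anyopaque) void {
    // go copies the context bytes next to the fiber, so this pointer
    // stays valid after the spawning function has returned.
    const job: *const Job = @alignCast(@ptrCast(context));
    std.log.info("running detached job {d}", .{job.id});
}

fn spawnDetached(event_loop: *EventLoop, job: Job) void {
    // Unlike async, no AnyFuture is handed back; the fiber is linked into
    // the current thread's detached_queue and reaped in EventLoop.deinit,
    // which is where the data races noted in the commit message come in.
    const userdata: *anyopaque = @ptrCast(event_loop);
    go(userdata, std.mem.asBytes(&job), .of(Job), printJob);
}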


@ -979,7 +979,7 @@ pub const VTable = struct {
/// Thread-safe.
cancelRequested: *const fn (?*anyopaque) bool,
-mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) error{Canceled}!void,
+mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
@ -998,11 +998,11 @@ pub const VTable = struct {
pub const OpenFlags = fs.File.OpenFlags;
pub const CreateFlags = fs.File.CreateFlags;
-pub const FileOpenError = fs.File.OpenError || error{Canceled};
-pub const FileReadError = fs.File.ReadError || error{Canceled};
-pub const FilePReadError = fs.File.PReadError || error{Canceled};
-pub const FileWriteError = fs.File.WriteError || error{Canceled};
-pub const FilePWriteError = fs.File.PWriteError || error{Canceled};
+pub const FileOpenError = fs.File.OpenError || Cancelable;
+pub const FileReadError = fs.File.ReadError || Cancelable;
+pub const FilePReadError = fs.File.PReadError || Cancelable;
+pub const FileWriteError = fs.File.WriteError || Cancelable;
+pub const FilePWriteError = fs.File.PWriteError || Cancelable;
pub const Timestamp = enum(i96) {
_,
@ -1019,7 +1019,7 @@ pub const Deadline = union(enum) {
nanoseconds: i96,
timestamp: Timestamp,
};
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled};
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || Cancelable;
pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
pub const AnyFuture = opaque {};
@ -1087,7 +1087,7 @@ pub const Mutex = if (true) struct {
return prev_state.isUnlocked();
}
-pub fn lock(mutex: *Mutex, io: std.Io) error{Canceled}!void {
+pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void {
const prev_state: State = @enumFromInt(@atomicRmw(
usize,
@as(*usize, @ptrCast(&mutex.state)),
@ -1136,7 +1136,7 @@ pub const Mutex = if (true) struct {
}
/// Avoids the vtable for uncontended locks.
-pub fn lock(m: *Mutex, io: Io) error{Canceled}!void {
+pub fn lock(m: *Mutex, io: Io) Cancelable!void {
if (!m.tryLock()) {
@branchHint(.unlikely);
try io.vtable.mutexLock(io.userdata, {}, m);
@ -1162,10 +1162,10 @@ pub const Condition = struct {
all,
};
-pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void {
+pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
-error.Canceled => return, // handled as spurious wakeup
+error.Canceled => return error.Canceled,
};
}
@ -1182,6 +1182,11 @@ pub const Condition = struct {
}
};
+pub const Cancelable = error{
+/// Caller has requested the async operation to stop.
+Canceled,
+};
pub const TypeErasedQueue = struct {
mutex: Mutex,
@ -1205,7 +1210,7 @@ pub const TypeErasedQueue = struct {
pub fn init(buffer: []u8) TypeErasedQueue {
return .{
-.mutex = .{},
+.mutex = .init,
.buffer = buffer,
.put_index = 0,
.get_index = 0,
@ -1214,10 +1219,10 @@ pub const TypeErasedQueue = struct {
};
}
-pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
assert(elements.len >= min);
-q.mutex.lock(io);
+try q.mutex.lock(io);
defer q.mutex.unlock(io);
// Getters have first priority on the data, and only when the getters
@ -1264,15 +1269,15 @@ pub const TypeErasedQueue = struct {
.data = .{ .remaining = remaining, .condition = .{} },
};
q.putters.append(&node);
-node.data.condition.wait(io, &q.mutex);
+try node.data.condition.wait(io, &q.mutex);
remaining = node.data.remaining;
}
}
-pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
+pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
assert(buffer.len >= min);
-q.mutex.lock(io);
+try q.mutex.lock(io);
defer q.mutex.unlock(io);
// The ring buffer gets first priority, then data should come from any
@ -1329,7 +1334,7 @@ pub const TypeErasedQueue = struct {
.data = .{ .remaining = remaining, .condition = .{} },
};
q.getters.append(&node);
-node.data.condition.wait(io, &q.mutex);
+try node.data.condition.wait(io, &q.mutex);
remaining = node.data.remaining;
}
}
@ -1383,8 +1388,8 @@ pub fn Queue(Elem: type) type {
/// Returns how many elements have been added to the queue.
///
/// Asserts that `elements.len >= min`.
-pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
-return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
+return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Receives elements from the beginning of the queue. The function
@ -1394,17 +1399,17 @@ pub fn Queue(Elem: type) type {
/// Returns how many elements of `buffer` have been populated.
///
/// Asserts that `buffer.len >= min`.
-pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
-return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
+return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
-pub fn putOne(q: *@This(), io: Io, item: Elem) void {
-assert(q.put(io, &.{item}, 1) == 1);
+pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
+assert(try q.put(io, &.{item}, 1) == 1);
}
-pub fn getOne(q: *@This(), io: Io) Elem {
+pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
var buf: [1]Elem = undefined;
-assert(q.get(io, &buf, 1) == 1);
+assert(try q.get(io, &buf, 1) == 1);
return buf[0];
}
};

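The caller-facing effect of the Cancelable changes above: condition waits and queue operations now surface error.Canceled to the caller instead of swallowing it as a spurious wakeup, so callers propagate or handle it explicitly. A minimal usage sketch, not part of the commit, assuming Queue and Cancelable are reachable as std.Io.Queue and std.Io.Cancelable, with a hypothetical Job type and handle function:

const std = @import("std");

const Job = struct { id: u32 };

fn handle(job: Job) void {
    std.log.info("handling job {d}", .{job.id});
}

fn producer(io: std.Io, jobs: *std.Io.Queue(Job), job: Job) std.Io.Cancelable!void {
    // putOne now returns Cancelable!void, so cancellation propagates to the caller.
    try jobs.putOne(io, job);
}

fn consumer(io: std.Io, jobs: *std.Io.Queue(Job)) void {
    while (true) {
        const job = jobs.getOne(io) catch |err| switch (err) {
            // getOne returns Cancelable!Elem; stop consuming once canceled.
            error.Canceled => return,
        };
        handle(job);
    }
}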

@ -27,6 +27,7 @@ const Thread = struct {
current_context: *Context,
ready_queue: ?*Fiber,
free_queue: ?*Fiber,
+detached_queue: ?*Fiber,
io_uring: IoUring,
idle_search_index: u32,
steal_ready_search_index: u32,
@ -208,6 +209,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
.current_context = &main_fiber.context,
.ready_queue = null,
.free_queue = null,
+.detached_queue = null,
.io_uring = try IoUring.init(io_uring_entries, 0),
.idle_search_index = 1,
.steal_ready_search_index = 1,
@ -218,7 +220,16 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
}
pub fn deinit(el: *EventLoop) void {
+// Wait for detached fibers.
const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+for (el.threads.allocated[0..active_threads]) |*thread| {
+while (thread.detached_queue) |detached_fiber| {
+if (@atomicLoad(?*Fiber, &detached_fiber.awaiter, .acquire) != Fiber.finished)
+el.yield(null, .{ .register_awaiter = &detached_fiber.awaiter });
+detached_fiber.recycle();
+}
+}
for (el.threads.allocated[0..active_threads]) |*thread| {
const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
@ -336,6 +347,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
.current_context = &new_thread.idle_context,
.ready_queue = ready_queue.head,
.free_queue = null,
+.detached_queue = null,
.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
@atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
// no more access to `thread` after giving up reservation
@ -470,6 +482,7 @@ const SwitchMessage = struct {
const PendingTask = union(enum) {
nothing,
reschedule,
+recycle: *Fiber,
register_awaiter: *?*Fiber,
lock_mutex: struct {
prev_state: Io.Mutex.State,
@ -488,6 +501,9 @@ const SwitchMessage = struct {
assert(prev_fiber.queue_next == null);
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
},
+.recycle => |fiber| {
+fiber.recycle();
+},
.register_awaiter => |awaiter| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
@ -612,6 +628,18 @@ fn fiberEntry() callconv(.naked) void {
}
}
+fn fiberEntryDetached() callconv(.naked) void {
+switch (builtin.cpu.arch) {
+.x86_64 => asm volatile (
+\\ leaq 8(%%rsp), %%rdi
+\\ jmp %[DetachedClosure_call:P]
+:
+: [DetachedClosure_call] "X" (&DetachedClosure.call),
+),
+else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+}
+}
const AsyncClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,
@ -632,6 +660,31 @@ const AsyncClosure = struct {
}
};
+const DetachedClosure = struct {
+event_loop: *EventLoop,
+fiber: *Fiber,
+start: *const fn (context: *const anyopaque) void,
+fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
+}
+fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
+message.handle(closure.event_loop);
+std.log.debug("{*} performing async detached", .{closure.fiber});
+closure.start(closure.contextPointer());
+const current_thread: *Thread = .current();
+current_thread.detached_queue = closure.fiber.queue_next;
+const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+if (awaiter) |a| {
+closure.event_loop.yield(a, .nothing);
+} else {
+closure.event_loop.yield(null, .{ .recycle = closure.fiber });
+}
+unreachable; // switched to dead fiber
+}
+};
fn @"async"(
userdata: ?*anyopaque,
result: []u8,
@ -682,6 +735,53 @@ fn @"async"(
return @ptrCast(fiber);
}
+fn go(
+userdata: ?*anyopaque,
+context: []const u8,
+context_alignment: std.mem.Alignment,
+start: *const fn (context: *const anyopaque) void,
+) void {
+assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
+assert(context.len <= Fiber.max_context_size); // TODO
+const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
+const fiber = Fiber.allocate(event_loop) catch {
+start(context.ptr);
+return;
+};
+std.log.debug("allocated {*}", .{fiber});
+const current_thread: *Thread = .current();
+const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
+@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
+) - @sizeOf(DetachedClosure));
+fiber.* = .{
+.required_align = {},
+.context = switch (builtin.cpu.arch) {
+.x86_64 => .{
+.rsp = @intFromPtr(closure) - @sizeOf(usize),
+.rbp = 0,
+.rip = @intFromPtr(&fiberEntryDetached),
+},
+else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+},
+.awaiter = null,
+.queue_next = current_thread.detached_queue,
+.cancel_thread = null,
+.awaiting_completions = .initEmpty(),
+};
+current_thread.detached_queue = fiber;
+closure.* = .{
+.event_loop = event_loop,
+.fiber = fiber,
+.start = start,
+};
+@memcpy(closure.contextPointer(), context);
+event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
+}
fn @"await"(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,
@ -690,24 +790,12 @@ fn @"await"(
) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
+if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
+event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
@memcpy(result, future_fiber.resultBytes(result_alignment));
future_fiber.recycle();
}
-fn go(
-userdata: ?*anyopaque,
-context: []const u8,
-context_alignment: std.mem.Alignment,
-start: *const fn (context: *const anyopaque) void,
-) void {
-_ = userdata;
-_ = context;
-_ = context_alignment;
-_ = start;
-@panic("TODO");
-}
fn cancel(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,