From 9fd1ecb348f25dd56f09a6bd10022554a87bb6da Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 17 Jul 2025 20:26:07 -0700 Subject: [PATCH] std.Io: add asyncConcurrent and asyncParallel --- lib/std/Io.zig | 121 +++++++++- lib/std/Io/EventLoop.zig | 44 +++- lib/std/Io/ThreadPool.zig | 474 ++++++++++++++------------------------ lib/std/Thread.zig | 2 + 4 files changed, 331 insertions(+), 310 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 85644004ff..df95da23cf 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -581,6 +581,32 @@ pub const VTable = struct { context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*AnyFuture, + /// Returning `null` indicates resource allocation failed. + /// + /// Thread-safe. + asyncConcurrent: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + result_len: usize, + result_alignment: std.mem.Alignment, + /// Copied and then passed to `start`. + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, + ) ?*AnyFuture, + /// Returning `null` indicates resource allocation failed. + /// + /// Thread-safe. + asyncParallel: *const fn ( + /// Corresponds to `Io.userdata`. + userdata: ?*anyopaque, + result_len: usize, + result_alignment: std.mem.Alignment, + /// Copied and then passed to `start`. + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, + ) ?*AnyFuture, /// Executes `start` asynchronously in a manner such that it cleans itself /// up. This mode does not support results, await, or cancel. /// @@ -1138,7 +1164,18 @@ pub fn Queue(Elem: type) type { /// Calls `function` with `args`, such that the return value of the function is /// not guaranteed to be available until `await` is called. -pub fn async(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { +/// +/// `function` *may* be called immediately, before `async` returns. This has +/// weaker guarantees than `asyncConcurrent` and `asyncParallel`, making it the +/// most portable and reusable among the async family functions. +/// +/// See also: +/// * `asyncDetached` +pub fn async( + io: Io, + function: anytype, + args: std.meta.ArgsTuple(@TypeOf(function)), +) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) { const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?; const Args = @TypeOf(args); const TypeErased = struct { @@ -1160,8 +1197,86 @@ pub fn async(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(functio return future; } +/// Calls `function` with `args`, such that the return value of the function is +/// not guaranteed to be available until `await` is called, passing control +/// flow back to the caller while waiting for any `Io` operations. +/// +/// This has a weaker guarantee than `asyncParallel`, making it more portable +/// and reusable, however it has stronger guarantee than `async`, placing +/// restrictions on what kind of `Io` implementations are supported. By calling +/// `async` instead, one allows, for example, stackful single-threaded blocking I/O. +pub fn asyncConcurrent( + io: Io, + function: anytype, + args: std.meta.ArgsTuple(@TypeOf(function)), +) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) 
{
+    const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
+    const Args = @TypeOf(args);
+    const TypeErased = struct {
+        fn start(context: *const anyopaque, result: *anyopaque) void {
+            const args_casted: *const Args = @alignCast(@ptrCast(context));
+            const result_casted: *Result = @ptrCast(@alignCast(result));
+            result_casted.* = @call(.auto, function, args_casted.*);
+        }
+    };
+    var future: Future(Result) = undefined;
+    future.any_future = io.vtable.asyncConcurrent(
+        io.userdata,
+        @sizeOf(Result),
+        .of(Result),
+        @ptrCast((&args)[0..1]),
+        .of(Args),
+        TypeErased.start,
+    ) orelse return error.OutOfMemory;
+    return future;
+}
+
+/// Calls `function` with `args`, such that the return value of the function is
+/// not guaranteed to be available until `await` is called, while simultaneously
+/// passing control flow back to the caller.
+///
+/// This has the strongest guarantees of all async family functions, placing
+/// the most restrictions on what kind of `Io` implementations are supported.
+/// By calling `asyncConcurrent` instead, one allows, for example,
+/// stackful single-threaded non-blocking I/O.
+///
+/// See also:
+/// * `asyncConcurrent`
+/// * `async`
+pub fn asyncParallel(
+    io: Io,
+    function: anytype,
+    args: std.meta.ArgsTuple(@TypeOf(function)),
+) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
+    const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
+    const Args = @TypeOf(args);
+    const TypeErased = struct {
+        fn start(context: *const anyopaque, result: *anyopaque) void {
+            const args_casted: *const Args = @alignCast(@ptrCast(context));
+            const result_casted: *Result = @ptrCast(@alignCast(result));
+            result_casted.* = @call(.auto, function, args_casted.*);
+        }
+    };
+    var future: Future(Result) = undefined;
+    future.any_future = io.vtable.asyncParallel(
+        io.userdata,
+        @sizeOf(Result),
+        .of(Result),
+        @ptrCast((&args)[0..1]),
+        .of(Args),
+        TypeErased.start,
+    ) orelse return error.OutOfMemory;
+    return future;
+}
+
 /// Calls `function` with `args` asynchronously. The resource cleans itself up
 /// when the function returns. Does not support await, cancel, or a return value.
+///
+/// `function` *may* be called immediately, before `asyncDetached` returns.
+/// +/// See also: +/// * `async` +/// * `asyncConcurrent` pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { const Args = @TypeOf(args); const TypeErased = struct { @@ -1173,6 +1288,10 @@ pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf io.vtable.asyncDetached(io.userdata, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start); } +pub fn cancelRequested(io: Io) bool { + return io.vtable.cancelRequested(io.userdata); +} + pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp { return io.vtable.now(io.userdata, clockid); } diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index c30736a885..3f070362e5 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -139,6 +139,8 @@ pub fn io(el: *EventLoop) Io { .userdata = el, .vtable = &.{ .async = async, + .asyncConcurrent = asyncConcurrent, + .asyncParallel = asyncParallel, .await = await, .asyncDetached = asyncDetached, .select = select, @@ -877,16 +879,27 @@ fn async( context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*std.Io.AnyFuture { - assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO - assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO - assert(result.len <= Fiber.max_result_size); // TODO - assert(context.len <= Fiber.max_context_size); // TODO - - const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); - const fiber = Fiber.allocate(event_loop) catch { + return asyncConcurrent(userdata, result.len, result_alignment, context, context_alignment, start) orelse { start(context.ptr, result.ptr); return null; }; +} + +fn asyncConcurrent( + userdata: ?*anyopaque, + result_len: usize, + result_alignment: Alignment, + context: []const u8, + context_alignment: Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, +) ?*std.Io.AnyFuture { + assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO + assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO + assert(result_len <= Fiber.max_result_size); // TODO + assert(context.len <= Fiber.max_context_size); // TODO + + const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); + const fiber = Fiber.allocate(event_loop) catch return null; std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = .fromFiber(fiber); @@ -925,6 +938,23 @@ fn async( return @ptrCast(fiber); } +fn asyncParallel( + userdata: ?*anyopaque, + result_len: usize, + result_alignment: Alignment, + context: []const u8, + context_alignment: Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, +) ?*std.Io.AnyFuture { + _ = userdata; + _ = result_len; + _ = result_alignment; + _ = context; + _ = context_alignment; + _ = start; + @panic("TODO"); +} + const DetachedClosure = struct { event_loop: *EventLoop, fiber: *Fiber, diff --git a/lib/std/Io/ThreadPool.zig b/lib/std/Io/ThreadPool.zig index f356187748..c5fc45ad91 100644 --- a/lib/std/Io/ThreadPool.zig +++ b/lib/std/Io/ThreadPool.zig @@ -6,331 +6,88 @@ const WaitGroup = std.Thread.WaitGroup; const Io = std.Io; const Pool = @This(); -/// Must be a thread-safe allocator. -allocator: std.mem.Allocator, +/// Thread-safe. 
+allocator: Allocator, mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, run_queue: std.SinglyLinkedList = .{}, -is_running: bool = true, +join_requested: bool = false, threads: std.ArrayListUnmanaged(std.Thread), -ids: if (builtin.single_threaded) struct { - inline fn deinit(_: @This(), _: std.mem.Allocator) void {} - fn getIndex(_: @This(), _: std.Thread.Id) usize { - return 0; - } -} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void), stack_size: usize, +cpu_count: std.Thread.CpuCountError!usize, +parallel_count: usize, threadlocal var current_closure: ?*AsyncClosure = null; pub const Runnable = struct { - runFn: RunProto, + start: Start, node: std.SinglyLinkedList.Node = .{}, + is_parallel: bool, + + pub const Start = *const fn (*Runnable) void; }; -pub const RunProto = *const fn (*Runnable, id: ?usize) void; +pub const InitError = std.Thread.CpuCountError || Allocator.Error; -pub const Options = struct { - allocator: std.mem.Allocator, - n_jobs: ?usize = null, - track_ids: bool = false, - stack_size: usize = std.Thread.SpawnConfig.default_stack_size, -}; - -pub fn init(pool: *Pool, options: Options) !void { - const gpa = options.allocator; - const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1); - const threads = try gpa.alloc(std.Thread, thread_count); - errdefer gpa.free(threads); - - pool.* = .{ +pub fn init(gpa: Allocator) Pool { + var pool: Pool = .{ .allocator = gpa, - .threads = .initBuffer(threads), - .ids = .{}, - .stack_size = options.stack_size, + .threads = .empty, + .stack_size = std.Thread.SpawnConfig.default_stack_size, + .cpu_count = std.Thread.getCpuCount(), + .parallel_count = 0, }; - - if (builtin.single_threaded) return; - - if (options.track_ids) { - try pool.ids.ensureTotalCapacity(gpa, 1 + thread_count); - pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {}); - } + if (pool.cpu_count) |n| { + pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {}; + } else |_| {} + return pool; } pub fn deinit(pool: *Pool) void { const gpa = pool.allocator; pool.join(); pool.threads.deinit(gpa); - pool.ids.deinit(gpa); pool.* = undefined; } fn join(pool: *Pool) void { if (builtin.single_threaded) return; - { pool.mutex.lock(); defer pool.mutex.unlock(); - - // ensure future worker threads exit the dequeue loop - pool.is_running = false; + pool.join_requested = true; } - - // wake up any sleeping threads (this can be done outside the mutex) - // then wait for all the threads we know are spawned to complete. pool.cond.broadcast(); for (pool.threads.items) |thread| thread.join(); } -/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and -/// `WaitGroup.finish` after it returns. -/// -/// In the case that queuing the function call fails to allocate memory, or the -/// target is single-threaded, the function is called directly. 
-pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void { - wait_group.start(); - - if (builtin.single_threaded) { - @call(.auto, func, args); - wait_group.finish(); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - wait_group: *WaitGroup, - - fn runFn(runnable: *Runnable, _: ?usize) void { - const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, closure.arguments); - closure.wait_group.finish(); - closure.pool.allocator.destroy(closure); - } - }; - - pool.mutex.lock(); - - const gpa = pool.allocator; - const closure = gpa.create(Closure) catch { - pool.mutex.unlock(); - @call(.auto, func, args); - wait_group.finish(); - return; - }; - closure.* = .{ - .arguments = args, - .pool = pool, - .wait_group = wait_group, - }; - - pool.run_queue.prepend(&closure.runnable.node); - - if (pool.threads.items.len < pool.threads.capacity) { - pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ - .stack_size = pool.stack_size, - .allocator = gpa, - }, worker, .{pool}) catch t: { - pool.threads.items.len -= 1; - break :t undefined; - }; - } - - pool.mutex.unlock(); - pool.cond.signal(); -} - -/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and -/// `WaitGroup.finish` after it returns. -/// -/// The first argument passed to `func` is a dense `usize` thread id, the rest -/// of the arguments are passed from `args`. Requires the pool to have been -/// initialized with `.track_ids = true`. -/// -/// In the case that queuing the function call fails to allocate memory, or the -/// target is single-threaded, the function is called directly. -pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void { - wait_group.start(); - - if (builtin.single_threaded) { - @call(.auto, func, .{0} ++ args); - wait_group.finish(); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - wait_group: *WaitGroup, - - fn runFn(runnable: *Runnable, id: ?usize) void { - const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, .{id.?} ++ closure.arguments); - closure.wait_group.finish(); - closure.pool.allocator.destroy(closure); - } - }; - - pool.mutex.lock(); - - const gpa = pool.allocator; - const closure = gpa.create(Closure) catch { - const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId()); - pool.mutex.unlock(); - @call(.auto, func, .{id.?} ++ args); - wait_group.finish(); - return; - }; - closure.* = .{ - .arguments = args, - .pool = pool, - .wait_group = wait_group, - }; - - pool.run_queue.prepend(&closure.runnable.node); - - if (pool.threads.items.len < pool.threads.capacity) { - pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ - .stack_size = pool.stack_size, - .allocator = gpa, - }, worker, .{pool}) catch t: { - pool.threads.items.len -= 1; - break :t undefined; - }; - } - - pool.mutex.unlock(); - pool.cond.signal(); -} - -pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) void { - if (builtin.single_threaded) { - @call(.auto, func, args); - return; - } - - const Args = @TypeOf(args); - const Closure = struct { - arguments: Args, - pool: *Pool, - runnable: Runnable = .{ .runFn = runFn }, - - fn runFn(runnable: *Runnable, _: ?usize) void { - const closure: *@This() = 
@alignCast(@fieldParentPtr("runnable", runnable)); - @call(.auto, func, closure.arguments); - closure.pool.allocator.destroy(closure); - } - }; - - pool.mutex.lock(); - - const gpa = pool.allocator; - const closure = gpa.create(Closure) catch { - pool.mutex.unlock(); - @call(.auto, func, args); - return; - }; - closure.* = .{ - .arguments = args, - .pool = pool, - }; - - pool.run_queue.prepend(&closure.runnable.node); - - if (pool.threads.items.len < pool.threads.capacity) { - pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ - .stack_size = pool.stack_size, - .allocator = gpa, - }, worker, .{pool}) catch t: { - pool.threads.items.len -= 1; - break :t undefined; - }; - } - - pool.mutex.unlock(); - pool.cond.signal(); -} - -test spawn { - const TestFn = struct { - fn checkRun(completed: *bool) void { - completed.* = true; - } - }; - - var completed: bool = false; - - { - var pool: Pool = undefined; - try pool.init(.{ - .allocator = std.testing.allocator, - }); - defer pool.deinit(); - pool.spawn(TestFn.checkRun, .{&completed}); - } - - try std.testing.expectEqual(true, completed); -} - fn worker(pool: *Pool) void { pool.mutex.lock(); defer pool.mutex.unlock(); - const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null; - if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {}); - while (true) { while (pool.run_queue.popFirst()) |run_node| { - // Temporarily unlock the mutex in order to execute the run_node - pool.mutex.unlock(); - defer pool.mutex.lock(); - - const runnable: *Runnable = @fieldParentPtr("node", run_node); - runnable.runFn(runnable, id); - } - - // Stop executing instead of waiting if the thread pool is no longer running. - if (pool.is_running) { - pool.cond.wait(&pool.mutex); - } else { - break; - } - } -} - -pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void { - var id: ?usize = null; - - while (!wait_group.isDone()) { - pool.mutex.lock(); - if (pool.run_queue.popFirst()) |run_node| { - id = id orelse pool.ids.getIndex(std.Thread.getCurrentId()); pool.mutex.unlock(); const runnable: *Runnable = @fieldParentPtr("node", run_node); - runnable.runFn(runnable, id); - continue; + runnable.start(runnable); + pool.mutex.lock(); + if (runnable.is_parallel) { + // TODO also pop thread and join sometimes + pool.parallel_count -= 1; + } } - - pool.mutex.unlock(); - wait_group.wait(); - return; + if (pool.join_requested) break; + pool.cond.wait(&pool.mutex); } } -pub fn getIdCount(pool: *Pool) usize { - return @intCast(1 + pool.threads.items.len); -} - pub fn io(pool: *Pool) Io { return .{ .userdata = pool, .vtable = &.{ .async = async, + .asyncConcurrent = asyncParallel, + .asyncParallel = asyncParallel, .await = await, .asyncDetached = asyncDetached, .cancel = cancel, @@ -357,7 +114,7 @@ pub fn io(pool: *Pool) Io { const AsyncClosure = struct { func: *const fn (context: *anyopaque, result: *anyopaque) void, - runnable: Runnable = .{ .runFn = runFn }, + runnable: Runnable, reset_event: std.Thread.ResetEvent, select_condition: ?*std.Thread.ResetEvent, cancel_tid: std.Thread.Id, @@ -375,7 +132,7 @@ const AsyncClosure = struct { else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), }; - fn runFn(runnable: *Pool.Runnable, _: ?usize) void { + fn start(runnable: *Runnable) void { const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable)); const tid = std.Thread.getCurrentId(); if (@cmpxchgStrong( @@ -387,6 +144,7 @@ const AsyncClosure = struct { .acquire, )) |cancel_tid| { 
assert(cancel_tid == canceling_tid); + closure.reset_event.set(); return; } current_closure = closure; @@ -438,9 +196,13 @@ const AsyncClosure = struct { fn waitAndFree(closure: *AsyncClosure, gpa: Allocator, result: []u8) void { closure.reset_event.wait(); - const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure); @memcpy(result, closure.resultPointer()[0..result.len]); - gpa.free(base[0 .. closure.result_offset + result.len]); + free(closure, gpa, result.len); + } + + fn free(closure: *AsyncClosure, gpa: Allocator, result_len: usize) void { + const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure); + gpa.free(base[0 .. closure.result_offset + result_len]); } }; @@ -452,18 +214,26 @@ fn async( context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) ?*Io.AnyFuture { + if (builtin.single_threaded) { + start(context.ptr, result.ptr); + return null; + } const pool: *Pool = @alignCast(@ptrCast(userdata)); - pool.mutex.lock(); - + const cpu_count = pool.cpu_count catch { + return asyncParallel(userdata, result.len, result_alignment, context, context_alignment, start) orelse { + start(context.ptr, result.ptr); + return null; + }; + }; const gpa = pool.allocator; const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const result_offset = result_alignment.forward(context_offset + context.len); const n = result_offset + result.len; const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch { - pool.mutex.unlock(); start(context.ptr, result.ptr); return null; })); + closure.* = .{ .func = start, .context_offset = context_offset, @@ -471,37 +241,124 @@ fn async( .reset_event = .{}, .cancel_tid = 0, .select_condition = null, + .runnable = .{ + .start = AsyncClosure.start, + .is_parallel = false, + }, }; + @memcpy(closure.contextPointer()[0..context.len], context); + + pool.mutex.lock(); + + const thread_capacity = cpu_count - 1 + pool.parallel_count; + + pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + pool.mutex.unlock(); + closure.free(gpa, result.len); + start(context.ptr, result.ptr); + return null; + }; + pool.run_queue.prepend(&closure.runnable.node); - if (pool.threads.items.len < pool.threads.capacity) { - pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ - .stack_size = pool.stack_size, - .allocator = gpa, - }, worker, .{pool}) catch t: { - pool.threads.items.len -= 1; - break :t undefined; + if (pool.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { + if (pool.threads.items.len == 0) { + assert(pool.run_queue.popFirst() == &closure.runnable.node); + pool.mutex.unlock(); + closure.free(gpa, result.len); + start(context.ptr, result.ptr); + return null; + } + // Rely on other workers to do it. 
+ pool.mutex.unlock(); + pool.cond.signal(); + return @ptrCast(closure); }; + pool.threads.appendAssumeCapacity(thread); } pool.mutex.unlock(); pool.cond.signal(); + return @ptrCast(closure); +} +fn asyncParallel( + userdata: ?*anyopaque, + result_len: usize, + result_alignment: std.mem.Alignment, + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, +) ?*Io.AnyFuture { + if (builtin.single_threaded) return null; + + const pool: *Pool = @alignCast(@ptrCast(userdata)); + const cpu_count = pool.cpu_count catch 1; + const gpa = pool.allocator; + const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); + const result_offset = result_alignment.forward(context_offset + context.len); + const n = result_offset + result_len; + const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch return null)); + + closure.* = .{ + .func = start, + .context_offset = context_offset, + .result_offset = result_offset, + .reset_event = .{}, + .cancel_tid = 0, + .select_condition = null, + .runnable = .{ + .start = AsyncClosure.start, + .is_parallel = true, + }, + }; + @memcpy(closure.contextPointer()[0..context.len], context); + + pool.mutex.lock(); + + pool.parallel_count += 1; + const thread_capacity = cpu_count - 1 + pool.parallel_count; + + pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch { + pool.mutex.unlock(); + closure.free(gpa, result_len); + return null; + }; + + pool.run_queue.prepend(&closure.runnable.node); + + if (pool.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { + assert(pool.run_queue.popFirst() == &closure.runnable.node); + pool.mutex.unlock(); + closure.free(gpa, result_len); + return null; + }; + pool.threads.appendAssumeCapacity(thread); + } + + pool.mutex.unlock(); + pool.cond.signal(); return @ptrCast(closure); } const DetachedClosure = struct { pool: *Pool, func: *const fn (context: *anyopaque) void, - runnable: Runnable = .{ .runFn = runFn }, + runnable: Runnable, context_alignment: std.mem.Alignment, context_len: usize, - fn runFn(runnable: *Pool.Runnable, _: ?usize) void { + fn start(runnable: *Runnable) void { const closure: *DetachedClosure = @alignCast(@fieldParentPtr("runnable", runnable)); closure.func(closure.contextPointer()); const gpa = closure.pool.allocator; + free(closure, gpa); + } + + fn free(closure: *DetachedClosure, gpa: Allocator) void { const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure); gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]); } @@ -526,33 +383,46 @@ fn asyncDetached( context_alignment: std.mem.Alignment, start: *const fn (context: *const anyopaque) void, ) void { + if (builtin.single_threaded) return start(context.ptr); const pool: *Pool = @alignCast(@ptrCast(userdata)); - pool.mutex.lock(); - + const cpu_count = pool.cpu_count catch 1; const gpa = pool.allocator; const n = DetachedClosure.contextEnd(context_alignment, context.len); const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(DetachedClosure), n) catch { - pool.mutex.unlock(); - start(context.ptr); - return; + return start(context.ptr); })); closure.* = .{ .pool = pool, .func = start, .context_alignment = context_alignment, .context_len = context.len, + .runnable = .{ + .start = DetachedClosure.start, + .is_parallel = false, + }, }; @memcpy(closure.contextPointer()[0..context.len], context); 
+ + pool.mutex.lock(); + + const thread_capacity = cpu_count - 1 + pool.parallel_count; + + pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch { + pool.mutex.unlock(); + closure.free(gpa); + return start(context.ptr); + }; + pool.run_queue.prepend(&closure.runnable.node); - if (pool.threads.items.len < pool.threads.capacity) { - pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ - .stack_size = pool.stack_size, - .allocator = gpa, - }, worker, .{pool}) catch t: { - pool.threads.items.len -= 1; - break :t undefined; + if (pool.threads.items.len < thread_capacity) { + const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch { + assert(pool.run_queue.popFirst() == &closure.runnable.node); + pool.mutex.unlock(); + closure.free(gpa); + return start(context.ptr); }; + pool.threads.appendAssumeCapacity(thread); } pool.mutex.unlock(); diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index a3b382f372..6fddeaba5b 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -385,6 +385,8 @@ pub const CpuCountError = error{ }; /// Returns the platforms view on the number of logical CPU cores available. +/// +/// Returned value guaranteed to be >= 1. pub fn getCpuCount() CpuCountError!usize { return try Impl.getCpuCount(); }
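
Note (not part of the patch): a minimal usage sketch of the new API surface, assuming the thread pool is exposed as `std.Io.ThreadPool`, that `Future` exposes an `await(io)` method as in the existing `async` path, and that `std.heap.page_allocator` satisfies the pool's thread-safe allocator requirement; `addExample` is a made-up function and error handling is abbreviated.

    const std = @import("std");

    fn addExample(a: i64, b: i64) i64 {
        return a + b;
    }

    pub fn main() !void {
        // The pool stores its allocator; it must be thread-safe.
        var pool: std.Io.ThreadPool = .init(std.heap.page_allocator);
        defer pool.deinit();
        const io = pool.io();

        // Weakest guarantee: addExample may run before this call returns.
        var a = io.async(addExample, .{ 1, 2 });
        // Guaranteed to run concurrently with the caller; failure to acquire
        // the needed resources surfaces as error.OutOfMemory.
        var b = try io.asyncConcurrent(addExample, .{ 3, 4 });
        // Strongest guarantee: a separate thread of execution.
        var c = try io.asyncParallel(addExample, .{ 5, 6 });

        const sum = a.await(io) + b.await(io) + c.await(io);
        std.debug.print("sum = {d}\n", .{sum});
    }

If I read the new sizing logic correctly, a pool on a machine reporting 8 logical CPUs with two outstanding asyncParallel tasks allows up to 8 - 1 + 2 = 9 worker threads (the caller's own thread accounts for the remaining one), so parallel tasks are not queued behind CPU-count-bounded concurrent work.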