From 8eaebf5939491b005e392698ec6e890ebaf0f86b Mon Sep 17 00:00:00 2001
From: Loris Cro
Date: Tue, 4 Nov 2025 21:11:40 +0100
Subject: [PATCH 01/10] Io.Threaded PoC reimplementation

This is a reimplementation of Io.Threaded that fixes the issues
highlighted in the recent Zulip discussion. It's poorly tested, but it
does successfully run to completion the litmus test example that I
offered in the discussion.

This implementation has the following key design decisions:

- `t.cpu_count` is used as the threadpool size.
- `t.concurrency_limit` is used as the maximum number of "burst,
  one-shot" threads that can be spawned by `io.concurrent` past
  `t.cpu_count`.
- `t.available_thread_count` is the number of threads in the pool that
  are not currently busy with work (the bookkeeping happens in the
  worker function).
- `t.one_shot_thread_count` is the number of active threads that were
  spawned by `io.concurrent` past `t.cpu_count`.

In this implementation:

- `io.async` first tries to decrement `t.available_thread_count`. If
  there are no threads available, it tries to spawn a new one if
  possible; otherwise it runs the task immediately.
- `io.concurrent` first tries to use a thread in the pool, same as
  `io.async`, but on failure (no available threads and pool size limit
  reached) it tries to spawn a new one-shot thread. One-shot threads
  run a different main function that executes a single task, decrements
  the number of active one-shot threads, and then exits.

A relevant future improvement is to have one-shot threads stay alive
for a few seconds (and potentially pick up a new task) to amortize
spawning costs.
---
 lib/std/Io/Threaded.zig | 198 +++++++++++++++++++++-------------
 1 file changed, 105 insertions(+), 93 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 8dd9ae13fc..b6ba5f9a82 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -24,8 +24,10 @@ run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 threads: std.ArrayList(std.Thread),
 stack_size: usize,
-cpu_count: std.Thread.CpuCountError!usize,
-concurrent_count: usize,
+cpu_count: usize, // 0 means no limit
+concurrency_limit: usize, // 0 means no limit
+available_thread_count: usize = 0,
+one_shot_thread_count: usize = 0,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
 
@@ -70,8 +72,6 @@ const Closure = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
     cancel_tid: CancelId,
-    /// Whether this task bumps minimum number of threads in the pool.
-    is_concurrent: bool,
 
     const Start = *const fn (*Closure) void;
 
@@ -103,20 +103,20 @@ pub fn init(
 /// here.
     gpa: Allocator,
 ) Threaded {
+    assert(!builtin.single_threaded); // use 'init_single_threaded' instead
+
     var t: Threaded = .{
         .allocator = gpa,
         .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount(),
-        .concurrent_count = 0,
+        .cpu_count = std.Thread.getCpuCount() catch 0,
+        .concurrency_limit = 0,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
 
-    if (t.cpu_count) |n| {
-        t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
-    } else |_| {}
+    t.threads.ensureTotalCapacity(gpa, t.cpu_count) catch {};
 
     if (posix.Sigaction != void) {
         // This causes sending `posix.SIG.IO` to thread to interrupt blocking
@@ -145,7 +145,7 @@ pub const init_single_threaded: Threaded = .{
     .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
     .cpu_count = 1,
-    .concurrent_count = 0,
+    .concurrency_limit = 0,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,
     .have_signal_handler = false,
@@ -184,18 +184,22 @@ fn worker(t: *Threaded) void {
         while (t.run_queue.popFirst()) |closure_node| {
             t.mutex.unlock();
             const closure: *Closure = @fieldParentPtr("node", closure_node);
-            const is_concurrent = closure.is_concurrent;
             closure.start(closure);
             t.mutex.lock();
-            if (is_concurrent) {
-                t.concurrent_count -= 1;
-            }
+            t.available_thread_count += 1;
         }
         if (t.join_requested) break;
         t.cond.wait(&t.mutex);
     }
 }
 
+fn oneShotWorker(t: *Threaded, closure: *Closure) void {
+    closure.start(closure);
+    t.mutex.lock();
+    defer t.mutex.unlock();
+    t.one_shot_thread_count -= 1;
+}
+
 pub fn io(t: *Threaded) Io {
     return .{
         .userdata = t,
@@ -432,7 +436,6 @@ const AsyncClosure = struct {
     fn init(
         gpa: Allocator,
-        mode: enum { async, concurrent },
         result_len: usize,
         result_alignment: std.mem.Alignment,
         context: []const u8,
@@ -454,10 +457,6 @@ const AsyncClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = switch (mode) {
-                    .async => false,
-                    .concurrent => true,
-                },
             },
             .func = func,
             .context_alignment = context_alignment,
@@ -490,55 +489,51 @@ fn async(
     context_alignment: std.mem.Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
-    if (builtin.single_threaded) {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    if (t.cpu_count == 1) {
         start(context.ptr, result.ptr);
         return null;
     }
-
-    const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch {
-        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
-            start(context.ptr, result.ptr);
-            return null;
-        };
-    };
-
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
     };
 
     t.mutex.lock();
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        start(context.ptr, result.ptr);
-        return null;
-    };
-
-    t.run_queue.prepend(&ac.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            if (t.threads.items.len == 0) {
-                assert(t.run_queue.popFirst() == &ac.closure.node);
-                t.mutex.unlock();
-                ac.deinit(gpa);
-                start(context.ptr, result.ptr);
-                return null;
-            }
-            // Rely on other workers to do it.
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
             t.mutex.unlock();
-            t.cond.signal();
-            return @ptrCast(ac);
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        }
+
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
         };
+
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        };
+        t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&ac.closure.node);
     t.mutex.unlock();
     t.cond.signal();
     return @ptrCast(ac);
@@ -555,38 +550,49 @@ fn concurrent(
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch {
         return error.ConcurrencyUnavailable;
     };
+    errdefer ac.deinit(gpa);
 
     t.mutex.lock();
+    defer t.mutex.unlock();
 
-    t.concurrent_count += 1;
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        return error.ConcurrencyUnavailable;
-    };
-
-    t.run_queue.prepend(&ac.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &ac.closure.node);
-            t.mutex.unlock();
-            ac.deinit(gpa);
-            return error.ConcurrencyUnavailable;
-        };
-        t.threads.appendAssumeCapacity(thread);
+    // If there's an available thread, use it.
+    if (t.available_thread_count > 0) {
+        t.available_thread_count -= 1;
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
     }
 
-    t.mutex.unlock();
-    t.cond.signal();
+    // If we can spawn a normal worker, spawn it and use it.
+    if (t.cpu_count == 0 or t.threads.items.len < t.cpu_count) {
+        t.threads.ensureUnusedCapacity(gpa, 1) catch return error.ConcurrencyUnavailable;
+
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
+            return error.ConcurrencyUnavailable;
+
+        t.threads.appendAssumeCapacity(thread);
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
+    }
+
+    // If we have a concurrency limit and we haven't hit it yet,
+    // spawn a new one-shot thread.
+    if (t.concurrency_limit != 0 and t.one_shot_thread_count >= t.concurrency_limit)
+        return error.ConcurrencyUnavailable;
+
+    t.one_shot_thread_count += 1;
+    errdefer t.one_shot_thread_count -= 1;
+
+    const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, oneShotWorker, .{ t, &ac.closure }) catch
+        return error.ConcurrencyUnavailable;
+    thread.detach();
+
     return @ptrCast(ac);
 }
 
@@ -652,7 +658,6 @@ const GroupClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = false,
             },
             .t = t,
             .group = group,
@@ -684,12 +689,9 @@ fn groupAsync(
     if (builtin.single_threaded) return start(group, context.ptr);
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
-
     const gpa = t.allocator;
-    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
         return start(group, context.ptr);
-    };
 
     t.mutex.lock();
 
@@ -697,26 +699,36 @@ fn groupAsync(
     gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
     group.token = &gc.node;
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        }
 
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        gc.deinit(gpa);
-        return start(group, context.ptr);
-    };
-
-    t.run_queue.prepend(&gc.closure.node);
-
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &gc.closure.node);
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
             t.mutex.unlock();
             gc.deinit(gpa);
             return start(group, context.ptr);
         };
+
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        };
+
         t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&gc.closure.node);
+
     // This needs to be done before unlocking the mutex to avoid a race with
     // the associated task finishing.
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);

From ff883dd6ce910e2a0aa301e1ee84d93f53cf8765 Mon Sep 17 00:00:00 2001
From: Loris Cro
Date: Fri, 7 Nov 2025 12:42:51 +0100
Subject: [PATCH 02/10] fix single-threaded builds

---
 lib/std/Io/Threaded.zig | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index b6ba5f9a82..999d7b93f5 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -490,7 +490,7 @@ fn async(
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    if (t.cpu_count == 1) {
+    if (t.cpu_count == 1 or builtin.single_threaded) {
         start(context.ptr, result.ptr);
         return null;
     }

From 69f9395b382eb564285367e539d43dc6c8edd257 Mon Sep 17 00:00:00 2001
From: Loris Cro
Date: Fri, 7 Nov 2025 13:33:45 +0100
Subject: [PATCH 03/10] fix logic bug in groupAsync

---
 lib/std/Io/Threaded.zig | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 999d7b93f5..53d0335451 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -695,10 +695,6 @@ fn groupAsync(
 
     t.mutex.lock();
 
-    // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
-    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
-    group.token = &gc.node;
-
     if (t.available_thread_count == 0) {
         if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
             t.mutex.unlock();
@@ -727,6 +723,10 @@ fn groupAsync(
         t.available_thread_count -= 1;
     }
 
+    // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
+    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
+    group.token = &gc.node;
+
     t.run_queue.prepend(&gc.closure.node);
 
     // This needs to be done before unlocking the mutex to avoid a race with

From b4ec78906cdb2a13b08f94690fb270726bbdb819 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 08:49:26 -0800
Subject: [PATCH 04/10] std.Io: update async documentation to reflect the guarantee

---
 lib/std/Io.zig | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 40f213f3de..8c552ad8fe 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -580,6 +580,9 @@ pub const VTable = struct {
     /// If it returns `null` it means `result` has been already populated and
     /// `await` will be a no-op.
     ///
+    /// When this function returns non-null, the implementation guarantees that
+    /// a unit of concurrency has been assigned to the returned task.
+    ///
     /// Thread-safe.
     async: *const fn (
         /// Corresponds to `Io.userdata`.
@@ -1024,6 +1027,10 @@ pub const Group = struct {
     ///
    /// `function` *may* be called immediately, before `async` returns.
    ///
+    /// When this function returns, it is guaranteed that `function` has
+    /// already been called and completed, or it has successfully been assigned
+    /// a unit of concurrency.
+    ///
     /// After this is called, `wait` or `cancel` must be called before the
     /// group is deinitialized.
     ///
@@ -1094,6 +1101,10 @@ pub fn Select(comptime U: type) type {
         ///
         /// `function` *may* be called immediately, before `async` returns.
         ///
+        /// When this function returns, it is guaranteed that `function` has
+        /// already been called and completed, or it has successfully been
+        /// assigned a unit of concurrency.
+        ///
         /// After this is called, `wait` or `cancel` must be called before the
         /// select is deinitialized.
         ///
@@ -1524,8 +1535,11 @@ pub fn Queue(Elem: type) type {
 /// not guaranteed to be available until `await` is called.
 ///
 /// `function` *may* be called immediately, before `async` returns. This has
-/// weaker guarantees than `concurrent`, making more portable and
-/// reusable.
+/// weaker guarantees than `concurrent`, making it more portable and reusable.
+///
+/// When this function returns, it is guaranteed that `function` has already
+/// been called and completed, or it has successfully been assigned a unit of
+/// concurrency.
 ///
 /// See also:
 /// * `Group`

From aae85a4130ea032a17d6fadf7f3388b4c431a536 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 09:08:37 -0800
Subject: [PATCH 05/10] std.Io.Threaded: allow calling init in single-threaded mode

---
 lib/std/Io/Threaded.zig | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 53d0335451..03fcc902d9 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -103,7 +103,7 @@ pub fn init(
 /// here.
     gpa: Allocator,
 ) Threaded {
-    assert(!builtin.single_threaded); // use 'init_single_threaded' instead
+    if (builtin.single_threaded) return .init_single_threaded;
 
     var t: Threaded = .{
         .allocator = gpa,

From 13b537d77c07101a9dcb41e0839df1213ca8dd1d Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 09:08:52 -0800
Subject: [PATCH 06/10] std.Io.Threaded: remove dead code

---
 lib/std/Io/Threaded.zig | 2 --
 1 file changed, 2 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 03fcc902d9..6ceaee4505 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -90,8 +90,6 @@ const Closure = struct {
     }
 };
 
-pub const InitError = std.Thread.CpuCountError || Allocator.Error;
-
 /// Related:
 /// * `init_single_threaded`
 pub fn init(

From cf744aa182cf3a7c4738c43f5b1fcf6c280aa74f Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 12:02:59 -0800
Subject: [PATCH 07/10] std.Io.Threaded: slightly different semantics

while still preserving the guarantee about async() being assigned a unit
of concurrency (or immediately running the task), this change:

* retains the error from calling getCpuCount()
* spawns all threads in detached mode, using WaitGroup to join them
* treats all workers the same regardless of whether they are processing
  concurrent or async tasks. One thread pool does all the work, while
  respecting async and concurrent limits. (Two illustrative sketches of
  the resulting dispatch logic and its usage are appended after the
  series.)
---
 lib/std/Io/Threaded.zig      | 186 ++++++++++++++++-------------
 lib/std/Io/Threaded/test.zig |   4 +-
 lib/std/Thread/WaitGroup.zig |   4 +
 3 files changed, 92 insertions(+), 102 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 6ceaee4505..58e91e25a5 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -22,12 +22,30 @@ mutex: std.Thread.Mutex = .{},
 cond: std.Thread.Condition = .{},
 run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
-threads: std.ArrayList(std.Thread),
 stack_size: usize,
+/// All threads are spawned detached; this is how we wait until they all exit.
+wait_group: std.Thread.WaitGroup = .{},
+/// Maximum thread pool size (excluding main thread) when dispatching async
+/// tasks. Until this limit, calls to `Io.async` when all threads are busy will
+/// cause a new thread to be spawned and permanently added to the pool. After
+/// this limit, calls to `Io.async` when all threads are busy run the task
+/// immediately.
+///
+/// Defaults to the logical CPU core count minus one (the main thread).
+async_limit: Io.Limit,
+/// Maximum thread pool size (excluding main thread) for dispatching concurrent
+/// tasks. Until this limit, calls to `Io.concurrent` will increase the thread
+/// pool size.
+///
+/// After this limit, calls to `Io.concurrent` return
+/// `error.ConcurrencyUnavailable`.
+concurrent_limit: Io.Limit = .unlimited,
+/// Error from calling `std.Thread.getCpuCount` in `init`.
+cpu_count_error: ?std.Thread.CpuCountError,
+/// Number of threads that are unavailable to take tasks. To calculate
+/// available count, subtract this from either `async_limit` or
+/// `concurrent_limit`.
+busy_count: usize = 0,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
 
@@ -103,19 +121,18 @@ pub fn init(
 ) Threaded {
     if (builtin.single_threaded) return .init_single_threaded;
 
+    const cpu_count = std.Thread.getCpuCount();
+
     var t: Threaded = .{
         .allocator = gpa,
-        .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount() catch 0,
-        .concurrency_limit = 0,
+        .async_limit = if (cpu_count) |n| .limited(n - 1) else |_| .nothing,
+        .cpu_count_error = if (cpu_count) |_| null else |e| e,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
 
-    t.threads.ensureTotalCapacity(gpa, t.cpu_count) catch {};
-
     if (posix.Sigaction != void) {
         // This causes sending `posix.SIG.IO` to thread to interrupt blocking
         // syscalls, returning `posix.E.INTR`.
@@ -140,19 +157,17 @@ pub fn init(
 /// * `deinit` is safe, but unnecessary to call.
 pub const init_single_threaded: Threaded = .{
     .allocator = .failing,
-    .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
-    .cpu_count = 1,
-    .concurrency_limit = 0,
+    .async_limit = .nothing,
+    .cpu_count_error = null,
+    .concurrent_limit = .nothing,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,
     .have_signal_handler = false,
 };
 
 pub fn deinit(t: *Threaded) void {
-    const gpa = t.allocator;
     t.join();
-    t.threads.deinit(gpa);
     if (is_windows and t.wsa.status == .initialized) {
         if (ws2_32.WSACleanup() != 0) recoverableOsBugDetected();
     }
@@ -171,10 +186,12 @@ fn join(t: *Threaded) void {
         t.join_requested = true;
     }
     t.cond.broadcast();
-    for (t.threads.items) |thread| thread.join();
+    t.wait_group.wait();
 }
 
 fn worker(t: *Threaded) void {
+    defer t.wait_group.finish();
+
     t.mutex.lock();
     defer t.mutex.unlock();
 
@@ -184,20 +201,13 @@ fn worker(t: *Threaded) void {
         while (t.run_queue.popFirst()) |closure_node| {
             t.mutex.unlock();
             const closure: *Closure = @fieldParentPtr("node", closure_node);
             closure.start(closure);
             t.mutex.lock();
-            t.available_thread_count += 1;
+            t.busy_count -= 1;
         }
         if (t.join_requested) break;
         t.cond.wait(&t.mutex);
     }
 }
 
-fn oneShotWorker(t: *Threaded, closure: *Closure) void {
-    closure.start(closure);
-    t.mutex.lock();
-    defer t.mutex.unlock();
-    t.one_shot_thread_count -= 1;
-}
-
 pub fn io(t: *Threaded) Io {
     return .{
         .userdata = t,
@@ -488,7 +498,7 @@ fn async(
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    if (t.cpu_count == 1 or builtin.single_threaded) {
+    if (builtin.single_threaded or t.async_limit == .nothing) {
         start(context.ptr, result.ptr);
         return null;
     }
@@ -500,35 +510,29 @@ fn async(
 
     t.mutex.lock();
 
-    if (t.available_thread_count == 0) {
-        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
-            t.mutex.unlock();
-            ac.deinit(gpa);
-            start(context.ptr, result.ptr);
-            return null;
-        }
+    const busy_count = t.busy_count;
 
-        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+    if (busy_count >= @intFromEnum(t.async_limit)) {
+        t.mutex.unlock();
+        ac.deinit(gpa);
+        start(context.ptr, result.ptr);
+        return null;
+    }
+
+    t.busy_count = busy_count + 1;
+
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+            t.wait_group.finish();
+            t.busy_count = busy_count;
             t.mutex.unlock();
             ac.deinit(gpa);
             start(context.ptr, result.ptr);
             return null;
         };
-
-        const thread = std.Thread.spawn(
-            .{ .stack_size = t.stack_size },
-            worker,
-            .{t},
-        ) catch {
-            t.mutex.unlock();
-            ac.deinit(gpa);
-            start(context.ptr, result.ptr);
-            return null;
-        };
-
-        t.threads.appendAssumeCapacity(thread);
-    } else {
-        t.available_thread_count -= 1;
+        thread.detach();
     }
 
     t.run_queue.prepend(&ac.closure.node);
@@ -550,38 +554,33 @@ fn concurrent(
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch
         return error.ConcurrencyUnavailable;
-    };
     errdefer ac.deinit(gpa);
 
     t.mutex.lock();
     defer t.mutex.unlock();
 
-    // If there's an available thread, use it.
-    if (t.available_thread_count > 0) {
-        t.available_thread_count -= 1;
-        t.run_queue.prepend(&ac.closure.node);
-        t.cond.signal();
-        return @ptrCast(ac);
-    }
+    const busy_count = t.busy_count;
 
-    // If we can spawn a normal worker, spawn it and use it.
-    if (t.cpu_count == 0 or t.threads.items.len < t.cpu_count) {
-        t.threads.ensureUnusedCapacity(gpa, 1) catch return error.ConcurrencyUnavailable;
+    if (busy_count >= @intFromEnum(t.concurrent_limit))
+        return error.ConcurrencyUnavailable;
+
+    t.busy_count = busy_count + 1;
+    errdefer t.busy_count = busy_count;
+
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        errdefer t.wait_group.finish();
 
         const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
             return error.ConcurrencyUnavailable;
-
-        t.threads.appendAssumeCapacity(thread);
-        t.run_queue.prepend(&ac.closure.node);
-        t.cond.signal();
-        return @ptrCast(ac);
+        thread.detach();
     }
 
-    // If we have a concurrency limit and we haven't hit it yet,
-    // spawn a new one-shot thread.
-    if (t.concurrency_limit != 0 and t.one_shot_thread_count >= t.concurrency_limit)
-        return error.ConcurrencyUnavailable;
-
-    t.one_shot_thread_count += 1;
-    errdefer t.one_shot_thread_count -= 1;
-
-    const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, oneShotWorker, .{ t, &ac.closure }) catch
-        return error.ConcurrencyUnavailable;
-    thread.detach();
-
+    t.run_queue.prepend(&ac.closure.node);
+    t.cond.signal();
     return @ptrCast(ac);
 }
 
@@ -684,41 +674,37 @@ fn groupAsync(
     context_alignment: std.mem.Alignment,
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
-    if (builtin.single_threaded) return start(group, context.ptr);
-
     const t: *Threaded = @ptrCast(@alignCast(userdata));
+    if (builtin.single_threaded or t.async_limit == .nothing)
+        return start(group, context.ptr);
+
     const gpa = t.allocator;
     const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
         return start(group, context.ptr);
 
     t.mutex.lock();
 
-    if (t.available_thread_count == 0) {
-        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
-            t.mutex.unlock();
-            gc.deinit(gpa);
-            return start(group, context.ptr);
-        }
+    const busy_count = t.busy_count;
 
-        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+    if (busy_count >= @intFromEnum(t.async_limit)) {
+        t.mutex.unlock();
+        gc.deinit(gpa);
+        return start(group, context.ptr);
+    }
+
+    t.busy_count = busy_count + 1;
+
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+            t.wait_group.finish();
+            t.busy_count = busy_count;
             t.mutex.unlock();
             gc.deinit(gpa);
             return start(group, context.ptr);
         };
-
-        const thread = std.Thread.spawn(
-            .{ .stack_size = t.stack_size },
-            worker,
-            .{t},
-        ) catch {
-            t.mutex.unlock();
-            gc.deinit(gpa);
-            return start(group, context.ptr);
-        };
-
-        t.threads.appendAssumeCapacity(thread);
-    } else {
-        t.available_thread_count -= 1;
+        thread.detach();
     }
 
     // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
diff --git a/lib/std/Io/Threaded/test.zig b/lib/std/Io/Threaded/test.zig
index 7e6e687cf2..16afae7b63 100644
--- a/lib/std/Io/Threaded/test.zig
+++ b/lib/std/Io/Threaded/test.zig
@@ -10,7 +10,7 @@ test "concurrent vs main prevents deadlock via oversubscription" {
     defer threaded.deinit();
     const io = threaded.io();
 
-    threaded.cpu_count = 1;
+    threaded.async_limit = .nothing;
 
     var queue: Io.Queue(u8) = .init(&.{});
 
@@ -38,7 +38,7 @@ test "concurrent vs concurrent prevents deadlock via oversubscription" {
     defer threaded.deinit();
     const io = threaded.io();
 
-    threaded.cpu_count = 1;
+    threaded.async_limit = .nothing;
 
     var queue: Io.Queue(u8) = .init(&.{});
 
diff --git a/lib/std/Thread/WaitGroup.zig b/lib/std/Thread/WaitGroup.zig
index a5970b7d69..8a9107192d 100644
--- a/lib/std/Thread/WaitGroup.zig
+++ b/lib/std/Thread/WaitGroup.zig
@@ -60,6 +60,10 @@ pub fn isDone(wg: *WaitGroup) bool {
     return (state / one_pending) == 0;
 }
 
+pub fn value(wg: *WaitGroup) usize {
+    return wg.state.load(.monotonic) / one_pending;
+}
+
 // Spawns a new thread for the task. This is appropriate when the callee
 // delegates all work.
 pub fn spawnManager(

From b052afd24bf42663ab937fab88eb072d3514519d Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 12:05:29 -0800
Subject: [PATCH 08/10] std.Io.Threaded: import std.mem.Alignment

---
 lib/std/Io/Threaded.zig | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 58e91e25a5..51c0139b87 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -13,6 +13,7 @@ const net = std.Io.net;
 const HostName = std.Io.net.HostName;
 const IpAddress = std.Io.net.IpAddress;
 const Allocator = std.mem.Allocator;
+const Alignment = std.mem.Alignment;
 const assert = std.debug.assert;
 const posix = std.posix;
 
@@ -399,7 +400,7 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     reset_event: ResetEvent,
     select_condition: ?*ResetEvent,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     result_offset: usize,
     alloc_len: usize,
 
@@ -445,9 +446,9 @@ const AsyncClosure = struct {
     fn init(
         gpa: Allocator,
         result_len: usize,
-        result_alignment: std.mem.Alignment,
+        result_alignment: Alignment,
         context: []const u8,
-        context_alignment: std.mem.Alignment,
+        context_alignment: Alignment,
         func: *const fn (context: *const anyopaque, result: *anyopaque) void,
     ) Allocator.Error!*AsyncClosure {
         const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
@@ -492,9 +493,9 @@ const AsyncClosure = struct {
 fn async(
     userdata: ?*anyopaque,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
@@ -544,9 +545,9 @@ fn async(
 fn concurrent(
     userdata: ?*anyopaque,
     result_len: usize,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) Io.ConcurrentError!*Io.AnyFuture {
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
@@ -591,7 +592,7 @@ const GroupClosure = struct {
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
     func: *const fn (*Io.Group, context: *anyopaque) void,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     alloc_len: usize,
 
     fn start(closure: *Closure) void {
@@ -632,7 +633,7 @@ const GroupClosure = struct {
         t: *Threaded,
         group: *Io.Group,
         context: []const u8,
-        context_alignment: std.mem.Alignment,
+        context_alignment: Alignment,
         func: *const fn (*Io.Group, context: *const anyopaque) void,
     ) Allocator.Error!*GroupClosure {
         const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
@@ -671,7 +672,7 @@ fn groupAsync(
     userdata: ?*anyopaque,
     group: *Io.Group,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
@@ -790,7 +791,7 @@ fn await(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
@@ -802,7 +803,7 @@ fn cancel(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));

From eb038ffbc1fb2eb936439127df8acf9b9dc080d0 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 15:18:19 -0800
Subject: [PATCH 09/10] std.Io.Threaded: forward cancellation requests to awaited tasks

---
 lib/std/Io/Threaded.zig | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 51c0139b87..af832dab74 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -478,10 +478,15 @@ const AsyncClosure = struct {
         return ac;
     }
 
-    fn waitAndDeinit(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
-        ac.reset_event.waitUncancelable();
+    fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void {
+        ac.reset_event.wait(t) catch |err| switch (err) {
+            error.Canceled => {
+                ac.closure.requestCancel();
+                ac.reset_event.waitUncancelable();
+            },
+        };
         @memcpy(result, ac.resultPointer()[0..result.len]);
-        ac.deinit(gpa);
+        ac.deinit(t.allocator);
     }
 
     fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
@@ -796,7 +801,7 @@ fn await(
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    closure.waitAndDeinit(t.allocator, result);
+    closure.waitAndDeinit(t, result);
 }
 
 fn cancel(
@@ -809,7 +814,7 @@ fn cancel(
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
     ac.closure.requestCancel();
-    ac.waitAndDeinit(t.allocator, result);
+    ac.waitAndDeinit(t, result);
 }
 
 fn cancelRequested(userdata: ?*anyopaque) bool {

From 7096e66ca9b7b1e4dc7d6d5d5bf1e6833f1be039 Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 21 Nov 2025 16:42:01 -0800
Subject: [PATCH 10/10] std.Thread: update doc comments

---
 lib/std/Thread.zig | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig
index 07b3c9076b..93563bb245 100644
--- a/lib/std/Thread.zig
+++ b/lib/std/Thread.zig
@@ -1,13 +1,14 @@
-//! This struct represents a kernel thread, and acts as a namespace for concurrency
-//! primitives that operate on kernel threads. For concurrency primitives that support
-//! both evented I/O and async I/O, see the respective names in the top level std namespace.
+//! This struct represents a kernel thread, and acts as a namespace for
+//! concurrency primitives that operate on kernel threads. For concurrency
+//! primitives that interact with the I/O interface, see `std.Io`.
 
-const std = @import("std.zig");
 const builtin = @import("builtin");
-const math = std.math;
-const assert = std.debug.assert;
 const target = builtin.target;
 const native_os = builtin.os.tag;
+
+const std = @import("std.zig");
+const math = std.math;
+const assert = std.debug.assert;
 const posix = std.posix;
 const windows = std.os.windows;
 const testing = std.testing;
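
Appendix: a distilled sketch of the dispatch decision that patch 07 settles
on, for readers who find it hard to follow in diff form. The names
`DispatchSketch` and `Decision` are hypothetical, and `limit` stands in for
`@intFromEnum(t.async_limit)`; the real code in lib/std/Io/Threaded.zig
additionally enqueues the closure and signals the condition variable while
still holding the mutex.

const std = @import("std");

const Decision = enum { run_inline, reuse_idle_worker, spawn_worker };

const DispatchSketch = struct {
    /// Number of threads currently executing tasks; in the real pool this
    /// field is protected by the mutex.
    busy_count: usize = 0,
    /// Number of live pool threads; the real code reads this via
    /// `WaitGroup.value()`.
    pool_size: usize = 0,

    /// Mirrors the `async` path: past the limit the task runs inline, which
    /// is always legal because `async` makes no concurrency promise.
    fn dispatchAsync(s: *DispatchSketch, limit: usize) Decision {
        const busy_count = s.busy_count;
        if (busy_count >= limit) return .run_inline;
        s.busy_count = busy_count + 1;
        // `pool_size - busy_count` is the number of idle workers: every
        // live thread that is not busy is parked on the condition variable.
        if (s.pool_size - busy_count == 0) {
            s.pool_size += 1; // stands in for Thread.spawn + wait_group.start()
            return .spawn_worker;
        }
        return .reuse_idle_worker;
    }
};

test "dispatch decision sketch" {
    var s: DispatchSketch = .{};
    // First task on an empty pool: a worker must be spawned.
    try std.testing.expectEqual(Decision.spawn_worker, s.dispatchAsync(2));
    // Second task while the first is still running: spawn again.
    try std.testing.expectEqual(Decision.spawn_worker, s.dispatchAsync(2));
    // Limit reached: run the task inline. The `concurrent` path makes the
    // same check against `concurrent_limit` but returns
    // error.ConcurrencyUnavailable here instead of running inline.
    try std.testing.expectEqual(Decision.run_inline, s.dispatchAsync(2));
}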
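And a hypothetical end-to-end use of the reworked pool and its limit fields,
modeled on the oversubscription tests in lib/std/Io/Threaded/test.zig above.
The `consumer` helper is invented for illustration, and the exact call shapes
of `io.concurrent`, `Io.Queue.putOne`/`getOne`, and `Future.await` are
assumptions about the in-development `std.Io` interface, not confirmed API.

const std = @import("std");
const Io = std.Io;

fn consumer(io: Io, queue: *Io.Queue(u8)) void {
    // Blocks until the producer (main) puts an element.
    _ = queue.getOne(io) catch {};
}

pub fn main() !void {
    var threaded: Io.Threaded = .init(std.heap.smp_allocator);
    defer threaded.deinit();
    const io = threaded.io();

    // Force the worst case: no async workers at all, so `io.async` always
    // runs tasks inline and only `io.concurrent` can provide parallelism.
    threaded.async_limit = .nothing;

    var queue: Io.Queue(u8) = .init(&.{});

    // `concurrent` either assigns the task a unit of concurrency or fails
    // with error.ConcurrencyUnavailable; it never silently deadlocks
    // against the blocking putOne below.
    var task = try io.concurrent(consumer, .{ io, &queue });
    defer task.await(io);

    // Unbuffered queue: this blocks until the consumer takes the element,
    // which is why the consumer must truly run concurrently.
    try queue.putOne(io, 42);
}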