From 32dc46aae56623bff9b1fc792d49913f9295be7b Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Mon, 24 Nov 2025 07:09:52 -0800
Subject: [PATCH] std.Io: add Group.concurrent

A function that participates in a group but guarantees allocation of
one unit of concurrency, or returns an error.
---
 lib/std/Io.zig          | 46 ++++++++++++++++++++++++++++++++---
 lib/std/Io/Threaded.zig | 54 +++++++++++++++++++++++++++++++++++++++++
 lib/std/Io/test.zig     | 26 ++++++++++++++++++++
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 114f748199..aa860abb36 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -626,8 +626,9 @@
     /// Thread-safe.
     cancelRequested: *const fn (?*anyopaque) bool,
 
-    /// Executes `start` asynchronously in a manner such that it cleans itself
-    /// up. This mode does not support results, await, or cancel.
+    /// When this function returns, the implementation guarantees that
+    /// `start` has either already been called, or a unit of concurrency has
+    /// been assigned to the task of calling the function.
     ///
     /// Thread-safe.
     groupAsync: *const fn (
@@ -640,6 +641,17 @@
         context_alignment: std.mem.Alignment,
         start: *const fn (*Group, context: *const anyopaque) void,
     ) void,
+    /// Thread-safe.
+    groupConcurrent: *const fn (
+        /// Corresponds to `Io.userdata`.
+        userdata: ?*anyopaque,
+        /// Owner of the spawned async task.
+        group: *Group,
+        /// Copied and then passed to `start`.
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        start: *const fn (*Group, context: *const anyopaque) void,
+    ) ConcurrentError!void,
 
     groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
     groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
@@ -1021,8 +1033,8 @@
     /// Threadsafe.
     ///
    /// See also:
-    /// * `Io.async`
     /// * `concurrent`
+    /// * `Io.async`
     pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
@@ -1035,6 +1047,34 @@
         io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
     }
 
+    /// Calls `function` with `args`, such that the function is not guaranteed
+    /// to have returned until `wait` is called, allowing the caller to
+    /// progress while waiting for any `Io` operations.
+    ///
+    /// The resource spawned is owned by the group; after this is called,
+    /// `wait` or `cancel` must be called before the group is deinitialized.
+    ///
+    /// This has a stronger guarantee than `async`, placing restrictions on
+    /// what kind of `Io` implementations are supported. By calling `async`
+    /// instead, one allows, for example, stackful single-threaded blocking I/O.
+    ///
+    /// Threadsafe.
+    ///
+    /// See also:
+    /// * `async`
+    /// * `Io.concurrent`
+    pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
+        const Args = @TypeOf(args);
+        const TypeErased = struct {
+            fn start(group: *Group, context: *const anyopaque) void {
+                _ = group;
+                const args_casted: *const Args = @ptrCast(@alignCast(context));
+                @call(.auto, function, args_casted.*);
+            }
+        };
+        return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
+    }
+
     /// Blocks until all tasks of the group finish. During this time,
     /// cancellation requests propagate to all members of the group.
     ///
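For illustration, a minimal usage sketch of the new API, following the doc
comments above and the test added in lib/std/Io/test.zig below. The `sumRange`
worker and `example` wrapper are hypothetical names, not part of the patch:

    const std = @import("std");
    const Io = std.Io;

    // Hypothetical worker; its arguments are passed as a tuple and copied by
    // the Io implementation before being handed to the spawned task.
    fn sumRange(first: usize, last: usize, result: *usize) void {
        var total: usize = 0;
        var i = first;
        while (i < last) : (i += 1) total += i;
        result.* = total;
    }

    fn example(io: Io) !void {
        var group: Io.Group = .init;
        // The spawned resource is owned by the group: `wait` or `cancel`
        // must run before the group is deinitialized.
        defer group.cancel(io);

        var result: usize = undefined;
        // Unlike `Group.async`, this either secures a unit of concurrency or
        // fails with error.ConcurrencyUnavailable (always the case in a
        // single-threaded build of the Threaded implementation).
        try group.concurrent(io, sumRange, .{ 1, 10, &result });

        group.wait(io); // result == 45 once all group tasks finish.
    }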
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index af832dab74..57c1bf0c71 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -116,6 +116,7 @@ pub fn init(
     /// * `Io.VTable.async`
     /// * `Io.VTable.concurrent`
     /// * `Io.VTable.groupAsync`
+    /// * `Io.VTable.groupConcurrent`
     /// If these functions are avoided, then `Allocator.failing` may be passed
     /// here.
     gpa: Allocator,
@@ -221,6 +222,7 @@ pub fn io(t: *Threaded) Io {
         .select = select,
 
         .groupAsync = groupAsync,
+        .groupConcurrent = groupConcurrent,
         .groupWait = groupWait,
         .groupCancel = groupCancel,
 
@@ -317,6 +319,7 @@ pub fn ioBasic(t: *Threaded) Io {
         .select = select,
 
         .groupAsync = groupAsync,
+        .groupConcurrent = groupConcurrent,
         .groupWait = groupWait,
         .groupCancel = groupCancel,
 
@@ -729,6 +732,57 @@ fn groupAsync(
     t.cond.signal();
 }
 
+fn groupConcurrent(
+    userdata: ?*anyopaque,
+    group: *Io.Group,
+    context: []const u8,
+    context_alignment: Alignment,
+    start: *const fn (*Io.Group, context: *const anyopaque) void,
+) Io.ConcurrentError!void {
+    if (builtin.single_threaded) return error.ConcurrencyUnavailable;
+
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+
+    const gpa = t.allocator;
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
+        return error.ConcurrencyUnavailable;
+
+    t.mutex.lock();
+    defer t.mutex.unlock();
+
+    const busy_count = t.busy_count;
+
+    if (busy_count >= @intFromEnum(t.concurrent_limit))
+        return error.ConcurrencyUnavailable;
+
+    t.busy_count = busy_count + 1;
+    errdefer t.busy_count = busy_count;
+
+    const pool_size = t.wait_group.value();
+    if (pool_size - busy_count == 0) {
+        t.wait_group.start();
+        errdefer t.wait_group.finish();
+
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
+            return error.ConcurrencyUnavailable;
+        thread.detach();
+    }
+
+    // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
+    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
+    group.token = &gc.node;
+
+    t.run_queue.prepend(&gc.closure.node);
+
+    // This needs to be done before unlocking the mutex to avoid a race with
+    // the associated task finishing.
+    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
+    const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
+    assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));
+
+    t.cond.signal();
+}
+
 fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const gpa = t.allocator;
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index a02a50c8a8..0ac6c333c9 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -172,6 +172,32 @@ fn sleep(io: Io, result: *usize) void {
     result.* = 1;
 }
 
+test "Group concurrent" {
+    const io = testing.io;
+
+    var group: Io.Group = .init;
+    defer group.cancel(io);
+    var results: [2]usize = undefined;
+
+    group.concurrent(io, count, .{ 1, 10, &results[0] }) catch |err| switch (err) {
+        error.ConcurrencyUnavailable => {
+            try testing.expect(builtin.single_threaded);
+            return;
+        },
+    };
+
+    group.concurrent(io, count, .{ 20, 30, &results[1] }) catch |err| switch (err) {
+        error.ConcurrencyUnavailable => {
+            try testing.expect(builtin.single_threaded);
+            return;
+        },
+    };
+
+    group.wait(io);
+
+    try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
+}
+
 test "select" {
     const io = testing.io;
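Because `Group.concurrent` can fail where `Group.async` cannot, a caller that
merely prefers extra concurrency rather than requiring it can fall back to
`async` on failure, as the Io.zig doc comments suggest. A sketch of that
pattern; the `spawnPreferConcurrent` helper is hypothetical, not part of the
patch:

    const std = @import("std");
    const Io = std.Io;

    // Hypothetical helper: prefer a dedicated unit of concurrency, but
    // degrade gracefully to `Group.async`, which also permits, for example,
    // stackful single-threaded blocking Io implementations.
    fn spawnPreferConcurrent(
        group: *Io.Group,
        io: Io,
        function: anytype,
        args: std.meta.ArgsTuple(@TypeOf(function)),
    ) void {
        group.concurrent(io, function, args) catch |err| switch (err) {
            // No unit of concurrency could be allocated: single-threaded
            // build, saturated concurrency limit, or allocation failure.
            error.ConcurrencyUnavailable => group.async(io, function, args),
        };
    }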