Mirror of https://codeberg.org/ziglang/zig.git, synced 2025-12-06 13:54:21 +00:00
std.Io: add Group.concurrent
A function that participates in a group but guarantees allocation of one unit of concurrency, or returns an error.
parent 66fe584ead · commit 3a62220082

3 changed files with 123 additions and 3 deletions
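A caller-side sketch of how the new API composes with the existing `Group.async` (not part of this commit; `task`, `example`, and the argument are hypothetical, and the fallback branch is one way to handle the error, consistent with the doc comments below):

const std = @import("std");
const Io = std.Io;

fn task(n: usize) void {
    _ = n; // placeholder work
}

// Request a dedicated unit of concurrency; fall back to plain `async`,
// which also permits e.g. stackful single-threaded blocking implementations.
fn example(io: Io) void {
    var group: Io.Group = .init;
    defer group.cancel(io);

    group.concurrent(io, task, .{42}) catch |err| switch (err) {
        error.ConcurrencyUnavailable => group.async(io, task, .{42}),
    };
    group.wait(io);
}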
@@ -626,8 +626,9 @@ pub const VTable = struct {
    /// Thread-safe.
    cancelRequested: *const fn (?*anyopaque) bool,

    /// Executes `start` asynchronously in a manner such that it cleans itself
    /// up. This mode does not support results, await, or cancel.
    /// When this function returns, the implementation guarantees that `start`
    /// has either already been called, or a unit of concurrency has been
    /// assigned to the task of calling the function.
    ///
    /// Thread-safe.
    groupAsync: *const fn (
@@ -640,6 +641,17 @@ pub const VTable = struct {
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) void,

    /// Thread-safe.
    groupConcurrent: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// Owner of the spawned async task.
        group: *Group,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) ConcurrentError!void,

    groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
    groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
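An implementation that cannot hand out additional units of concurrency can satisfy this contract by refusing every request, which is exactly what `Threaded` does when built single-threaded (see below). A minimal sketch (not part of this commit) using the signature above:

const std = @import("std");
const Io = std.Io;

// Hypothetical `groupConcurrent` for an implementation with no spare
// concurrency (for example, a strictly single-threaded one): every request
// is refused, which the caller observes as `error.ConcurrencyUnavailable`.
fn groupConcurrentUnsupported(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) Io.ConcurrentError!void {
    _ = userdata;
    _ = group;
    _ = context;
    _ = context_alignment;
    _ = start;
    return error.ConcurrencyUnavailable;
}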
@@ -1021,8 +1033,8 @@ pub const Group = struct {
    /// Threadsafe.
    ///
    /// See also:
    /// * `concurrent`
    /// * `Io.async`
    pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
@@ -1035,6 +1047,34 @@ pub const Group = struct {
        io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Calls `function` with `args`, such that the function is not guaranteed
    /// to have returned until `wait` is called, allowing the caller to
    /// progress while waiting for any `Io` operations.
    ///
    /// The resource spawned is owned by the group; after this is called,
    /// `wait` or `cancel` must be called before the group is deinitialized.
    ///
    /// This has a stronger guarantee than `async`, placing restrictions on
    /// what kind of `Io` implementations are supported. By calling `async`
    /// instead, one allows, for example, stackful single-threaded blocking I/O.
    ///
    /// Threadsafe.
    ///
    /// See also:
    /// * `async`
    /// * `Io.concurrent`
    pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
            fn start(group: *Group, context: *const anyopaque) void {
                _ = group;
                const args_casted: *const Args = @ptrCast(@alignCast(context));
                @call(.auto, function, args_casted.*);
            }
        };
        return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Blocks until all tasks of the group finish. During this time,
    /// cancellation requests propagate to all members of the group.
    ///
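The `TypeErased` pattern above hands the args tuple to the vtable as raw bytes: the implementation copies `context` (hence `[]const u8` plus an alignment in the vtable signature) and later passes a pointer back to `start`, which casts it back to the concrete tuple type. A stand-alone sketch of that round trip (hypothetical code, not from this commit):

const std = @import("std");

const Args = struct { usize, usize };

// Plays the role of `TypeErased.start`: recover the concrete args
// from the type-erased pointer.
fn start(context: *const anyopaque) void {
    const args: *const Args = @ptrCast(@alignCast(context));
    std.debug.print("sum = {d}\n", .{args.*[0] + args.*[1]});
}

pub fn main() void {
    const args: Args = .{ 40, 2 };
    // Plays the role of the `@ptrCast(&args)` at the vtable call site.
    start(@ptrCast(&args));
}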
@@ -116,6 +116,7 @@ pub fn init(
    /// * `Io.VTable.async`
    /// * `Io.VTable.concurrent`
    /// * `Io.VTable.groupAsync`
    /// * `Io.VTable.groupConcurrent`
    /// If these functions are avoided, then `Allocator.failing` may be passed
    /// here.
    gpa: Allocator,
@@ -221,6 +222,7 @@ pub fn io(t: *Threaded) Io {
        .select = select,

        .groupAsync = groupAsync,
        .groupConcurrent = groupConcurrent,
        .groupWait = groupWait,
        .groupCancel = groupCancel,
@@ -317,6 +319,7 @@ pub fn ioBasic(t: *Threaded) Io {
        .select = select,

        .groupAsync = groupAsync,
        .groupConcurrent = groupConcurrent,
        .groupWait = groupWait,
        .groupCancel = groupCancel,
@@ -729,6 +732,57 @@ fn groupAsync(
    t.cond.signal();
}

fn groupConcurrent(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) Io.ConcurrentError!void {
    if (builtin.single_threaded) return error.ConcurrencyUnavailable;

    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const gpa = t.allocator;
    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
        return error.ConcurrencyUnavailable;

    t.mutex.lock();
    defer t.mutex.unlock();

    const busy_count = t.busy_count;

    if (busy_count >= @intFromEnum(t.concurrent_limit))
        return error.ConcurrencyUnavailable;

    t.busy_count = busy_count + 1;
    errdefer t.busy_count = busy_count;

    const pool_size = t.wait_group.value();
    if (pool_size - busy_count == 0) {
        t.wait_group.start();
        errdefer t.wait_group.finish();

        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
            return error.ConcurrencyUnavailable;
        thread.detach();
    }

    // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
    group.token = &gc.node;

    t.run_queue.prepend(&gc.closure.node);

    // This needs to be done before unlocking the mutex to avoid a race with
    // the associated task finishing.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
    assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));

    t.cond.signal();
}

fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const gpa = t.allocator;
@@ -172,6 +172,32 @@ fn sleep(io: Io, result: *usize) void {
    result.* = 1;
}

test "Group concurrent" {
    const io = testing.io;

    var group: Io.Group = .init;
    defer group.cancel(io);
    var results: [2]usize = undefined;

    group.concurrent(io, count, .{ 1, 10, &results[0] }) catch |err| switch (err) {
        error.ConcurrencyUnavailable => {
            try testing.expect(builtin.single_threaded);
            return;
        },
    };

    group.concurrent(io, count, .{ 20, 30, &results[1] }) catch |err| switch (err) {
        error.ConcurrencyUnavailable => {
            try testing.expect(builtin.single_threaded);
            return;
        },
    };

    group.wait(io);

    try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
}

test "select" {
    const io = testing.io;
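The helper `count` is defined elsewhere in this test file and is not shown in the hunk. Judging by the call sites and the expected sums (1 + 2 + … + 9 = 45 and 20 + 21 + … + 29 = 245), a plausible shape is the following (hypothetical reconstruction, not from this diff):

// Sum the half-open range [first, last) into `result`.
fn count(first: usize, last: usize, result: *usize) void {
    var sum: usize = 0;
    var i = first;
    while (i < last) : (i += 1) sum += i;
    result.* = sum;
}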