std.Io: implement Group API

This commit is contained in:
Andrew Kelley 2025-09-26 21:58:51 -07:00
parent 5469db66e4
commit 8e1da66ba1
3 changed files with 102 additions and 49 deletions

View file

@ -593,18 +593,6 @@ pub const VTable = struct {
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) error{OutOfMemory}!*AnyFuture,
/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
///
/// Thread-safe.
asyncDetached: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
) void,
/// This function is only called when `async` returns a non-null value.
///
/// Thread-safe.
@ -639,6 +627,23 @@ pub const VTable = struct {
/// Thread-safe.
cancelRequested: *const fn (?*anyopaque) bool,
/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
///
/// Thread-safe.
groupAsync: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// Owner of the spawned async task.
group: *Group,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
) void,
groupWait: *const fn (?*anyopaque, *Group) void,
groupCancel: *const fn (?*anyopaque, *Group) void,
/// Blocks until one of the futures from the list has a result ready, such
/// that awaiting it will not block. Returns that index.
select: *const fn (?*anyopaque, futures: []const *AnyFuture) usize,
@ -751,6 +756,45 @@ pub fn Future(Result: type) type {
};
}
pub const Group = struct {
state: usize,
context: ?*anyopaque,
pub const init: Group = .{ .state = 0, .context = null };
/// Calls `function` with `args` asynchronously. The resource spawned is
/// owned by the group.
///
/// `function` *may* be called immediately, before `async` returns.
///
/// After this is called, `wait` must be called before the group is
/// deinitialized.
///
/// See also:
/// * `async`
/// * `concurrent`
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque) void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
@call(.auto, function, args_casted.*);
}
};
io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
}
/// Idempotent.
pub fn wait(g: *Group, io: Io) void {
io.vtable.groupWait(io.userdata, g);
}
/// Idempotent.
pub fn cancel(g: *Group, io: Io) void {
io.vtable.groupCancel(io.userdata, g);
}
};
pub const Mutex = if (true) struct {
state: State,
@ -1099,7 +1143,7 @@ pub fn Queue(Elem: type) type {
/// reusable.
///
/// See also:
/// * `asyncDetached`
/// * `Group`
pub fn async(
io: Io,
function: anytype,
@ -1159,25 +1203,6 @@ pub fn concurrent(
return future;
}
/// Calls `function` with `args` asynchronously. The resource cleans itself up
/// when the function returns. Does not support await, cancel, or a return value.
///
/// `function` *may* be called immediately, before `async` returns.
///
/// See also:
/// * `async`
/// * `concurrent`
pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque) void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
@call(.auto, function, args_casted.*);
}
};
io.vtable.asyncDetached(io.userdata, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
}
pub fn cancelRequested(io: Io) bool {
return io.vtable.cancelRequested(io.userdata);
}

View file

@ -8,7 +8,6 @@ const windows = std.os.windows;
const std = @import("../std.zig");
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const WaitGroup = std.Thread.WaitGroup;
const posix = std.posix;
const Io = std.Io;
@ -101,10 +100,12 @@ pub fn io(pool: *Pool) Io {
.async = async,
.concurrent = concurrent,
.await = await,
.asyncDetached = asyncDetached,
.cancel = cancel,
.cancelRequested = cancelRequested,
.select = select,
.groupAsync = groupAsync,
.groupWait = groupWait,
.groupCancel = groupCancel,
.mutexLock = mutexLock,
.mutexUnlock = mutexUnlock,
@ -279,7 +280,7 @@ fn async(
.func = start,
.context_offset = context_offset,
.result_offset = result_offset,
.reset_event = .{},
.reset_event = .unset,
.cancel_tid = 0,
.select_condition = null,
.runnable = .{
@ -347,7 +348,7 @@ fn concurrent(
.func = start,
.context_offset = context_offset,
.result_offset = result_offset,
.reset_event = .{},
.reset_event = .unset,
.cancel_tid = 0,
.select_condition = null,
.runnable = .{
@ -385,41 +386,47 @@ fn concurrent(
return @ptrCast(closure);
}
const DetachedClosure = struct {
const GroupClosure = struct {
pool: *Pool,
group: *Io.Group,
func: *const fn (context: *anyopaque) void,
runnable: Runnable,
context_alignment: std.mem.Alignment,
context_len: usize,
fn start(runnable: *Runnable) void {
const closure: *DetachedClosure = @alignCast(@fieldParentPtr("runnable", runnable));
const closure: *GroupClosure = @alignCast(@fieldParentPtr("runnable", runnable));
closure.func(closure.contextPointer());
const group = closure.group;
const gpa = closure.pool.allocator;
free(closure, gpa);
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context);
std.Thread.WaitGroup.finishStateless(group_state, reset_event);
}
fn free(closure: *DetachedClosure, gpa: Allocator) void {
const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure);
fn free(closure: *GroupClosure, gpa: Allocator) void {
const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(closure);
gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
}
fn contextOffset(context_alignment: std.mem.Alignment) usize {
return context_alignment.forward(@sizeOf(DetachedClosure));
return context_alignment.forward(@sizeOf(GroupClosure));
}
fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
return contextOffset(context_alignment) + context_len;
}
fn contextPointer(closure: *DetachedClosure) [*]u8 {
fn contextPointer(closure: *GroupClosure) [*]u8 {
const base: [*]u8 = @ptrCast(closure);
return base + contextOffset(closure.context_alignment);
}
};
fn asyncDetached(
fn groupAsync(
userdata: ?*anyopaque,
group: *Io.Group,
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
@ -428,17 +435,18 @@ fn asyncDetached(
const pool: *Pool = @ptrCast(@alignCast(userdata));
const cpu_count = pool.cpu_count catch 1;
const gpa = pool.allocator;
const n = DetachedClosure.contextEnd(context_alignment, context.len);
const closure: *DetachedClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(DetachedClosure), n) catch {
const n = GroupClosure.contextEnd(context_alignment, context.len);
const closure: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
return start(context.ptr);
}));
closure.* = .{
.pool = pool,
.group = group,
.func = start,
.context_alignment = context_alignment,
.context_len = context.len,
.runnable = .{
.start = DetachedClosure.start,
.start = GroupClosure.start,
.is_parallel = false,
},
};
@ -466,10 +474,30 @@ fn asyncDetached(
pool.threads.appendAssumeCapacity(thread);
}
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
std.Thread.WaitGroup.startStateless(group_state);
pool.mutex.unlock();
pool.cond.signal();
}
fn groupWait(userdata: ?*anyopaque, group: *Io.Group) void {
if (builtin.single_threaded) return;
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context);
std.Thread.WaitGroup.waitStateless(group_state, reset_event);
}
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void {
if (builtin.single_threaded) return;
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
_ = group;
@panic("TODO threaded group cancel");
}
fn await(
userdata: ?*anyopaque,
any_future: *Io.AnyFuture,
@ -968,7 +996,7 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
var reset_event: std.Thread.ResetEvent = .{};
var reset_event: std.Thread.ResetEvent = .unset;
for (futures, 0..) |future, i| {
const closure: *AsyncClosure = @ptrCast(@alignCast(future));

View file

@ -653,7 +653,7 @@ pub const ResolvConf = struct {
const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
var group: Io.Group = .{};
var group: Io.Group = .init;
defer group.cancel();
for (queries) |query| {
@ -702,7 +702,7 @@ test ResolvConf {
.search_buffer = undefined,
.search_len = 0,
.ndots = 1,
.timeout = 5,
.timeout = .seconds(5),
.attempts = 2,
};