std.Io: add asyncConcurrent and asyncParallel

Andrew Kelley 2025-07-17 20:26:07 -07:00
parent 384545acbc
commit 33b10abaf6
4 changed files with 331 additions and 310 deletions


@@ -934,6 +934,32 @@ pub const VTable = struct {
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
/// Returning `null` indicates resource allocation failed.
///
/// Thread-safe.
asyncConcurrent: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
result_len: usize,
result_alignment: std.mem.Alignment,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
/// Returning `null` indicates resource allocation failed.
///
/// Thread-safe.
asyncParallel: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
result_len: usize,
result_alignment: std.mem.Alignment,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
///
@@ -1491,7 +1517,18 @@ pub fn Queue(Elem: type) type {
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called.
///
/// `function` *may* be called immediately, before `async` returns. This has
/// weaker guarantees than `asyncConcurrent` and `asyncParallel`, making it the
/// most portable and reusable among the async family functions.
///
/// See also:
/// * `asyncDetached`
pub fn async(
io: Io,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
@@ -1513,8 +1550,86 @@ pub fn async(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(functio
return future;
}
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called, passing control
/// flow back to the caller while waiting for any `Io` operations.
///
/// This has a weaker guarantee than `asyncParallel`, making it more portable
/// and reusable; however, it has a stronger guarantee than `async`, placing
/// restrictions on what kinds of `Io` implementations are supported. By calling
/// `async` instead, one allows, for example, stackful single-threaded blocking I/O.
pub fn asyncConcurrent(
io: Io,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque, result: *anyopaque) void {
const args_casted: *const Args = @alignCast(@ptrCast(context));
const result_casted: *Result = @ptrCast(@alignCast(result));
result_casted.* = @call(.auto, function, args_casted.*);
}
};
var future: Future(Result) = undefined;
future.any_future = io.vtable.asyncConcurrent(
io.userdata,
@sizeOf(Result),
.of(Result),
@ptrCast((&args)[0..1]),
.of(Args),
TypeErased.start,
) orelse return error.OutOfMemory;
return future;
}
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called, while simultaneously
/// passing control flow back to the caller.
///
/// This has the strongest guarantees of all async family functions, placing
/// the most restrictions on what kinds of `Io` implementations are supported.
/// By calling `asyncConcurrent` instead, one allows, for example,
/// stackful single-threaded non-blocking I/O.
///
/// See also:
/// * `asyncConcurrent`
/// * `async`
pub fn asyncParallel(
io: Io,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque, result: *anyopaque) void {
const args_casted: *const Args = @alignCast(@ptrCast(context));
const result_casted: *Result = @ptrCast(@alignCast(result));
result_casted.* = @call(.auto, function, args_casted.*);
}
};
var future: Future(Result) = undefined;
future.any_future = io.vtable.asyncParallel(
io.userdata,
@sizeOf(Result),
.of(Result),
@ptrCast((&args)[0..1]),
.of(Args),
TypeErased.start,
) orelse return error.OutOfMemory;
return future;
}
/// Calls `function` with `args` asynchronously. The resource cleans itself up
/// when the function returns. Does not support await, cancel, or a return value.
///
/// `function` *may* be called immediately, before `asyncDetached` returns.
///
/// See also:
/// * `async`
/// * `asyncConcurrent`
pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
@@ -1526,6 +1641,10 @@ pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf
io.vtable.asyncDetached(io.userdata, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
}
pub fn cancelRequested(io: Io) bool {
return io.vtable.cancelRequested(io.userdata);
}
pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp {
return io.vtable.now(io.userdata, clockid);
}
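
For orientation, here is a minimal usage sketch of the API added above, in the spirit of the doc comments rather than taken from the commit: the `Io` value, the `download` helper, and the URLs are hypothetical, and it assumes results are resolved through a `Future.await(io)` method, as the `await` vtable entry suggests.

const std = @import("std");

fn download(io: std.Io, url: []const u8) usize {
    // Hypothetical worker: pretend to perform some I/O and return a byte count.
    _ = io;
    return url.len;
}

fn example(io: std.Io) !void {
    // Weakest guarantee: `download` may run inline before `async` returns,
    // which is what makes this the most portable member of the family.
    var a = io.async(download, .{ io, "https://example.com/a" });
    // Requires the implementation to make progress concurrently with the
    // caller; fails with error.OutOfMemory if resources cannot be allocated.
    var b = try io.asyncConcurrent(download, .{ io, "https://example.com/b" });
    // Strongest guarantee: the task runs in parallel, e.g. on another thread.
    var c = try io.asyncParallel(download, .{ io, "https://example.com/c" });
    const total = a.await(io) + b.await(io) + c.await(io);
    std.debug.print("downloaded {d} bytes in total\n", .{total});
}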


@@ -139,6 +139,8 @@ pub fn io(el: *EventLoop) Io {
.userdata = el,
.vtable = &.{
.async = async,
.asyncConcurrent = asyncConcurrent,
.asyncParallel = asyncParallel,
.await = await,
.asyncDetached = asyncDetached,
.select = select,
@@ -877,16 +879,27 @@ fn async(
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
return asyncConcurrent(userdata, result.len, result_alignment, context, context_alignment, start) orelse {
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result.len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const fiber = Fiber.allocate(event_loop) catch {
start(context.ptr, result.ptr);
return null;
};
}
fn asyncConcurrent(
userdata: ?*anyopaque,
result_len: usize,
result_alignment: Alignment,
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const fiber = Fiber.allocate(event_loop) catch return null;
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber);
@@ -925,6 +938,23 @@ fn async(
return @ptrCast(fiber);
}
fn asyncParallel(
userdata: ?*anyopaque,
result_len: usize,
result_alignment: Alignment,
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
_ = userdata;
_ = result_len;
_ = result_alignment;
_ = context;
_ = context_alignment;
_ = start;
@panic("TODO");
}
const DetachedClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,


@@ -6,331 +6,88 @@ const WaitGroup = std.Thread.WaitGroup;
const Io = std.Io;
const Pool = @This();
/// Thread-safe.
allocator: Allocator,
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
join_requested: bool = false,
threads: std.ArrayListUnmanaged(std.Thread),
ids: if (builtin.single_threaded) struct {
inline fn deinit(_: @This(), _: std.mem.Allocator) void {}
fn getIndex(_: @This(), _: std.Thread.Id) usize {
return 0;
}
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),
stack_size: usize,
cpu_count: std.Thread.CpuCountError!usize,
parallel_count: usize,
threadlocal var current_closure: ?*AsyncClosure = null;
pub const Runnable = struct {
start: Start,
node: std.SinglyLinkedList.Node = .{},
is_parallel: bool,
pub const Start = *const fn (*Runnable) void;
};
pub const InitError = std.Thread.CpuCountError || Allocator.Error;
pub fn init(gpa: Allocator) Pool {
var pool: Pool = .{
n_jobs: ?usize = null,
track_ids: bool = false,
stack_size: usize = std.Thread.SpawnConfig.default_stack_size,
};
pub fn init(pool: *Pool, options: Options) !void {
const gpa = options.allocator;
const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1);
const threads = try gpa.alloc(std.Thread, thread_count);
errdefer gpa.free(threads);
pool.* = .{
.allocator = gpa,
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
.cpu_count = std.Thread.getCpuCount(),
.parallel_count = 0,
};
if (pool.cpu_count) |n| {
pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
} else |_| {}
return pool;
try pool.ids.ensureTotalCapacity(gpa, 1 + thread_count);
pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
}
}
pub fn deinit(pool: *Pool) void {
const gpa = pool.allocator;
pool.join();
pool.threads.deinit(gpa);
pool.ids.deinit(gpa);
pool.* = undefined;
}
fn join(pool: *Pool) void {
if (builtin.single_threaded) return;
{
pool.mutex.lock();
defer pool.mutex.unlock();
pool.join_requested = true;
// ensure future worker threads exit the dequeue loop
pool.is_running = false;
}
// wake up any sleeping threads (this can be done outside the mutex)
// then wait for all the threads we know are spawned to complete.
pool.cond.broadcast();
for (pool.threads.items) |thread| thread.join();
}
/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
wait_group.start();
if (builtin.single_threaded) {
@call(.auto, func, args);
wait_group.finish();
return;
}
const Args = @TypeOf(args);
const Closure = struct {
arguments: Args,
pool: *Pool,
runnable: Runnable = .{ .runFn = runFn },
wait_group: *WaitGroup,
fn runFn(runnable: *Runnable, _: ?usize) void {
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
closure.wait_group.finish();
closure.pool.allocator.destroy(closure);
}
};
pool.mutex.lock();
const gpa = pool.allocator;
const closure = gpa.create(Closure) catch {
pool.mutex.unlock();
@call(.auto, func, args);
wait_group.finish();
return;
};
closure.* = .{
.arguments = args,
.pool = pool,
.wait_group = wait_group,
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < pool.threads.capacity) {
pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
.stack_size = pool.stack_size,
.allocator = gpa,
}, worker, .{pool}) catch t: {
pool.threads.items.len -= 1;
break :t undefined;
};
}
pool.mutex.unlock();
pool.cond.signal();
}
/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// The first argument passed to `func` is a dense `usize` thread id, the rest
/// of the arguments are passed from `args`. Requires the pool to have been
/// initialized with `.track_ids = true`.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
wait_group.start();
if (builtin.single_threaded) {
@call(.auto, func, .{0} ++ args);
wait_group.finish();
return;
}
const Args = @TypeOf(args);
const Closure = struct {
arguments: Args,
pool: *Pool,
runnable: Runnable = .{ .runFn = runFn },
wait_group: *WaitGroup,
fn runFn(runnable: *Runnable, id: ?usize) void {
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, .{id.?} ++ closure.arguments);
closure.wait_group.finish();
closure.pool.allocator.destroy(closure);
}
};
pool.mutex.lock();
const gpa = pool.allocator;
const closure = gpa.create(Closure) catch {
const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId());
pool.mutex.unlock();
@call(.auto, func, .{id.?} ++ args);
wait_group.finish();
return;
};
closure.* = .{
.arguments = args,
.pool = pool,
.wait_group = wait_group,
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < pool.threads.capacity) {
pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
.stack_size = pool.stack_size,
.allocator = gpa,
}, worker, .{pool}) catch t: {
pool.threads.items.len -= 1;
break :t undefined;
};
}
pool.mutex.unlock();
pool.cond.signal();
}
pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) void {
if (builtin.single_threaded) {
@call(.auto, func, args);
return;
}
const Args = @TypeOf(args);
const Closure = struct {
arguments: Args,
pool: *Pool,
runnable: Runnable = .{ .runFn = runFn },
fn runFn(runnable: *Runnable, _: ?usize) void {
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
closure.pool.allocator.destroy(closure);
}
};
pool.mutex.lock();
const gpa = pool.allocator;
const closure = gpa.create(Closure) catch {
pool.mutex.unlock();
@call(.auto, func, args);
return;
};
closure.* = .{
.arguments = args,
.pool = pool,
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < pool.threads.capacity) {
pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
.stack_size = pool.stack_size,
.allocator = gpa,
}, worker, .{pool}) catch t: {
pool.threads.items.len -= 1;
break :t undefined;
};
}
pool.mutex.unlock();
pool.cond.signal();
}
test spawn {
const TestFn = struct {
fn checkRun(completed: *bool) void {
completed.* = true;
}
};
var completed: bool = false;
{
var pool: Pool = undefined;
try pool.init(.{
.allocator = std.testing.allocator,
});
defer pool.deinit();
pool.spawn(TestFn.checkRun, .{&completed});
}
try std.testing.expectEqual(true, completed);
}
fn worker(pool: *Pool) void {
pool.mutex.lock();
defer pool.mutex.unlock();
const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null;
if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
while (true) {
while (pool.run_queue.popFirst()) |run_node| {
// Temporarily unlock the mutex in order to execute the run_node
pool.mutex.unlock();
defer pool.mutex.lock();
const runnable: *Runnable = @fieldParentPtr("node", run_node);
runnable.runFn(runnable, id);
}
// Stop executing instead of waiting if the thread pool is no longer running.
if (pool.is_running) {
pool.cond.wait(&pool.mutex);
} else {
break;
}
}
}
pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void {
var id: ?usize = null;
while (!wait_group.isDone()) {
pool.mutex.lock();
if (pool.run_queue.popFirst()) |run_node| {
id = id orelse pool.ids.getIndex(std.Thread.getCurrentId());
pool.mutex.unlock();
const runnable: *Runnable = @fieldParentPtr("node", run_node);
runnable.start(runnable);
pool.mutex.lock();
if (runnable.is_parallel) {
// TODO also pop thread and join sometimes
pool.parallel_count -= 1;
}
}
if (pool.join_requested) break;
pool.cond.wait(&pool.mutex);
wait_group.wait();
return;
}
}
pub fn getIdCount(pool: *Pool) usize {
return @intCast(1 + pool.threads.items.len);
}
pub fn io(pool: *Pool) Io {
return .{
.userdata = pool,
.vtable = &.{
.async = async,
.asyncConcurrent = asyncParallel,
.asyncParallel = asyncParallel,
.await = await,
.asyncDetached = asyncDetached,
.cancel = cancel,
@@ -357,7 +114,7 @@ pub fn io(pool: *Pool) Io {
const AsyncClosure = struct {
func: *const fn (context: *anyopaque, result: *anyopaque) void,
runnable: Runnable,
reset_event: std.Thread.ResetEvent,
select_condition: ?*std.Thread.ResetEvent,
cancel_tid: std.Thread.Id,
@@ -375,7 +132,7 @@ const AsyncClosure = struct {
else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
};
fn start(runnable: *Runnable) void {
const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
const tid = std.Thread.getCurrentId();
if (@cmpxchgStrong(
@@ -387,6 +144,7 @@ const AsyncClosure = struct {
.acquire,
)) |cancel_tid| {
assert(cancel_tid == canceling_tid);
closure.reset_event.set();
return;
}
current_closure = closure;
@@ -438,9 +196,13 @@ const AsyncClosure = struct {
fn waitAndFree(closure: *AsyncClosure, gpa: Allocator, result: []u8) void {
closure.reset_event.wait();
const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure);
@memcpy(result, closure.resultPointer()[0..result.len]);
free(closure, gpa, result.len);
}
fn free(closure: *AsyncClosure, gpa: Allocator, result_len: usize) void {
const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure);
gpa.free(base[0 .. closure.result_offset + result_len]);
}
};
@@ -452,18 +214,26 @@ fn async(
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
if (builtin.single_threaded) {
start(context.ptr, result.ptr);
return null;
}
const pool: *Pool = @alignCast(@ptrCast(userdata));
const cpu_count = pool.cpu_count catch {
return asyncParallel(userdata, result.len, result_alignment, context, context_alignment, start) orelse {
start(context.ptr, result.ptr);
return null;
};
};
const gpa = pool.allocator;
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result.len;
const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
pool.mutex.unlock();
start(context.ptr, result.ptr);
return null;
}));
closure.* = .{
.func = start,
.context_offset = context_offset,
@@ -471,37 +241,124 @@ fn async(
.reset_event = .{},
.cancel_tid = 0,
.select_condition = null,
.runnable = .{
.start = AsyncClosure.start,
.is_parallel = false,
},
};
@memcpy(closure.contextPointer()[0..context.len], context);
pool.mutex.lock();
const thread_capacity = cpu_count - 1 + pool.parallel_count;
pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
pool.mutex.unlock();
closure.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
if (pool.threads.items.len == 0) {
assert(pool.run_queue.popFirst() == &closure.runnable.node);
pool.mutex.unlock();
closure.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
}
// Rely on other workers to do it.
pool.mutex.unlock();
pool.cond.signal();
return @ptrCast(closure);
};
pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
pool.cond.signal();
return @ptrCast(closure);
}
fn asyncParallel(
userdata: ?*anyopaque,
result_len: usize,
result_alignment: std.mem.Alignment,
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
if (builtin.single_threaded) return null;
const pool: *Pool = @alignCast(@ptrCast(userdata));
const cpu_count = pool.cpu_count catch 1;
const gpa = pool.allocator;
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result_len;
const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch return null));
closure.* = .{
.func = start,
.context_offset = context_offset,
.result_offset = result_offset,
.reset_event = .{},
.cancel_tid = 0,
.select_condition = null,
.runnable = .{
.start = AsyncClosure.start,
.is_parallel = true,
},
};
@memcpy(closure.contextPointer()[0..context.len], context);
pool.mutex.lock();
pool.parallel_count += 1;
const thread_capacity = cpu_count - 1 + pool.parallel_count;
pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
pool.mutex.unlock();
closure.free(gpa, result_len);
return null;
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
assert(pool.run_queue.popFirst() == &closure.runnable.node);
pool.mutex.unlock();
closure.free(gpa, result_len);
return null;
};
pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
pool.cond.signal();
return @ptrCast(closure);
}
const DetachedClosure = struct {
pool: *Pool,
func: *const fn (context: *anyopaque) void,
runnable: Runnable,
context_alignment: std.mem.Alignment,
context_len: usize,
fn start(runnable: *Runnable) void {
const closure: *DetachedClosure = @alignCast(@fieldParentPtr("runnable", runnable));
closure.func(closure.contextPointer());
const gpa = closure.pool.allocator;
free(closure, gpa);
}
fn free(closure: *DetachedClosure, gpa: Allocator) void {
const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure);
gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
}
@@ -526,33 +383,46 @@ fn asyncDetached(
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
) void {
if (builtin.single_threaded) return start(context.ptr);
const pool: *Pool = @alignCast(@ptrCast(userdata));
const cpu_count = pool.cpu_count catch 1;
const gpa = pool.allocator;
const n = DetachedClosure.contextEnd(context_alignment, context.len);
const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(DetachedClosure), n) catch {
return start(context.ptr);
start(context.ptr);
return;
}));
closure.* = .{
.pool = pool,
.func = start,
.context_alignment = context_alignment,
.context_len = context.len,
.runnable = .{
.start = DetachedClosure.start,
.is_parallel = false,
},
};
@memcpy(closure.contextPointer()[0..context.len], context);
pool.mutex.lock();
const thread_capacity = cpu_count - 1 + pool.parallel_count;
pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
pool.mutex.unlock();
closure.free(gpa);
return start(context.ptr);
};
pool.run_queue.prepend(&closure.runnable.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
assert(pool.run_queue.popFirst() == &closure.runnable.node);
pool.mutex.unlock();
closure.free(gpa);
return start(context.ptr);
break :t undefined;
}; };
pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
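
A note on the capacity arithmetic shared by the new `async`, `asyncParallel`, and `asyncDetached` paths above, offered as a reading of the code rather than anything the commit states: `thread_capacity = cpu_count - 1 + pool.parallel_count` appears to budget one worker per logical CPU, minus one slot for the thread that is queuing the work, plus one dedicated worker for every `asyncParallel` task still in flight, so that parallel tasks are never starved by ordinary queued work. For example, with `cpu_count = 4` and two outstanding `asyncParallel` tasks, the pool grows to at most 4 - 1 + 2 = 5 worker threads.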


@@ -384,6 +384,8 @@ pub const CpuCountError = error{
};
/// Returns the platform's view of the number of logical CPU cores available.
///
/// Returned value guaranteed to be >= 1.
pub fn getCpuCount() CpuCountError!usize {
return try Impl.getCpuCount();
}