std.Io: fix error handling and asyncParallel docs

This commit is contained in:
Andrew Kelley 2025-07-17 20:53:39 -07:00
parent 9fd1ecb348
commit 25b2954c0c
3 changed files with 20 additions and 20 deletions

View file

@ -593,7 +593,7 @@ pub const VTable = struct {
context: []const u8, context: []const u8,
context_alignment: std.mem.Alignment, context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture, ) error{OutOfMemory}!*AnyFuture,
/// Returning `null` indicates resource allocation failed. /// Returning `null` indicates resource allocation failed.
/// ///
/// Thread-safe. /// Thread-safe.
@ -606,7 +606,7 @@ pub const VTable = struct {
context: []const u8, context: []const u8,
context_alignment: std.mem.Alignment, context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture, ) error{OutOfMemory}!*AnyFuture,
/// Executes `start` asynchronously in a manner such that it cleans itself /// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel. /// up. This mode does not support results, await, or cancel.
/// ///
@ -1220,7 +1220,7 @@ pub fn asyncConcurrent(
} }
}; };
var future: Future(Result) = undefined; var future: Future(Result) = undefined;
future.any_future = io.vtable.asyncConcurrent( future.any_future = try io.vtable.asyncConcurrent(
io.userdata, io.userdata,
@sizeOf(Result), @sizeOf(Result),
.of(Result), .of(Result),
@ -1231,14 +1231,14 @@ pub fn asyncConcurrent(
return future; return future;
} }
/// Calls `function` with `args`, such that the return value of the function is /// Simultaneously calls `function` with `args` while passing control flow back
/// not guaranteed to be available until `await` is called, while simultaneously /// to the caller. The return value of the function is not guaranteed to be
/// passing control flow back to the caller. /// available until `await` is called.
/// ///
/// This has the strongest guarantees of all async family functions, placing /// This has the strongest guarantees of all async family functions, placing
/// the most restrictions on what kind of `Io` implementations are supported. /// the most restrictions on what kind of `Io` implementations are supported.
/// By calling `asyncConcurrent` instead, one allows, for example, /// By calling `asyncConcurrent` instead, one allows, for example, stackful
/// stackful single-threaded non-blocking I/O. /// single-threaded non-blocking I/O.
/// ///
/// See also: /// See also:
/// * `asyncConcurrent` /// * `asyncConcurrent`
@ -1258,9 +1258,9 @@ pub fn asyncParallel(
} }
}; };
var future: Future(Result) = undefined; var future: Future(Result) = undefined;
future.any_future = io.vtable.asyncConcurrent( future.any_future = try io.vtable.asyncParallel(
io.userdata, io.userdata,
@ptrCast((&future.result)[0..1]), @sizeOf(Result),
.of(Result), .of(Result),
@ptrCast((&args)[0..1]), @ptrCast((&args)[0..1]),
.of(Args), .of(Args),

View file

@ -879,7 +879,7 @@ fn async(
context_alignment: Alignment, context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture { ) ?*std.Io.AnyFuture {
return asyncConcurrent(userdata, result.len, result_alignment, context, context_alignment, start) orelse { return asyncConcurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
start(context.ptr, result.ptr); start(context.ptr, result.ptr);
return null; return null;
}; };
@ -892,14 +892,14 @@ fn asyncConcurrent(
context: []const u8, context: []const u8,
context_alignment: Alignment, context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture { ) error{OutOfMemory}!*std.Io.AnyFuture {
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
assert(result_len <= Fiber.max_result_size); // TODO assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const fiber = Fiber.allocate(event_loop) catch return null; const fiber = try Fiber.allocate(event_loop);
std.log.debug("allocated {*}", .{fiber}); std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber); const closure: *AsyncClosure = .fromFiber(fiber);
@ -945,7 +945,7 @@ fn asyncParallel(
context: []const u8, context: []const u8,
context_alignment: Alignment, context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture { ) error{OutOfMemory}!*std.Io.AnyFuture {
_ = userdata; _ = userdata;
_ = result_len; _ = result_len;
_ = result_alignment; _ = result_alignment;

View file

@ -220,7 +220,7 @@ fn async(
} }
const pool: *Pool = @alignCast(@ptrCast(userdata)); const pool: *Pool = @alignCast(@ptrCast(userdata));
const cpu_count = pool.cpu_count catch { const cpu_count = pool.cpu_count catch {
return asyncParallel(userdata, result.len, result_alignment, context, context_alignment, start) orelse { return asyncParallel(userdata, result.len, result_alignment, context, context_alignment, start) catch {
start(context.ptr, result.ptr); start(context.ptr, result.ptr);
return null; return null;
}; };
@ -291,8 +291,8 @@ fn asyncParallel(
context: []const u8, context: []const u8,
context_alignment: std.mem.Alignment, context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void, start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture { ) error{OutOfMemory}!*Io.AnyFuture {
if (builtin.single_threaded) return null; if (builtin.single_threaded) unreachable;
const pool: *Pool = @alignCast(@ptrCast(userdata)); const pool: *Pool = @alignCast(@ptrCast(userdata));
const cpu_count = pool.cpu_count catch 1; const cpu_count = pool.cpu_count catch 1;
@ -300,7 +300,7 @@ fn asyncParallel(
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure)); const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len); const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result_len; const n = result_offset + result_len;
const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch return null)); const closure: *AsyncClosure = @alignCast(@ptrCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n)));
closure.* = .{ closure.* = .{
.func = start, .func = start,
@ -324,7 +324,7 @@ fn asyncParallel(
pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch { pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
pool.mutex.unlock(); pool.mutex.unlock();
closure.free(gpa, result_len); closure.free(gpa, result_len);
return null; return error.OutOfMemory;
}; };
pool.run_queue.prepend(&closure.runnable.node); pool.run_queue.prepend(&closure.runnable.node);
@ -334,7 +334,7 @@ fn asyncParallel(
assert(pool.run_queue.popFirst() == &closure.runnable.node); assert(pool.run_queue.popFirst() == &closure.runnable.node);
pool.mutex.unlock(); pool.mutex.unlock();
closure.free(gpa, result_len); closure.free(gpa, result_len);
return null; return error.OutOfMemory;
}; };
pool.threads.appendAssumeCapacity(thread); pool.threads.appendAssumeCapacity(thread);
} }