introduce std.Io interface

which is planned to have all I/O operations in the interface, but for
now has only async and await.
This commit is contained in:
Andrew Kelley 2025-03-24 18:43:53 -07:00
parent f657767b60
commit 5b2f54fc80
2 changed files with 72 additions and 20 deletions

View file

@ -911,3 +911,72 @@ test {
_ = @import("Io/stream_source.zig");
_ = @import("Io/test.zig");
}
const Io = @This();
userdata: ?*anyopaque,
vtable: *const VTable,
pub const VTable = struct {
/// If it returns `null` it means `result` has been already populated and
/// `await` will be a no-op.
async: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// The pointer of this slice is an "eager" result value.
/// The length is the size in bytes of the result type.
eager_result: []u8,
/// Passed to `start`.
context: ?*anyopaque,
start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
/// This function is only called when `async` returns a non-null value.
await: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// The same value that was returned from `async`.
any_future: *AnyFuture,
/// Points to a buffer where the result is written.
/// The length is equal to size in bytes of result type.
result: []u8,
) void,
};
pub const AnyFuture = opaque {};
pub fn Future(Result: type) type {
return struct {
any_future: ?*AnyFuture,
result: Result,
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]));
f.any_future = null;
return f.result;
}
};
}
/// `s` is a struct instance that contains a function like this:
/// ```
/// struct {
/// pub fn start(s: S) Result { ... }
/// }
/// ```
/// where `Result` is any type.
pub fn async(io: Io, s: anytype) Future(@typeInfo(@TypeOf(@TypeOf(s).start)).@"fn".return_type.?) {
const S = @TypeOf(s);
const Result = @typeInfo(@TypeOf(S.start)).@"fn".return_type.?;
const TypeErased = struct {
fn start(context: ?*anyopaque, result: *anyopaque) void {
const context_casted: *const S = @alignCast(@ptrCast(context));
const result_casted: *Result = @ptrCast(@alignCast(result));
result_casted.* = S.start(context_casted.*);
}
};
var future: Future(Result) = undefined;
future.any_future = io.vtable.async(io.userdata, @ptrCast((&future.result)[0..1]), @constCast(&s), TypeErased.start);
return future;
}

View file

@ -7,6 +7,7 @@ mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
is_running: bool = true,
/// Must be a thread-safe allocator.
allocator: std.mem.Allocator,
threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread,
ids: if (builtin.single_threaded) struct {
@ -16,12 +17,12 @@ ids: if (builtin.single_threaded) struct {
}
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),
const Runnable = struct {
pub const Runnable = struct {
runFn: RunProto,
node: std.SinglyLinkedList.Node = .{},
};
const RunProto = *const fn (*Runnable, id: ?usize) void;
pub const RunProto = *const fn (*Runnable, id: ?usize) void;
pub const Options = struct {
allocator: std.mem.Allocator,
@ -117,12 +118,6 @@ pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
closure.wait_group.finish();
// The thread pool's allocator is protected by the mutex.
const mutex = &closure.pool.mutex;
mutex.lock();
defer mutex.unlock();
closure.pool.allocator.destroy(closure);
}
};
@ -179,12 +174,6 @@ pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, ar
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, .{id.?} ++ closure.arguments);
closure.wait_group.finish();
// The thread pool's allocator is protected by the mutex.
const mutex = &closure.pool.mutex;
mutex.lock();
defer mutex.unlock();
closure.pool.allocator.destroy(closure);
}
};
@ -228,12 +217,6 @@ pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void {
fn runFn(runnable: *Runnable, _: ?usize) void {
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
// The thread pool's allocator is protected by the mutex.
const mutex = &closure.pool.mutex;
mutex.lock();
defer mutex.unlock();
closure.pool.allocator.destroy(closure);
}
};