mirror of
https://codeberg.org/ziglang/zig.git
synced 2025-12-06 13:54:21 +00:00
std.Io.Condition: change primitive to support only one
and no timer
This commit is contained in:
parent
63b3a3d11c
commit
e5b2df0c9b
3 changed files with 37 additions and 80 deletions
|
|
@ -629,8 +629,8 @@ pub const VTable = struct {
|
||||||
mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
|
mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
|
||||||
mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
|
mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
|
||||||
|
|
||||||
conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
|
conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void,
|
||||||
conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void,
|
conditionWake: *const fn (?*anyopaque, cond: *Condition) void,
|
||||||
|
|
||||||
createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
|
createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
|
||||||
openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
|
openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
|
||||||
|
|
@ -642,6 +642,11 @@ pub const VTable = struct {
|
||||||
sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
|
sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const Cancelable = error{
|
||||||
|
/// Caller has requested the async operation to stop.
|
||||||
|
Canceled,
|
||||||
|
};
|
||||||
|
|
||||||
pub const OpenFlags = fs.File.OpenFlags;
|
pub const OpenFlags = fs.File.OpenFlags;
|
||||||
pub const CreateFlags = fs.File.CreateFlags;
|
pub const CreateFlags = fs.File.CreateFlags;
|
||||||
|
|
||||||
|
|
@ -795,43 +800,18 @@ pub const Mutex = if (true) struct {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Supports exactly 1 waiter. More than 1 simultaneous wait on the same
|
||||||
|
/// condition is illegal.
|
||||||
pub const Condition = struct {
|
pub const Condition = struct {
|
||||||
state: u64 = 0,
|
state: u64 = 0,
|
||||||
|
|
||||||
pub const WaitError = error{
|
|
||||||
Timeout,
|
|
||||||
Canceled,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// How many waiters to wake up.
|
|
||||||
pub const Notify = enum {
|
|
||||||
one,
|
|
||||||
all,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
|
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
|
||||||
io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
|
return io.vtable.conditionWait(io.userdata, cond, mutex);
|
||||||
error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
|
|
||||||
error.Canceled => return error.Canceled,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void {
|
pub fn wake(cond: *Condition, io: Io) void {
|
||||||
return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns);
|
io.vtable.conditionWake(io.userdata, cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn signal(cond: *Condition, io: Io) void {
|
|
||||||
io.vtable.conditionWake(io.userdata, cond, .one);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn broadcast(cond: *Condition, io: Io) void {
|
|
||||||
io.vtable.conditionWake(io.userdata, cond, .all);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
pub const Cancelable = error{
|
|
||||||
/// Caller has requested the async operation to stop.
|
|
||||||
Canceled,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const TypeErasedQueue = struct {
|
pub const TypeErasedQueue = struct {
|
||||||
|
|
@ -883,7 +863,7 @@ pub const TypeErasedQueue = struct {
|
||||||
remaining = remaining[copy_len..];
|
remaining = remaining[copy_len..];
|
||||||
getter.data.remaining = getter.data.remaining[copy_len..];
|
getter.data.remaining = getter.data.remaining[copy_len..];
|
||||||
if (getter.data.remaining.len == 0) {
|
if (getter.data.remaining.len == 0) {
|
||||||
getter.data.condition.signal(io);
|
getter.data.condition.wake(io);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
q.getters.prepend(getter);
|
q.getters.prepend(getter);
|
||||||
|
|
@ -966,7 +946,7 @@ pub const TypeErasedQueue = struct {
|
||||||
putter.data.remaining = putter.data.remaining[copy_len..];
|
putter.data.remaining = putter.data.remaining[copy_len..];
|
||||||
remaining = remaining[copy_len..];
|
remaining = remaining[copy_len..];
|
||||||
if (putter.data.remaining.len == 0) {
|
if (putter.data.remaining.len == 0) {
|
||||||
putter.data.condition.signal(io);
|
putter.data.condition.wake(io);
|
||||||
} else {
|
} else {
|
||||||
assert(remaining.len == 0);
|
assert(remaining.len == 0);
|
||||||
q.putters.prepend(putter);
|
q.putters.prepend(putter);
|
||||||
|
|
@ -999,7 +979,7 @@ pub const TypeErasedQueue = struct {
|
||||||
putter.data.remaining = putter.data.remaining[copy_len..];
|
putter.data.remaining = putter.data.remaining[copy_len..];
|
||||||
q.put_index += copy_len;
|
q.put_index += copy_len;
|
||||||
if (putter.data.remaining.len == 0) {
|
if (putter.data.remaining.len == 0) {
|
||||||
putter.data.condition.signal(io);
|
putter.data.condition.wake(io);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const second_available = q.buffer[0..q.get_index];
|
const second_available = q.buffer[0..q.get_index];
|
||||||
|
|
@ -1008,7 +988,7 @@ pub const TypeErasedQueue = struct {
|
||||||
putter.data.remaining = putter.data.remaining[copy_len..];
|
putter.data.remaining = putter.data.remaining[copy_len..];
|
||||||
q.put_index = copy_len;
|
q.put_index = copy_len;
|
||||||
if (putter.data.remaining.len == 0) {
|
if (putter.data.remaining.len == 0) {
|
||||||
putter.data.condition.signal(io);
|
putter.data.condition.wake(io);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
q.putters.prepend(putter);
|
q.putters.prepend(putter);
|
||||||
|
|
|
||||||
|
|
@ -781,7 +781,6 @@ fn go(
|
||||||
event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
|
event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn @"await"(
|
fn @"await"(
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
any_future: *std.Io.AnyFuture,
|
any_future: *std.Io.AnyFuture,
|
||||||
|
|
@ -1277,24 +1276,24 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
|
||||||
el.yield(maybe_waiting_fiber.?, .reschedule);
|
el.yield(maybe_waiting_fiber.?, .reschedule);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn conditionWait(
|
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
|
||||||
userdata: ?*anyopaque,
|
const el: *EventLoop = @alignCast(@ptrCast(userdata));
|
||||||
cond: *Io.Condition,
|
const cond_state: *?*Fiber = @ptrCast(&cond.state);
|
||||||
mutex: *Io.Mutex,
|
const thread: *Thread = .current();
|
||||||
timeout: ?u64,
|
const fiber = thread.currentFiber();
|
||||||
) Io.Condition.WaitError!void {
|
const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire);
|
||||||
_ = userdata;
|
assert(prev == null); // More than one wait on same Condition is illegal.
|
||||||
_ = cond;
|
mutex.unlock(io(el));
|
||||||
_ = mutex;
|
el.yield(null, .nothing);
|
||||||
_ = timeout;
|
try mutex.lock(io(el));
|
||||||
@panic("TODO");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
|
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void {
|
||||||
_ = userdata;
|
const el: *EventLoop = @alignCast(@ptrCast(userdata));
|
||||||
_ = cond;
|
const cond_state: *?*Fiber = @ptrCast(&cond.state);
|
||||||
_ = notify;
|
if (@atomicRmw(?*Fiber, cond_state, .Xchg, null, .acquire)) |fiber| {
|
||||||
@panic("TODO");
|
el.yield(fiber, .reschedule);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn errno(signed: i32) std.os.linux.E {
|
fn errno(signed: i32) std.os.linux.E {
|
||||||
|
|
|
||||||
|
|
@ -619,12 +619,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn conditionWait(
|
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
|
||||||
userdata: ?*anyopaque,
|
|
||||||
cond: *Io.Condition,
|
|
||||||
mutex: *Io.Mutex,
|
|
||||||
timeout: ?u64,
|
|
||||||
) Io.Condition.WaitError!void {
|
|
||||||
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
|
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
|
||||||
comptime assert(@TypeOf(cond.state) == u64);
|
comptime assert(@TypeOf(cond.state) == u64);
|
||||||
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
|
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
|
||||||
|
|
@ -652,25 +647,11 @@ fn conditionWait(
|
||||||
mutex.unlock(pool.io());
|
mutex.unlock(pool.io());
|
||||||
defer mutex.lock(pool.io()) catch @panic("TODO");
|
defer mutex.lock(pool.io()) catch @panic("TODO");
|
||||||
|
|
||||||
var futex_deadline = std.Thread.Futex.Deadline.init(timeout);
|
var futex_deadline = std.Thread.Futex.Deadline.init(null);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) {
|
futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) {
|
||||||
// On timeout, we must decrement the waiter we added above.
|
error.Timeout => unreachable,
|
||||||
error.Timeout => {
|
|
||||||
while (true) {
|
|
||||||
// If there's a signal when we're timing out, consume it and report being woken up instead.
|
|
||||||
// Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
|
|
||||||
while (state & signal_mask != 0) {
|
|
||||||
const new_state = state - one_waiter - one_signal;
|
|
||||||
state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove the waiter we added and officially return timed out.
|
|
||||||
const new_state = state - one_waiter;
|
|
||||||
state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
epoch = cond_epoch.load(.acquire);
|
epoch = cond_epoch.load(.acquire);
|
||||||
|
|
@ -685,7 +666,7 @@ fn conditionWait(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
|
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void {
|
||||||
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
|
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
|
||||||
_ = pool;
|
_ = pool;
|
||||||
comptime assert(@TypeOf(cond.state) == u64);
|
comptime assert(@TypeOf(cond.state) == u64);
|
||||||
|
|
@ -709,10 +690,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Conditio
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const to_wake = switch (notify) {
|
const to_wake = 1;
|
||||||
.one => 1,
|
|
||||||
.all => wakeable,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Reserve the amount of waiters to wake by incrementing the signals count.
|
// Reserve the amount of waiters to wake by incrementing the signals count.
|
||||||
// Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
|
// Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue