diff --git a/CMakeLists.txt b/CMakeLists.txt index 690e2e35b1..18ca0dcedd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -413,7 +413,6 @@ set(ZIG_STAGE2_SOURCES lib/std/Thread/Futex.zig lib/std/Thread/Mutex.zig lib/std/Thread/Pool.zig - lib/std/Thread/ResetEvent.zig lib/std/Thread/WaitGroup.zig lib/std/array_hash_map.zig lib/std/array_list.zig diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 060ba085b6..e3e3db4ec4 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -392,7 +392,7 @@ var global_progress: Progress = .{ .terminal = undefined, .terminal_mode = .off, .update_thread = null, - .redraw_event = .{}, + .redraw_event = .unset, .refresh_rate_ns = undefined, .initial_delay_ns = undefined, .rows = 0, diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 6fddeaba5b..4b9851d8ea 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -10,9 +10,9 @@ const target = builtin.target; const native_os = builtin.os.tag; const posix = std.posix; const windows = std.os.windows; +const testing = std.testing; pub const Futex = @import("Thread/Futex.zig"); -pub const ResetEvent = @import("Thread/ResetEvent.zig"); pub const Mutex = @import("Thread/Mutex.zig"); pub const Semaphore = @import("Thread/Semaphore.zig"); pub const Condition = @import("Thread/Condition.zig"); @@ -22,6 +22,126 @@ pub const WaitGroup = @import("Thread/WaitGroup.zig"); pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc; +/// A thread-safe logical boolean value which can be `set` and `unset`. +/// +/// It can also block threads until the value is set with cancelation via timed +/// waits. Statically initializable; four bytes on all targets. +pub const ResetEvent = enum(u32) { + unset = 0, + waiting = 1, + is_set = 2, + + /// Returns whether the logical boolean is `set`. + /// + /// Once `reset` is called, this returns false until the next `set`. + /// + /// The memory accesses before the `set` can be said to happen before + /// `isSet` returns true. + pub fn isSet(re: *const ResetEvent) bool { + if (builtin.single_threaded) return switch (re.*) { + .unset => false, + .waiting => unreachable, + .is_set => true, + }; + // Acquire barrier ensures memory accesses before `set` happen before + // returning true. + return @atomicLoad(ResetEvent, re, .acquire) == .is_set; + } + + /// Blocks the calling thread until `set` is called. + /// + /// This is effectively a more efficient version of `while (!isSet()) {}`. + /// + /// The memory accesses before the `set` can be said to happen before `wait` returns. + pub fn wait(re: *ResetEvent) void { + if (builtin.single_threaded) switch (re.*) { + .unset => unreachable, // Deadlock, no other threads to wake us up. + .waiting => unreachable, // Invalid state. + .is_set => return, + }; + if (!re.isSet()) return timedWaitInner(re, null) catch |err| switch (err) { + error.Timeout => unreachable, // No timeout specified. + }; + } + + /// Blocks the calling thread until `set` is called, or until the + /// corresponding timeout expires, returning `error.Timeout`. + /// + /// This is effectively a more efficient version of `while (!isSet()) {}`. + /// + /// The memory accesses before the set() can be said to happen before + /// timedWait() returns without error. + pub fn timedWait(re: *ResetEvent, timeout_ns: u64) void { + if (builtin.single_threaded) switch (re.*) { + .unset => { + sleep(timeout_ns); + return error.Timeout; + }, + .waiting => unreachable, // Invalid state. + .is_set => return, + }; + if (!re.isSet()) return timedWaitInner(re, timeout_ns); + } + + fn timedWaitInner(re: *ResetEvent, timeout: ?u64) error{Timeout}!void { + @branchHint(.cold); + + // Try to set the state from `unset` to `waiting` to indicate to the + // `set` thread that others are blocked on the ResetEvent. Avoid using + // any strict barriers until we know the ResetEvent is set. + var state = @atomicLoad(ResetEvent, re, .acquire); + if (state == .unset) { + state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting; + } + + // Wait until the ResetEvent is set since the state is waiting. + if (state == .waiting) { + var futex_deadline = Futex.Deadline.init(timeout); + while (true) { + const wait_result = futex_deadline.wait(@ptrCast(re), @intFromEnum(ResetEvent.waiting)); + + // Check if the ResetEvent was set before possibly reporting error.Timeout below. + state = @atomicLoad(ResetEvent, re, .acquire); + if (state != .waiting) break; + + try wait_result; + } + } + + assert(state == .is_set); + } + + /// Marks the logical boolean as `set` and unblocks any threads in `wait` + /// or `timedWait` to observe the new state. + /// + /// The logical boolean stays `set` until `reset` is called, making future + /// `set` calls do nothing semantically. + /// + /// The memory accesses before `set` can be said to happen before `isSet` + /// returns true or `wait`/`timedWait` return successfully. + pub fn set(re: *ResetEvent) void { + if (builtin.single_threaded) { + re.* = .is_set; + return; + } + if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) { + Futex.wake(@ptrCast(re), std.math.maxInt(u32)); + } + } + + /// Unmarks the ResetEvent as if `set` was never called. + /// + /// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent + /// calls to `set`, `isSet` and `reset` are allowed. + pub fn reset(re: *ResetEvent) void { + if (builtin.single_threaded) { + re.* = .unset; + return; + } + @atomicStore(ResetEvent, re, .unset, .monotonic); + } +}; + /// Spurious wakeups are possible and no precision of timing is guaranteed. pub fn sleep(nanoseconds: u64) void { if (builtin.os.tag == .windows) { @@ -1681,3 +1801,125 @@ fn testTls() !void { x += 1; if (x != 1235) return error.TlsBadEndValue; } + +test "ResetEvent smoke test" { + // make sure the event is unset + var event = ResetEvent{}; + try testing.expectEqual(false, event.isSet()); + + // make sure the event gets set + event.set(); + try testing.expectEqual(true, event.isSet()); + + // make sure the event gets unset again + event.reset(); + try testing.expectEqual(false, event.isSet()); + + // waits should timeout as there's no other thread to set the event + try testing.expectError(error.Timeout, event.timedWait(0)); + try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms)); + + // set the event again and make sure waits complete + event.set(); + event.wait(); + try event.timedWait(std.time.ns_per_ms); + try testing.expectEqual(true, event.isSet()); +} + +test "ResetEvent signaling" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const Context = struct { + in: ResetEvent = .{}, + out: ResetEvent = .{}, + value: usize = 0, + + fn input(self: *@This()) !void { + // wait for the value to become 1 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 1); + + // bump the value and wake up output() + self.value = 2; + self.out.set(); + + // wait for output to receive 2, bump the value and wake us up with 3 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 3); + + // bump the value and wake up output() for it to see 4 + self.value = 4; + self.out.set(); + } + + fn output(self: *@This()) !void { + // start with 0 and bump the value for input to see 1 + try testing.expectEqual(self.value, 0); + self.value = 1; + self.in.set(); + + // wait for input to receive 1, bump the value to 2 and wake us up + self.out.wait(); + self.out.reset(); + try testing.expectEqual(self.value, 2); + + // bump the value to 3 for input to see (rhymes) + self.value = 3; + self.in.set(); + + // wait for input to bump the value to 4 and receive no more (rhymes) + self.out.wait(); + self.out.reset(); + try testing.expectEqual(self.value, 4); + } + }; + + var ctx = Context{}; + + const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx}); + defer thread.join(); + + try ctx.input(); +} + +test "ResetEvent broadcast" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 10; + const Barrier = struct { + event: ResetEvent = .{}, + counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads), + + fn wait(self: *@This()) void { + if (self.counter.fetchSub(1, .acq_rel) == 1) { + self.event.set(); + } + } + }; + + const Context = struct { + start_barrier: Barrier = .{}, + finish_barrier: Barrier = .{}, + + fn run(self: *@This()) void { + self.start_barrier.wait(); + self.finish_barrier.wait(); + } + }; + + var ctx = Context{}; + var threads: [num_threads - 1]std.Thread = undefined; + + for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); + defer for (threads) |t| t.join(); + + ctx.run(); +} diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig deleted file mode 100644 index 2f22d8456a..0000000000 --- a/lib/std/Thread/ResetEvent.zig +++ /dev/null @@ -1,278 +0,0 @@ -//! ResetEvent is a thread-safe bool which can be set to true/false ("set"/"unset"). -//! It can also block threads until the "bool" is set with cancellation via timed waits. -//! ResetEvent can be statically initialized and is at most `@sizeOf(u64)` large. - -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const ResetEvent = @This(); - -const os = std.os; -const assert = std.debug.assert; -const testing = std.testing; -const Futex = std.Thread.Futex; - -impl: Impl = .{}, - -/// Returns if the ResetEvent was set(). -/// Once reset() is called, this returns false until the next set(). -/// The memory accesses before the set() can be said to happen before isSet() returns true. -pub fn isSet(self: *const ResetEvent) bool { - return self.impl.isSet(); -} - -/// Block's the callers thread until the ResetEvent is set(). -/// This is effectively a more efficient version of `while (!isSet()) {}`. -/// The memory accesses before the set() can be said to happen before wait() returns. -pub fn wait(self: *ResetEvent) void { - self.impl.wait(null) catch |err| switch (err) { - error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out - }; -} - -/// Block's the callers thread until the ResetEvent is set(), or until the corresponding timeout expires. -/// If the timeout expires before the ResetEvent is set, `error.Timeout` is returned. -/// This is effectively a more efficient version of `while (!isSet()) {}`. -/// The memory accesses before the set() can be said to happen before timedWait() returns without error. -pub fn timedWait(self: *ResetEvent, timeout_ns: u64) error{Timeout}!void { - return self.impl.wait(timeout_ns); -} - -/// Marks the ResetEvent as "set" and unblocks any threads in `wait()` or `timedWait()` to observe the new state. -/// The ResetEvent says "set" until reset() is called, making future set() calls do nothing semantically. -/// The memory accesses before set() can be said to happen before isSet() returns true or wait()/timedWait() return successfully. -pub fn set(self: *ResetEvent) void { - self.impl.set(); -} - -/// Unmarks the ResetEvent from its "set" state if set() was called previously. -/// It is undefined behavior is reset() is called while threads are blocked in wait() or timedWait(). -/// Concurrent calls to set(), isSet() and reset() are allowed. -pub fn reset(self: *ResetEvent) void { - self.impl.reset(); -} - -const Impl = if (builtin.single_threaded) - SingleThreadedImpl -else - FutexImpl; - -const SingleThreadedImpl = struct { - is_set: bool = false, - - fn isSet(self: *const Impl) bool { - return self.is_set; - } - - fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { - if (self.isSet()) { - return; - } - - // There are no other threads to wake us up. - // So if we wait without a timeout we would never wake up. - const timeout_ns = timeout orelse { - unreachable; // deadlock detected - }; - - std.Thread.sleep(timeout_ns); - return error.Timeout; - } - - fn set(self: *Impl) void { - self.is_set = true; - } - - fn reset(self: *Impl) void { - self.is_set = false; - } -}; - -const FutexImpl = struct { - state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unset), - - const unset = 0; - const waiting = 1; - const is_set = 2; - - fn isSet(self: *const Impl) bool { - // Acquire barrier ensures memory accesses before set() happen before we return true. - return self.state.load(.acquire) == is_set; - } - - fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { - // Outline the slow path to allow isSet() to be inlined - if (!self.isSet()) { - return self.waitUntilSet(timeout); - } - } - - fn waitUntilSet(self: *Impl, timeout: ?u64) error{Timeout}!void { - @branchHint(.cold); - - // Try to set the state from `unset` to `waiting` to indicate - // to the set() thread that others are blocked on the ResetEvent. - // We avoid using any strict barriers until the end when we know the ResetEvent is set. - var state = self.state.load(.acquire); - if (state == unset) { - state = self.state.cmpxchgStrong(state, waiting, .acquire, .acquire) orelse waiting; - } - - // Wait until the ResetEvent is set since the state is waiting. - if (state == waiting) { - var futex_deadline = Futex.Deadline.init(timeout); - while (true) { - const wait_result = futex_deadline.wait(&self.state, waiting); - - // Check if the ResetEvent was set before possibly reporting error.Timeout below. - state = self.state.load(.acquire); - if (state != waiting) { - break; - } - - try wait_result; - } - } - - assert(state == is_set); - } - - fn set(self: *Impl) void { - // Quick check if the ResetEvent is already set before doing the atomic swap below. - // set() could be getting called quite often and multiple threads calling swap() increases contention unnecessarily. - if (self.state.load(.monotonic) == is_set) { - return; - } - - // Mark the ResetEvent as set and unblock all waiters waiting on it if any. - // Release barrier ensures memory accesses before set() happen before the ResetEvent is observed to be "set". - if (self.state.swap(is_set, .release) == waiting) { - Futex.wake(&self.state, std.math.maxInt(u32)); - } - } - - fn reset(self: *Impl) void { - self.state.store(unset, .monotonic); - } -}; - -test "smoke test" { - // make sure the event is unset - var event = ResetEvent{}; - try testing.expectEqual(false, event.isSet()); - - // make sure the event gets set - event.set(); - try testing.expectEqual(true, event.isSet()); - - // make sure the event gets unset again - event.reset(); - try testing.expectEqual(false, event.isSet()); - - // waits should timeout as there's no other thread to set the event - try testing.expectError(error.Timeout, event.timedWait(0)); - try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms)); - - // set the event again and make sure waits complete - event.set(); - event.wait(); - try event.timedWait(std.time.ns_per_ms); - try testing.expectEqual(true, event.isSet()); -} - -test "signaling" { - // This test requires spawning threads - if (builtin.single_threaded) { - return error.SkipZigTest; - } - - const Context = struct { - in: ResetEvent = .{}, - out: ResetEvent = .{}, - value: usize = 0, - - fn input(self: *@This()) !void { - // wait for the value to become 1 - self.in.wait(); - self.in.reset(); - try testing.expectEqual(self.value, 1); - - // bump the value and wake up output() - self.value = 2; - self.out.set(); - - // wait for output to receive 2, bump the value and wake us up with 3 - self.in.wait(); - self.in.reset(); - try testing.expectEqual(self.value, 3); - - // bump the value and wake up output() for it to see 4 - self.value = 4; - self.out.set(); - } - - fn output(self: *@This()) !void { - // start with 0 and bump the value for input to see 1 - try testing.expectEqual(self.value, 0); - self.value = 1; - self.in.set(); - - // wait for input to receive 1, bump the value to 2 and wake us up - self.out.wait(); - self.out.reset(); - try testing.expectEqual(self.value, 2); - - // bump the value to 3 for input to see (rhymes) - self.value = 3; - self.in.set(); - - // wait for input to bump the value to 4 and receive no more (rhymes) - self.out.wait(); - self.out.reset(); - try testing.expectEqual(self.value, 4); - } - }; - - var ctx = Context{}; - - const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx}); - defer thread.join(); - - try ctx.input(); -} - -test "broadcast" { - // This test requires spawning threads - if (builtin.single_threaded) { - return error.SkipZigTest; - } - - const num_threads = 10; - const Barrier = struct { - event: ResetEvent = .{}, - counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads), - - fn wait(self: *@This()) void { - if (self.counter.fetchSub(1, .acq_rel) == 1) { - self.event.set(); - } - } - }; - - const Context = struct { - start_barrier: Barrier = .{}, - finish_barrier: Barrier = .{}, - - fn run(self: *@This()) void { - self.start_barrier.wait(); - self.finish_barrier.wait(); - } - }; - - var ctx = Context{}; - var threads: [num_threads - 1]std.Thread = undefined; - - for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); - defer for (threads) |t| t.join(); - - ctx.run(); -} diff --git a/lib/std/Thread/WaitGroup.zig b/lib/std/Thread/WaitGroup.zig index 52e9c379c2..a5970b7d69 100644 --- a/lib/std/Thread/WaitGroup.zig +++ b/lib/std/Thread/WaitGroup.zig @@ -7,11 +7,15 @@ const is_waiting: usize = 1 << 0; const one_pending: usize = 1 << 1; state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), -event: std.Thread.ResetEvent = .{}, +event: std.Thread.ResetEvent = .unset, pub fn start(self: *WaitGroup) void { - const state = self.state.fetchAdd(one_pending, .monotonic); - assert((state / one_pending) < (std.math.maxInt(usize) / one_pending)); + return startStateless(&self.state); +} + +pub fn startStateless(state: *std.atomic.Value(usize)) void { + const prev_state = state.fetchAdd(one_pending, .monotonic); + assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending)); } pub fn startMany(self: *WaitGroup, n: usize) void { @@ -28,13 +32,20 @@ pub fn finish(self: *WaitGroup) void { } } -pub fn wait(self: *WaitGroup) void { - const state = self.state.fetchAdd(is_waiting, .acquire); - assert(state & is_waiting == 0); +pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { + const prev_state = state.fetchSub(one_pending, .acq_rel); + assert((prev_state / one_pending) > 0); + if (prev_state == (one_pending | is_waiting)) event.set(); +} - if ((state / one_pending) > 0) { - self.event.wait(); - } +pub fn wait(wg: *WaitGroup) void { + return waitStateless(&wg.state, &wg.event); +} + +pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { + const prev_state = state.fetchAdd(is_waiting, .acquire); + assert(prev_state & is_waiting == 0); + if ((prev_state / one_pending) > 0) event.wait(); } pub fn reset(self: *WaitGroup) void {