From c4f5dda135616207c24a6009e221724f5a6fa6aa Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Sat, 29 Nov 2025 08:58:50 -0800
Subject: [PATCH] std.Io.Threaded: re-introduce retry logic behind config

---
 lib/std/Io/Threaded.zig | 94 ++++++++++++++++++++++++++++++-----------
 1 file changed, 69 insertions(+), 25 deletions(-)

diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index f285a51d6c..9c818e1794 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -53,6 +53,22 @@ cpu_count_error: ?std.Thread.CpuCountError,
 busy_count: usize = 0,
 main_thread: Thread,
 pid: Pid = .unknown,
+/// When a cancel request is made, blocking syscalls can be unblocked by
+/// issuing a signal. However, if the signal arrives after the check and before
+/// the syscall instruction, it is missed.
+///
+/// This option solves the race condition by retrying the signal delivery
+/// until the cancellation is acknowledged, with exponential backoff.
+///
+/// Unfortunately, retrying until the cancellation request is acknowledged
+/// has been observed to be relatively slow, and strong cancellation
+/// guarantees are usually not needed, so this defaults to off.
+///
+/// Musl libc does not have this problem because of a clever, undocumented
+/// extension related to pthread_cancel, which this code integrates with.
+/// When compiling with no libc, `Threaded` does not yet implement the
+/// equivalent trick (tracked by https://codeberg.org/ziglang/zig/issues/30049).
+robust_cancel: RobustCancel = if (is_musl) .enabled else .disabled,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
 
@@ -60,6 +76,15 @@ have_signal_handler: bool,
 old_sig_io: if (have_sig_io) posix.Sigaction else void,
 old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void,
 
+pub const RobustCancel = if (is_musl) enum {
+    enabled,
+} else if (std.Thread.use_pthreads or native_os == .linux) enum {
+    enabled,
+    disabled,
+} else enum {
+    disabled,
+};
+
 pub const Pid = if (native_os == .linux) enum(posix.pid_t) {
     unknown = 0,
     _,
@@ -201,7 +226,7 @@ const Closure = struct {
     const Start = *const fn (*Closure, *Threaded) void;
 
     fn requestCancel(closure: *Closure, t: *Threaded) void {
-        const signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
+        var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
             .none, .acknowledged, .requested => return,
             .signal_id => |signal_id| signal_id,
         };
@@ -214,32 +239,51 @@ const Closure = struct {
 
         // The task will enter a blocking syscall before checking for cancellation again.
        // We can send a signal to interrupt the syscall, but if it arrives before
-        // the syscall instruction, it will be missed.
+        // the syscall instruction, it will be missed. Therefore, this code tries
+        // again until the cancellation request is acknowledged.
+
+        // 1 << 10 ns is about 1 microsecond, approximately syscall overhead.
+        // 1 << 20 ns is about 1 millisecond.
+        // 1 << 30 ns is about 1 second.
         //
-        // Unfortunately, trying again until the cancellation request is
-        // acknowledged has been observed to incur a large amount of overhead,
-        // and usually strong cancellation guarantees are not needed, so the
-        // race condition is not handled here. Users who want to avoid this
-        // have this menu of options instead:
-        // * Use no libc, in which case Zig std lib can avoid the race (tracking
-        //   issue: https://codeberg.org/ziglang/zig/issues/30049)
-        // * Use musl libc instead of glibc
-        // * Use `std.Io.Evented`. But this is not implemented yet. Tracked by
-        //   - https://codeberg.org/ziglang/zig/issues/30050
-        //   - https://codeberg.org/ziglang/zig/issues/30051
-        if (std.Thread.use_pthreads) {
-            if (std.c.pthread_kill(signal_id, .IO) != 0) return;
-        } else if (native_os == .linux) {
-            const pid: posix.pid_t = p: {
-                const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic);
-                if (cached_pid != .unknown) break :p @intFromEnum(cached_pid);
-                const pid = std.os.linux.getpid();
-                @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic);
-                break :p pid;
+        // On a heavily loaded Linux 6.17.5, I observed at most 20 attempts
+        // before the cancellation was acknowledged, so this cap (with the
+        // exponential backoff) was sufficient despite the heavy load.
+        const max_attempts = 22;
+
+        for (0..max_attempts) |attempt_index| {
+            if (std.Thread.use_pthreads) {
+                if (std.c.pthread_kill(signal_id, .IO) != 0) return;
+            } else if (native_os == .linux) {
+                const pid: posix.pid_t = p: {
+                    const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic);
+                    if (cached_pid != .unknown) break :p @intFromEnum(cached_pid);
+                    const pid = std.os.linux.getpid();
+                    @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic);
+                    break :p pid;
+                };
+                if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return;
+            } else {
+                return;
+            }
+
+            if (t.robust_cancel != .enabled) return;
+
+            var timespec: posix.timespec = .{
+                .sec = 0,
+                .nsec = @as(isize, 1) << @intCast(attempt_index),
             };
-            if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return;
-        } else {
-            return;
+            if (native_os == .linux) {
+                _ = std.os.linux.clock_nanosleep(posix.CLOCK.MONOTONIC, .{ .ABSTIME = false }, &timespec, &timespec);
+            } else {
+                _ = posix.system.nanosleep(&timespec, &timespec);
+            }
+
+            switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
+                .requested => continue, // Retry needed in case the other thread hasn't yet entered the syscall.
+                .none, .acknowledged => return,
+                .signal_id => |new_signal_id| signal_id = new_signal_id,
+            }
         }
     }
 };
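
Not part of the patch: the following is a small standalone Zig sketch (hypothetical, not in the tree) that replays the backoff schedule the new code uses, i.e. max_attempts = 22 with a sleep of 1 << attempt_index nanoseconds per attempt, to give a feel for how long requestCancel can keep retrying before it gives up.

const std = @import("std");

pub fn main() void {
    // Same constants as the retry loop above: 22 attempts, doubling the sleep each time.
    const max_attempts = 22;
    var total_ns: u64 = 0;
    for (0..max_attempts) |attempt_index| {
        const sleep_ns = @as(u64, 1) << @intCast(attempt_index);
        total_ns += sleep_ns;
        std.debug.print("attempt {d:>2}: sleep {d} ns, cumulative {d} ns\n", .{
            attempt_index, sleep_ns, total_ns,
        });
    }
}

Summing the schedule gives 2^22 - 1 = 4194303 ns, so even when every retry is needed the loop spends roughly 4.2 ms backing off in total before giving up.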