std.Io.Threaded: re-introduce retry logic behind config

Andrew Kelley 2025-11-29 08:58:50 -08:00
parent de87bad4c3
commit c4f5dda135


@@ -53,6 +53,22 @@ cpu_count_error: ?std.Thread.CpuCountError,
 busy_count: usize = 0,
 main_thread: Thread,
 pid: Pid = .unknown,
+/// When a cancel request is made, blocking syscalls can be unblocked by
+/// issuing a signal. However, if the signal arrives after the check and before
+/// the syscall instruction, it is missed.
+///
+/// This option solves the race condition by retrying the signal delivery
+/// until it is acknowledged, with an exponential backoff.
+///
+/// Unfortunately, trying again until the cancellation request is acknowledged
+/// has been observed to be relatively slow, and usually strong cancellation
+/// guarantees are not needed, so this defaults to off.
+///
+/// Musl libc does not have this problem because of a clever, undocumented
+/// extension related to pthread_cancel, which this code integrates with.
+/// When compiling with no libc, `Threaded` does not yet implement the
+/// equivalent trick (tracked by https://codeberg.org/ziglang/zig/issues/30049).
+robust_cancel: RobustCancel = if (is_musl) .enabled else .disabled,
 wsa: if (is_windows) Wsa else struct {} = .{},
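A caller who wants strong cancellation guarantees on glibc opts in by flipping the new field after constructing the `Threaded` instance. A minimal sketch, assuming the usual init/io construction pattern for `std.Io.Threaded` on master (the construction and `io()` calls are assumptions; only the `robust_cancel` field and its `.enabled` tag come from this commit):

    const std = @import("std");

    pub fn main() void {
        // Assumed construction; only the `robust_cancel` assignment below is
        // introduced by this commit.
        var threaded: std.Io.Threaded = .init(std.heap.page_allocator);
        defer threaded.deinit();

        // Opt in to retry-until-acknowledged cancellation. On targets whose
        // RobustCancel enum lacks an `.enabled` tag (no pthreads, not Linux),
        // this line is a compile error, which is the point of the per-target enum.
        threaded.robust_cancel = .enabled;

        const io = threaded.io();
        _ = io;
    }

Under musl the field already defaults to `.enabled` and the enum has no other tag, so the assignment is a no-op there.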
@@ -60,6 +76,15 @@ have_signal_handler: bool,
 old_sig_io: if (have_sig_io) posix.Sigaction else void,
 old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void,
+pub const RobustCancel = if (is_musl) enum {
+    enabled,
+} else if (std.Thread.use_pthreads or native_os == .linux) enum {
+    enabled,
+    disabled,
+} else enum {
+    disabled,
+};
 pub const Pid = if (native_os == .linux) enum(posix.pid_t) {
     unknown = 0,
     _,
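The `RobustCancel` declaration selects the enum type itself at comptime, so a setting that a target cannot honor is unrepresentable rather than rejected at runtime: musl builds only expose `.enabled`, pthread or Linux builds expose both tags, and everything else only `.disabled`. A self-contained sketch of the same pattern (the `Feature` name is illustrative, not from the Zig tree):

    const builtin = @import("builtin");

    // Pick the enum type per target; unsupported tags simply do not exist.
    const Feature = if (builtin.os.tag == .linux) enum {
        enabled,
        disabled,
    } else enum {
        disabled,
    };

    test "per-target enum narrows the legal settings" {
        // On a non-Linux target, writing `.enabled` here would not compile.
        const setting: Feature = .disabled;
        _ = setting;
    }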
@@ -201,7 +226,7 @@ const Closure = struct {
     const Start = *const fn (*Closure, *Threaded) void;
     fn requestCancel(closure: *Closure, t: *Threaded) void {
-        const signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
+        var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
             .none, .acknowledged, .requested => return,
             .signal_id => |signal_id| signal_id,
         };
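The `const` to `var` change is needed because the retry loop added in the next hunk may read a fresh signal id out of `cancel_status` and reassign `signal_id`. The switch itself operates on the return value of `@atomicRmw`, which is the previous contents of the atomic, not the newly stored `.requested`. A minimal sketch of that exchange semantics (the `Status` enum is illustrative, not the real `CancelStatus`):

    const std = @import("std");

    test "atomic exchange hands back the prior value" {
        const Status = enum(u8) { none, requested, acknowledged };
        var status: Status = .none;

        // .Xchg stores the new value and returns whatever was stored before,
        // which is what requestCancel's switch inspects.
        const prev = @atomicRmw(Status, &status, .Xchg, .requested, .monotonic);
        try std.testing.expectEqual(Status.none, prev);
        try std.testing.expectEqual(Status.requested, @atomicLoad(Status, &status, .monotonic));
    }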
@@ -214,32 +239,51 @@ const Closure = struct {
         // The task will enter a blocking syscall before checking for cancellation again.
         // We can send a signal to interrupt the syscall, but if it arrives before
-        // the syscall instruction, it will be missed.
+        // the syscall instruction, it will be missed. Therefore, this code tries
+        // again until the cancellation request is acknowledged.
+        // 1 << 10 ns is about 1 microsecond, approximately syscall overhead.
+        // 1 << 20 ns is about 1 millisecond.
+        // 1 << 30 ns is about 1 second.
         //
-        // Unfortunately, trying again until the cancellation request is
-        // acknowledged has been observed to incur a large amount of overhead,
-        // and usually strong cancellation guarantees are not needed, so the
-        // race condition is not handled here. Users who want to avoid this
-        // have this menu of options instead:
-        // * Use no libc, in which case Zig std lib can avoid the race (tracking
-        //   issue: https://codeberg.org/ziglang/zig/issues/30049)
-        // * Use musl libc instead of glibc
-        // * Use `std.Io.Evented`. But this is not implemented yet. Tracked by
-        //   - https://codeberg.org/ziglang/zig/issues/30050
-        //   - https://codeberg.org/ziglang/zig/issues/30051
-        if (std.Thread.use_pthreads) {
-            if (std.c.pthread_kill(signal_id, .IO) != 0) return;
-        } else if (native_os == .linux) {
-            const pid: posix.pid_t = p: {
-                const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic);
-                if (cached_pid != .unknown) break :p @intFromEnum(cached_pid);
-                const pid = std.os.linux.getpid();
-                @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic);
-                break :p pid;
-            };
-            if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return;
-        } else {
-            return;
-        }
+        // On a heavily loaded Linux 6.17.5, I observed a maximum of 20
+        // attempts not acknowledged before the timeout (including exponential
+        // backoff) was sufficient, despite the heavy load.
+        const max_attempts = 22;
+        for (0..max_attempts) |attempt_index| {
+            if (std.Thread.use_pthreads) {
+                if (std.c.pthread_kill(signal_id, .IO) != 0) return;
+            } else if (native_os == .linux) {
+                const pid: posix.pid_t = p: {
+                    const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic);
+                    if (cached_pid != .unknown) break :p @intFromEnum(cached_pid);
+                    const pid = std.os.linux.getpid();
+                    @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic);
+                    break :p pid;
+                };
+                if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return;
+            } else {
+                return;
+            }
+            if (t.robust_cancel != .enabled) return;
+            var timespec: posix.timespec = .{
+                .sec = 0,
+                .nsec = @as(isize, 1) << @intCast(attempt_index),
+            };
+            if (native_os == .linux) {
+                _ = std.os.linux.clock_nanosleep(posix.CLOCK.MONOTONIC, .{ .ABSTIME = false }, &timespec, &timespec);
+            } else {
+                _ = posix.system.nanosleep(&timespec, &timespec);
+            }
+            switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) {
+                .requested => continue, // Retry needed in case other thread hasn't yet entered the syscall.
+                .none, .acknowledged => return,
+                .signal_id => |new_signal_id| signal_id = new_signal_id,
+            }
+        }
     }
 };
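The comments in the new loop give the backoff scale (1 << 10 ns is roughly a microsecond, 1 << 20 ns a millisecond, 1 << 30 ns a second) and the empirically chosen limit of 22 attempts. A small self-contained check of that arithmetic, ignoring the time spent in the signal and sleep syscalls themselves:

    const std = @import("std");

    test "exponential backoff budget for 22 attempts" {
        const max_attempts = 22;
        var total_ns: u64 = 0;
        for (0..max_attempts) |attempt_index| {
            // Same schedule as the loop above: attempt n sleeps 1 << n nanoseconds.
            total_ns += @as(u64, 1) << @intCast(attempt_index);
        }
        // Geometric series: (1 << 22) - 1 ns, about 4.2 milliseconds of sleeping
        // in total before requestCancel stops re-sending the signal.
        try std.testing.expectEqual((@as(u64, 1) << max_attempts) - 1, total_ns);
    }

Since each iteration re-sends the signal before sleeping and checks whether the cancellation was acknowledged after waking, the attempt cap rather than the sleep budget is what bounds requestCancel when the signal keeps getting lost.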