std.Progress: fix many bugs

There were several bugs with the synchronization here, most notably an
ABA problem which was causing #21663. I fixed that and some other
issues, and took the opportunity to remove the `.seq_cst` orderings
from this file. I'm at least relatively sure my new orderings are correct.

Co-authored-by: achan1989 <achan1989@gmail.com>
Resolves: #21663
mlugg, 2025-04-29 05:55:49 +01:00 (committed by Matthew Lugg)
parent bf9b15ee67
commit ae1b444d6a
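
For context, the ABA hazard fixed here is the classic lock-free freelist pop: a thread loads the
freelist head and that node's "next" link, is preempted, and by the time its compare-and-swap runs
the same head index has been popped, reused, and pushed back, so the CAS still succeeds but installs
a stale "next" value. The diff below defends against this by packing a generation counter next to the
head in a single 32-bit value and bumping it on every push. The following sketch is not the
std.Progress code; it is a minimal, hypothetical illustration of the same technique (the u8 node
indices and the 0xff "empty" sentinel are invented for the example).

// Minimal sketch, not the std.Progress implementation: a fixed-capacity index
// freelist whose head is packed together with a generation counter so that a
// pop cannot succeed against a stale "next" link.
const std = @import("std");

const empty: u8 = 0xff; // hypothetical "no node" sentinel

const Freelist = packed struct(u32) {
    head: u8,
    generation: u24,
};

var next: [16]u8 = undefined; // per-node "next free" links
var freelist: Freelist = .{ .head = empty, .generation = 0 };

fn push(index: u8) void {
    var old = @atomicLoad(Freelist, &freelist, .monotonic);
    while (true) {
        // Publish our "next" link, then try to install ourselves as the head.
        @atomicStore(u8, &next[index], old.head, .monotonic);
        old = @cmpxchgWeak(
            Freelist,
            &freelist,
            old,
            // Bumping the generation on every push is what defeats ABA: a pop that
            // raced with a free-and-reuse of the same index will fail its CAS.
            .{ .head = index, .generation = old.generation +% 1 },
            .release, // make the `next` store above visible to the popping thread
            .monotonic,
        ) orelse return;
    }
}

fn pop() ?u8 {
    var old = @atomicLoad(Freelist, &freelist, .acquire);
    while (old.head != empty) {
        const popped = old.head;
        const new: Freelist = .{
            .head = @atomicLoad(u8, &next[popped], .monotonic),
            // Popping leaves the generation unchanged; bumping only on push is enough.
            .generation = old.generation,
        };
        old = @cmpxchgWeak(Freelist, &freelist, old, new, .acquire, .acquire) orelse return popped;
    }
    return null;
}

pub fn main() void {
    push(0);
    push(1);
    push(2);
    // LIFO order: prints "popped: 2 1 0 null".
    std.debug.print("popped: {?d} {?d} {?d} {?d}\n", .{ pop(), pop(), pop(), pop() });
}

Popping deliberately leaves the generation unchanged; bumping it only on push is sufficient, because
the problematic interleaving always involves a push of the same index between the victim's load and
its CAS (the diff's comment notes the opposite choice would also work).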

@@ -39,10 +39,20 @@ draw_buffer: []u8,
 /// CPU cache.
 node_parents: []Node.Parent,
 node_storage: []Node.Storage,
-node_freelist: []Node.OptionalIndex,
-node_freelist_first: Node.OptionalIndex,
+node_freelist_next: []Node.OptionalIndex,
+node_freelist: Freelist,
+/// This is the number of elements in node arrays which have been used so far. Nodes before this
+/// index are either active, or on the freelist. The remaining nodes are implicitly free. This
+/// value may at times temporarily exceed the node count.
 node_end_index: u32,
 
+const Freelist = packed struct(u32) {
+    head: Node.OptionalIndex,
+    /// Whenever `node_freelist` is added to, this generation is incremented
+    /// to avoid ABA bugs when acquiring nodes. Wrapping arithmetic is used.
+    generation: u24,
+};
+
 pub const TerminalMode = union(enum) {
     off,
     ansi_escape_codes,
@@ -112,7 +122,7 @@ pub const Node = struct {
         // causes `completed_count` to be treated as a file descriptor, so
         // the order here matters.
         @atomicStore(u32, &s.completed_count, integer, .monotonic);
-        @atomicStore(u32, &s.estimated_total_count, std.math.maxInt(u32), .release);
+        @atomicStore(u32, &s.estimated_total_count, std.math.maxInt(u32), .release); // synchronizes with acquire in `serialize`
     }
 
     /// Not thread-safe.
@@ -184,12 +194,24 @@ pub const Node = struct {
         const node_index = node.index.unwrap() orelse return Node.none;
         const parent = node_index.toParent();
 
-        const freelist_head = &global_progress.node_freelist_first;
-        var opt_free_index = @atomicLoad(Node.OptionalIndex, freelist_head, .seq_cst);
-        while (opt_free_index.unwrap()) |free_index| {
-            const freelist_ptr = freelistByIndex(free_index);
-            const next = @atomicLoad(Node.OptionalIndex, freelist_ptr, .seq_cst);
-            opt_free_index = @cmpxchgWeak(Node.OptionalIndex, freelist_head, opt_free_index, next, .seq_cst, .seq_cst) orelse {
+        const freelist = &global_progress.node_freelist;
+        var old_freelist = @atomicLoad(Freelist, freelist, .acquire); // acquire to ensure we have the correct "next" entry
+        while (old_freelist.head.unwrap()) |free_index| {
+            const next_ptr = freelistNextByIndex(free_index);
+            const new_freelist: Freelist = .{
+                .head = @atomicLoad(Node.OptionalIndex, next_ptr, .monotonic),
+                // We don't need to increment the generation when removing nodes from the free list,
+                // only when adding them. (This choice is arbitrary; the opposite would also work.)
+                .generation = old_freelist.generation,
+            };
+            old_freelist = @cmpxchgWeak(
+                Freelist,
+                freelist,
+                old_freelist,
+                new_freelist,
+                .acquire, // not theoretically necessary, but not allowed to be weaker than the failure order
+                .acquire, // ensure we have the correct `node_freelist_next` entry on the next iteration
+            ) orelse {
                 // We won the allocation race.
                 return init(free_index, parent, name, estimated_total_items);
             };
@@ -243,18 +265,28 @@ pub const Node = struct {
         }
         const index = n.index.unwrap() orelse return;
         const parent_ptr = parentByIndex(index);
-        if (parent_ptr.unwrap()) |parent_index| {
+        if (@atomicLoad(Node.Parent, parent_ptr, .monotonic).unwrap()) |parent_index| {
             _ = @atomicRmw(u32, &storageByIndex(parent_index).completed_count, .Add, 1, .monotonic);
-            @atomicStore(Node.Parent, parent_ptr, .unused, .seq_cst);
+            @atomicStore(Node.Parent, parent_ptr, .unused, .monotonic);
 
-            const freelist_head = &global_progress.node_freelist_first;
-            var first = @atomicLoad(Node.OptionalIndex, freelist_head, .seq_cst);
+            const freelist = &global_progress.node_freelist;
+            var old_freelist = @atomicLoad(Freelist, freelist, .monotonic);
             while (true) {
-                @atomicStore(Node.OptionalIndex, freelistByIndex(index), first, .seq_cst);
-                first = @cmpxchgWeak(Node.OptionalIndex, freelist_head, first, index.toOptional(), .seq_cst, .seq_cst) orelse break;
+                @atomicStore(Node.OptionalIndex, freelistNextByIndex(index), old_freelist.head, .monotonic);
+                old_freelist = @cmpxchgWeak(
+                    Freelist,
+                    freelist,
+                    old_freelist,
+                    .{ .head = index.toOptional(), .generation = old_freelist.generation +% 1 },
+                    .release, // ensure a matching `start` sees the freelist link written above
+                    .monotonic, // our write above is irrelevant if we need to retry
+                ) orelse {
+                    // We won the race.
+                    return;
+                };
             }
         } else {
-            @atomicStore(bool, &global_progress.done, true, .seq_cst);
+            @atomicStore(bool, &global_progress.done, true, .monotonic);
             global_progress.redraw_event.set();
             if (global_progress.update_thread) |thread| thread.join();
         }
@@ -291,8 +323,8 @@ pub const Node = struct {
         return &global_progress.node_parents[@intFromEnum(index)];
     }
 
-    fn freelistByIndex(index: Node.Index) *Node.OptionalIndex {
-        return &global_progress.node_freelist[@intFromEnum(index)];
+    fn freelistNextByIndex(index: Node.Index) *Node.OptionalIndex {
+        return &global_progress.node_freelist_next[@intFromEnum(index)];
     }
 
     fn init(free_index: Index, parent: Parent, name: []const u8, estimated_total_items: usize) Node {
@@ -307,8 +339,10 @@ pub const Node = struct {
         @atomicStore(u8, &storage.name[name_len], 0, .monotonic);
 
         const parent_ptr = parentByIndex(free_index);
-        assert(parent_ptr.* == .unused);
-        @atomicStore(Node.Parent, parent_ptr, parent, .release);
+        if (std.debug.runtime_safety) {
+            assert(@atomicLoad(Node.Parent, parent_ptr, .monotonic) == .unused);
+        }
+        @atomicStore(Node.Parent, parent_ptr, parent, .monotonic);
 
         return .{ .index = free_index.toOptional() };
     }
@@ -329,15 +363,15 @@ var global_progress: Progress = .{
     .node_parents = &node_parents_buffer,
     .node_storage = &node_storage_buffer,
-    .node_freelist = &node_freelist_buffer,
-    .node_freelist_first = .none,
+    .node_freelist_next = &node_freelist_next_buffer,
+    .node_freelist = .{ .head = .none, .generation = 0 },
     .node_end_index = 0,
 };
 
 const node_storage_buffer_len = 83;
 var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined;
 var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined;
-var node_freelist_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined;
+var node_freelist_next_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined;
 
 var default_draw_buffer: [4096]u8 = undefined;
@@ -456,7 +490,7 @@ fn updateThreadRun() void {
     {
         const resize_flag = wait(global_progress.initial_delay_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst)) return;
+        if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
         maybeUpdateSize(resize_flag);
 
         const buffer, _ = computeRedraw(&serialized_buffer);
@@ -470,7 +504,7 @@ fn updateThreadRun() void {
     while (true) {
         const resize_flag = wait(global_progress.refresh_rate_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst)) {
+        if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
             stderr_mutex.lock();
             defer stderr_mutex.unlock();
             return clearWrittenWithEscapeCodes() catch {};
@@ -500,7 +534,7 @@ fn windowsApiUpdateThreadRun() void {
     {
         const resize_flag = wait(global_progress.initial_delay_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst)) return;
+        if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
         maybeUpdateSize(resize_flag);
 
         const buffer, const nl_n = computeRedraw(&serialized_buffer);
@@ -516,7 +550,7 @@ fn windowsApiUpdateThreadRun() void {
     while (true) {
         const resize_flag = wait(global_progress.refresh_rate_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst)) {
+        if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
             stderr_mutex.lock();
             defer stderr_mutex.unlock();
             return clearWrittenWindowsApi() catch {};
@@ -558,7 +592,7 @@ fn ipcThreadRun(fd: posix.fd_t) anyerror!void {
     {
         _ = wait(global_progress.initial_delay_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst))
+        if (@atomicLoad(bool, &global_progress.done, .monotonic))
             return;
 
         const serialized = serialize(&serialized_buffer);
@@ -570,7 +604,7 @@ fn ipcThreadRun(fd: posix.fd_t) anyerror!void {
     while (true) {
         _ = wait(global_progress.refresh_rate_ns);
-        if (@atomicLoad(bool, &global_progress.done, .seq_cst))
+        if (@atomicLoad(bool, &global_progress.done, .monotonic))
            return;
 
         const serialized = serialize(&serialized_buffer);
@@ -765,37 +799,39 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized {
     var any_ipc = false;
 
     // Iterate all of the nodes and construct a serializable copy of the state that can be examined
-    // without atomics.
-    const end_index = @atomicLoad(u32, &global_progress.node_end_index, .monotonic);
+    // without atomics. The `@min` call is here because `node_end_index` might briefly exceed the
+    // node count sometimes.
+    const end_index = @min(@atomicLoad(u32, &global_progress.node_end_index, .monotonic), global_progress.node_storage.len);
     for (
         global_progress.node_parents[0..end_index],
         global_progress.node_storage[0..end_index],
         serialized_buffer.map[0..end_index],
     ) |*parent_ptr, *storage_ptr, *map| {
-        var begin_parent = @atomicLoad(Node.Parent, parent_ptr, .acquire);
-        while (begin_parent != .unused) {
-            const dest_storage = &serialized_buffer.storage[serialized_len];
-            copyAtomicLoad(&dest_storage.name, &storage_ptr.name);
-            dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire);
-            dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic);
-            const end_parent = @atomicLoad(Node.Parent, parent_ptr, .acquire);
-            if (begin_parent == end_parent) {
-                any_ipc = any_ipc or (dest_storage.getIpcFd() != null);
-                serialized_buffer.parents[serialized_len] = begin_parent;
-                map.* = @enumFromInt(serialized_len);
-                serialized_len += 1;
-                break;
-            }
-
-            begin_parent = end_parent;
-        } else {
-            // A node may be freed during the execution of this loop, causing
-            // there to be a parent reference to a nonexistent node. Without
-            // this assignment, this would lead to the map entry containing
-            // stale data. By assigning none, the child node with the bad
-            // parent pointer will be harmlessly omitted from the tree.
+        const parent = @atomicLoad(Node.Parent, parent_ptr, .monotonic);
+        if (parent == .unused) {
+            // We might read "mixed" node data in this loop, due to weird atomic things
+            // or just a node actually being freed while this loop runs. That could cause
+            // there to be a parent reference to a nonexistent node. Without this assignment,
+            // this would lead to the map entry containing stale data. By assigning none, the
+            // child node with the bad parent pointer will be harmlessly omitted from the tree.
+            //
+            // Note that there's no concern of potentially creating "looping" data if we read
+            // "mixed" node data like this, because if a node is (directly or indirectly) its own
+            // parent, it will just not be printed at all. The general idea here is that performance
+            // is more important than 100% correct output every frame, given that this API is likely
+            // to be used in hot paths!
             map.* = .none;
+            continue;
         }
+        const dest_storage = &serialized_buffer.storage[serialized_len];
+        copyAtomicLoad(&dest_storage.name, &storage_ptr.name);
+        dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire); // synchronizes with release in `setIpcFd`
+        dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic);
+
+        any_ipc = any_ipc or (dest_storage.getIpcFd() != null);
+        serialized_buffer.parents[serialized_len] = parent;
+        map.* = @enumFromInt(serialized_len);
+        serialized_len += 1;
     }
 
     // Remap parents to point inside serialized arrays.