mirror of
https://codeberg.org/ziglang/zig.git
synced 2025-12-07 14:24:43 +00:00
std.Progress: fix race condition with IPC nodes
It stored some metadata into the canonical node storage data but that is a race condition because another thread recycles those nodes. Also, keep the parent name for empty child root node names.
This commit is contained in:
parent
516366f78f
commit
ca03c9c512
1 changed files with 49 additions and 40 deletions
|
|
@ -86,17 +86,7 @@ pub const Node = struct {
|
||||||
name: [max_name_len]u8,
|
name: [max_name_len]u8,
|
||||||
|
|
||||||
fn getIpcFd(s: Storage) ?posix.fd_t {
|
fn getIpcFd(s: Storage) ?posix.fd_t {
|
||||||
if (s.estimated_total_count != std.math.maxInt(u32))
|
return if (s.estimated_total_count != std.math.maxInt(u32)) null else @bitCast(s.completed_count);
|
||||||
return null;
|
|
||||||
|
|
||||||
const low: u16 = @truncate(s.completed_count);
|
|
||||||
return low;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn getMainStorageIndex(s: Storage) Node.Index {
|
|
||||||
assert(s.estimated_total_count == std.math.maxInt(u32));
|
|
||||||
const i: u16 = @truncate(s.completed_count >> 16);
|
|
||||||
return @enumFromInt(i);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setIpcFd(s: *Storage, fd: posix.fd_t) void {
|
fn setIpcFd(s: *Storage, fd: posix.fd_t) void {
|
||||||
|
|
@ -538,14 +528,9 @@ fn serialize() Serialized {
|
||||||
@memcpy(&dest_storage.name, &storage_ptr.name);
|
@memcpy(&dest_storage.name, &storage_ptr.name);
|
||||||
dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic);
|
dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic);
|
||||||
dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .monotonic);
|
dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .monotonic);
|
||||||
|
|
||||||
if (dest_storage.getIpcFd() != null) {
|
|
||||||
any_ipc = true;
|
|
||||||
dest_storage.completed_count |= @as(u32, @intCast(i)) << 16;
|
|
||||||
}
|
|
||||||
|
|
||||||
const end_parent = @atomicLoad(Node.Parent, parent_ptr, .seq_cst);
|
const end_parent = @atomicLoad(Node.Parent, parent_ptr, .seq_cst);
|
||||||
if (begin_parent == end_parent) {
|
if (begin_parent == end_parent) {
|
||||||
|
any_ipc = any_ipc or (dest_storage.getIpcFd() != null);
|
||||||
serialized_node_parents_buffer[serialized_len] = begin_parent;
|
serialized_node_parents_buffer[serialized_len] = begin_parent;
|
||||||
serialized_node_map_buffer[i] = @enumFromInt(serialized_len);
|
serialized_node_map_buffer[i] = @enumFromInt(serialized_len);
|
||||||
serialized_len += 1;
|
serialized_len += 1;
|
||||||
|
|
@ -577,23 +562,25 @@ fn serialize() Serialized {
|
||||||
|
|
||||||
var parents_copy: [default_node_storage_buffer_len]Node.Parent = undefined;
|
var parents_copy: [default_node_storage_buffer_len]Node.Parent = undefined;
|
||||||
var storage_copy: [default_node_storage_buffer_len]Node.Storage = undefined;
|
var storage_copy: [default_node_storage_buffer_len]Node.Storage = undefined;
|
||||||
|
var ipc_metadata_copy: [default_node_storage_buffer_len]SavedMetadata = undefined;
|
||||||
|
|
||||||
const SavedMetadata = extern struct {
|
var ipc_metadata: [default_node_storage_buffer_len]SavedMetadata = undefined;
|
||||||
|
var ipc_metadata_len: u16 = 0;
|
||||||
|
|
||||||
|
const SavedMetadata = struct {
|
||||||
|
ipc_fd: u16,
|
||||||
|
main_index: u16,
|
||||||
start_index: u16,
|
start_index: u16,
|
||||||
nodes_len: u16,
|
nodes_len: u16,
|
||||||
main_index: u16,
|
|
||||||
flags: Flags,
|
|
||||||
|
|
||||||
const Flags = enum(u16) {
|
|
||||||
saved = std.math.maxInt(u16),
|
|
||||||
_,
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
fn serializeIpc(start_serialized_len: usize) usize {
|
fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
var serialized_len = start_serialized_len;
|
var serialized_len = start_serialized_len;
|
||||||
var pipe_buf: [2 * 4096]u8 align(4) = undefined;
|
var pipe_buf: [2 * 4096]u8 align(4) = undefined;
|
||||||
|
|
||||||
|
const old_ipc_metadata = ipc_metadata_copy[0..ipc_metadata_len];
|
||||||
|
ipc_metadata_len = 0;
|
||||||
|
|
||||||
main_loop: for (
|
main_loop: for (
|
||||||
serialized_node_parents_buffer[0..serialized_len],
|
serialized_node_parents_buffer[0..serialized_len],
|
||||||
serialized_node_storage_buffer[0..serialized_len],
|
serialized_node_storage_buffer[0..serialized_len],
|
||||||
|
|
@ -618,7 +605,7 @@ fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
// Ignore all but the last message on the pipe.
|
// Ignore all but the last message on the pipe.
|
||||||
var input: []align(2) u8 = pipe_buf[0..bytes_read];
|
var input: []align(2) u8 = pipe_buf[0..bytes_read];
|
||||||
if (input.len == 0) {
|
if (input.len == 0) {
|
||||||
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index);
|
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index, old_ipc_metadata);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -626,7 +613,7 @@ fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
if (input.len < 4) {
|
if (input.len < 4) {
|
||||||
std.log.warn("short read: {d} out of 4 header bytes", .{input.len});
|
std.log.warn("short read: {d} out of 4 header bytes", .{input.len});
|
||||||
// TODO keep track of the short read to trash odd bytes with the next read
|
// TODO keep track of the short read to trash odd bytes with the next read
|
||||||
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index);
|
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index, old_ipc_metadata);
|
||||||
continue :main_loop;
|
continue :main_loop;
|
||||||
}
|
}
|
||||||
const subtree_len = std.mem.readInt(u32, input[0..4], .little);
|
const subtree_len = std.mem.readInt(u32, input[0..4], .little);
|
||||||
|
|
@ -634,7 +621,7 @@ fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
if (input.len < expected_bytes) {
|
if (input.len < expected_bytes) {
|
||||||
std.log.warn("short read: {d} out of {d} ({d} nodes)", .{ input.len, expected_bytes, subtree_len });
|
std.log.warn("short read: {d} out of {d} ({d} nodes)", .{ input.len, expected_bytes, subtree_len });
|
||||||
// TODO keep track of the short read to trash odd bytes with the next read
|
// TODO keep track of the short read to trash odd bytes with the next read
|
||||||
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index);
|
serialized_len = useSavedIpcData(serialized_len, main_storage, main_index, old_ipc_metadata);
|
||||||
continue :main_loop;
|
continue :main_loop;
|
||||||
}
|
}
|
||||||
if (input.len > expected_bytes) {
|
if (input.len > expected_bytes) {
|
||||||
|
|
@ -650,16 +637,16 @@ fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Remember in case the pipe is empty on next update.
|
// Remember in case the pipe is empty on next update.
|
||||||
const real_storage: *Node.Storage = Node.storageByIndex(main_storage.getMainStorageIndex());
|
ipc_metadata[ipc_metadata_len] = .{
|
||||||
@as(*SavedMetadata, @ptrCast(&real_storage.name)).* = .{
|
.ipc_fd = @intCast(fd),
|
||||||
.start_index = @intCast(serialized_len),
|
.start_index = @intCast(serialized_len),
|
||||||
.nodes_len = @intCast(parents.len),
|
.nodes_len = @intCast(parents.len),
|
||||||
.main_index = @intCast(main_index),
|
.main_index = @intCast(main_index),
|
||||||
.flags = .saved,
|
|
||||||
};
|
};
|
||||||
|
ipc_metadata_len += 1;
|
||||||
|
|
||||||
// Mount the root here.
|
// Mount the root here.
|
||||||
main_storage.* = storage[0];
|
copyRoot(main_storage, &storage[0]);
|
||||||
|
|
||||||
// Copy the rest of the tree to the end.
|
// Copy the rest of the tree to the end.
|
||||||
@memcpy(serialized_node_storage_buffer[serialized_len..][0 .. storage.len - 1], storage[1..]);
|
@memcpy(serialized_node_storage_buffer[serialized_len..][0 .. storage.len - 1], storage[1..]);
|
||||||
|
|
@ -685,34 +672,56 @@ fn serializeIpc(start_serialized_len: usize) usize {
|
||||||
// Save a copy in case any pipes are empty on the next update.
|
// Save a copy in case any pipes are empty on the next update.
|
||||||
@memcpy(parents_copy[0..serialized_len], serialized_node_parents_buffer[0..serialized_len]);
|
@memcpy(parents_copy[0..serialized_len], serialized_node_parents_buffer[0..serialized_len]);
|
||||||
@memcpy(storage_copy[0..serialized_len], serialized_node_storage_buffer[0..serialized_len]);
|
@memcpy(storage_copy[0..serialized_len], serialized_node_storage_buffer[0..serialized_len]);
|
||||||
|
@memcpy(ipc_metadata_copy[0..ipc_metadata_len], ipc_metadata[0..ipc_metadata_len]);
|
||||||
|
|
||||||
return serialized_len;
|
return serialized_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn useSavedIpcData(start_serialized_len: usize, main_storage: *Node.Storage, main_index: usize) usize {
|
fn copyRoot(dest: *Node.Storage, src: *align(2) Node.Storage) void {
|
||||||
const saved_metadata: *SavedMetadata = @ptrCast(&main_storage.name);
|
dest.* = .{
|
||||||
if (saved_metadata.flags != .saved) {
|
.completed_count = src.completed_count,
|
||||||
|
.estimated_total_count = src.estimated_total_count,
|
||||||
|
.name = if (src.name[0] == 0) dest.name else src.name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn findOld(ipc_fd: posix.fd_t, old_metadata: []const SavedMetadata) ?*const SavedMetadata {
|
||||||
|
for (old_metadata) |*m| {
|
||||||
|
if (m.ipc_fd == ipc_fd)
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn useSavedIpcData(
|
||||||
|
start_serialized_len: usize,
|
||||||
|
main_storage: *Node.Storage,
|
||||||
|
main_index: usize,
|
||||||
|
old_metadata: []const SavedMetadata,
|
||||||
|
) usize {
|
||||||
|
const ipc_fd = main_storage.getIpcFd().?;
|
||||||
|
const saved_metadata = findOld(ipc_fd, old_metadata) orelse {
|
||||||
main_storage.completed_count = 0;
|
main_storage.completed_count = 0;
|
||||||
main_storage.estimated_total_count = 0;
|
main_storage.estimated_total_count = 0;
|
||||||
return start_serialized_len;
|
return start_serialized_len;
|
||||||
}
|
};
|
||||||
|
|
||||||
const start_index = saved_metadata.start_index;
|
const start_index = saved_metadata.start_index;
|
||||||
const nodes_len = saved_metadata.nodes_len;
|
const nodes_len = saved_metadata.nodes_len;
|
||||||
const old_main_index = saved_metadata.main_index;
|
const old_main_index = saved_metadata.main_index;
|
||||||
|
|
||||||
const real_storage: *Node.Storage = Node.storageByIndex(main_storage.getMainStorageIndex());
|
ipc_metadata[ipc_metadata_len] = .{
|
||||||
@as(*SavedMetadata, @ptrCast(&real_storage.name)).* = .{
|
.ipc_fd = @intCast(ipc_fd),
|
||||||
.start_index = @intCast(start_serialized_len),
|
.start_index = @intCast(start_serialized_len),
|
||||||
.nodes_len = nodes_len,
|
.nodes_len = nodes_len,
|
||||||
.main_index = @intCast(main_index),
|
.main_index = @intCast(main_index),
|
||||||
.flags = .saved,
|
|
||||||
};
|
};
|
||||||
|
ipc_metadata_len += 1;
|
||||||
|
|
||||||
const parents = parents_copy[start_index..][0 .. nodes_len - 1];
|
const parents = parents_copy[start_index..][0 .. nodes_len - 1];
|
||||||
const storage = storage_copy[start_index..][0 .. nodes_len - 1];
|
const storage = storage_copy[start_index..][0 .. nodes_len - 1];
|
||||||
|
|
||||||
main_storage.* = storage_copy[old_main_index];
|
copyRoot(main_storage, &storage_copy[old_main_index]);
|
||||||
|
|
||||||
@memcpy(serialized_node_storage_buffer[start_serialized_len..][0..storage.len], storage);
|
@memcpy(serialized_node_storage_buffer[start_serialized_len..][0..storage.len], storage);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue