compiler: use std.Io; delete std.Thread.Pool

Andrew Kelley  2025-10-29 20:18:35 -07:00
parent a072d821be
commit d9617f846f
25 changed files with 416 additions and 749 deletions


@@ -412,8 +412,6 @@ set(ZIG_STAGE2_SOURCES
     lib/std/Thread.zig
     lib/std/Thread/Futex.zig
     lib/std/Thread/Mutex.zig
-    lib/std/Thread/Pool.zig
-    lib/std/Thread/WaitGroup.zig
     lib/std/array_hash_map.zig
     lib/std/array_list.zig
     lib/std/ascii.zig


@@ -107,7 +107,6 @@ pub fn main() !void {
     var targets = std.array_list.Managed([]const u8).init(arena);
     var debug_log_scopes = std.array_list.Managed([]const u8).init(arena);
-    var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena };
     var install_prefix: ?[]const u8 = null;
     var dir_list = std.Build.DirList{};

@@ -415,17 +414,10 @@ pub fn main() !void {
             builder.reference_trace = null;
         } else if (mem.startsWith(u8, arg, "-j")) {
             const num = arg["-j".len..];
-            const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| {
-                std.debug.print("unable to parse jobs count '{s}': {s}", .{
-                    num, @errorName(err),
-                });
-                process.exit(1);
-            };
-            if (n_jobs < 1) {
-                std.debug.print("number of jobs must be at least 1\n", .{});
-                process.exit(1);
-            }
-            thread_pool_options.n_jobs = n_jobs;
+            const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err|
+                fatal("unable to parse jobs count '{s}': {t}", .{ num, err });
+            if (n_jobs < 1) fatal("number of jobs must be at least 1", .{});
+            threaded.setThreadCapacity(n_jobs);
         } else if (mem.eql(u8, arg, "--")) {
             builder.args = argsRest(args, arg_idx);
             break;

@@ -524,7 +516,6 @@ pub fn main() !void {
         .summary = summary orelse if (watch or webui_listen != null) .line else .failures,
         .ttyconf = ttyconf,
         .stderr = stderr,
-        .thread_pool = undefined,
     };
     defer {
         run.memory_blocked_steps.deinit(gpa);

@@ -553,16 +544,12 @@ pub fn main() !void {
         break :w try .init();
     };

-    try run.thread_pool.init(thread_pool_options);
-    defer run.thread_pool.deinit();
-
     const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err});

     run.web_server = if (webui_listen) |listen_address| ws: {
         if (builtin.single_threaded) unreachable; // `fatal` above
         break :ws .init(.{
             .gpa = gpa,
-            .thread_pool = &run.thread_pool,
             .graph = &graph,
             .all_steps = run.step_stack.keys(),
             .ttyconf = run.ttyconf,

@@ -681,7 +668,6 @@ const Run = struct {
     memory_blocked_steps: std.ArrayListUnmanaged(*Step),
     /// Allocated into `gpa`.
     step_stack: std.AutoArrayHashMapUnmanaged(*Step, void),
-    thread_pool: std.Thread.Pool,
     claimed_rss: usize,
     error_style: ErrorStyle,

@@ -759,14 +745,13 @@ fn runStepNames(
     const gpa = run.gpa;
     const io = b.graph.io;
     const step_stack = &run.step_stack;
-    const thread_pool = &run.thread_pool;

     {
         const step_prog = parent_prog_node.start("steps", step_stack.count());
         defer step_prog.end();

-        var wait_group: std.Thread.WaitGroup = .{};
-        defer wait_group.wait();
+        var wait_group: Io.Group = .init;
+        defer wait_group.wait(io);

         // Here we spawn the initial set of tasks with a nice heuristic -
         // dependency order. Each worker when it finishes a step will then

@@ -776,9 +761,7 @@ fn runStepNames(
             const step = steps_slice[steps_slice.len - i - 1];
             if (step.state == .skipped_oom) continue;

-            thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{
-                &wait_group, b, step, step_prog, run,
-            });
+            wait_group.async(io, workerMakeOneStep, .{ &wait_group, b, step, step_prog, run });
         }
     }

@@ -862,12 +845,12 @@ fn runStepNames(
        var f = std.Build.Fuzz.init(
            gpa,
            io,
-           thread_pool,
            step_stack.keys(),
            parent_prog_node,
            ttyconf,
            mode,
-       ) catch |err| fatal("failed to start fuzzer: {s}", .{@errorName(err)});
+       ) catch |err|
+           fatal("failed to start fuzzer: {t}", .{err});
        defer f.deinit();
        f.start();

@@ -1324,13 +1307,13 @@ fn constructGraphAndCheckForDependencyLoop(
 }

 fn workerMakeOneStep(
-    wg: *std.Thread.WaitGroup,
+    wg: *Io.Group,
     b: *std.Build,
     s: *Step,
     prog_node: std.Progress.Node,
     run: *Run,
 ) void {
-    const thread_pool = &run.thread_pool;
+    const io = b.graph.io;

     // First, check the conditions for running this step. If they are not met,
     // then we return without doing the step, relying on another worker to

@@ -1387,7 +1370,6 @@ fn workerMakeOneStep(
     const make_result = s.make(.{
         .progress_node = sub_prog_node,
-        .thread_pool = thread_pool,
         .watch = run.watch,
         .web_server = if (run.web_server) |*ws| ws else null,
         .unit_test_timeout_ns = run.unit_test_timeout_ns,

@@ -1423,9 +1405,7 @@ fn workerMakeOneStep(
         // Successful completion of a step, so we queue up its dependants as well.
         for (s.dependants.items) |dep| {
-            thread_pool.spawnWg(wg, workerMakeOneStep, .{
-                wg, b, dep, prog_node, run,
-            });
+            wg.async(io, workerMakeOneStep, .{ wg, b, dep, prog_node, run });
         }
     }

@@ -1448,9 +1428,7 @@ fn workerMakeOneStep(
             if (dep.max_rss <= remaining) {
                 remaining -= dep.max_rss;
-                thread_pool.spawnWg(wg, workerMakeOneStep, .{
-                    wg, b, dep, prog_node, run,
-                });
+                wg.async(io, workerMakeOneStep, .{ wg, b, dep, prog_node, run });
             } else {
                 run.memory_blocked_steps.items[i] = dep;
                 i += 1;
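
Taken together, the build runner now borrows its concurrency from `std.Io.Threaded` instead of owning a `std.Thread.Pool`. A minimal sketch of the new shape, not the build runner itself: `demoWorker` is hypothetical, and it assumes `Threaded.init` takes only a gpa, as the `Io/Threaded.zig` hunks further down suggest:

const std = @import("std");
const Io = std.Io;

fn demoWorker(group: *Io.Group, io: Io, id: usize) void {
    // Like workerMakeOneStep above, a task may queue follow-up work
    // onto the same group that spawned it.
    if (id < 3) group.async(io, demoWorker, .{ group, io, id + 1 });
}

pub fn main() void {
    var threaded: Io.Threaded = .init(std.heap.smp_allocator);
    defer threaded.deinit();
    threaded.setThreadCapacity(4); // what `-j4` now does above
    const io = threaded.io();

    var group: Io.Group = .init;
    defer group.wait(io); // replaces WaitGroup.wait + Pool.deinit
    group.async(io, demoWorker, .{ &group, io, 0 });
}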


@@ -21,10 +21,9 @@ mode: Mode,
 /// Allocated into `gpa`.
 run_steps: []const *Step.Run,

-wait_group: std.Thread.WaitGroup,
+wait_group: Io.Group,
 root_prog_node: std.Progress.Node,
 prog_node: std.Progress.Node,
-thread_pool: *std.Thread.Pool,
 ttyconf: tty.Config,

 /// Protects `coverage_files`.

@@ -78,7 +77,6 @@ const CoverageMap = struct {
 pub fn init(
     gpa: Allocator,
     io: Io,
-    thread_pool: *std.Thread.Pool,
     all_steps: []const *Build.Step,
     root_prog_node: std.Progress.Node,
     ttyconf: tty.Config,

@@ -89,15 +87,15 @@ pub fn init(
     defer steps.deinit(gpa);

     const rebuild_node = root_prog_node.start("Rebuilding Unit Tests", 0);
     defer rebuild_node.end();
-    var rebuild_wg: std.Thread.WaitGroup = .{};
-    defer rebuild_wg.wait();
+    var rebuild_wg: Io.Group = .init;
+    defer rebuild_wg.wait(io);

     for (all_steps) |step| {
         const run = step.cast(Step.Run) orelse continue;
         if (run.producer == null) continue;
         if (run.fuzz_tests.items.len == 0) continue;
         try steps.append(gpa, run);
-        thread_pool.spawnWg(&rebuild_wg, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node });
+        rebuild_wg.async(io, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node });
     }

     if (steps.items.len == 0) fatal("no fuzz tests found", .{});

@@ -117,8 +115,7 @@ pub fn init(
         .io = io,
         .mode = mode,
         .run_steps = run_steps,
-        .wait_group = .{},
-        .thread_pool = thread_pool,
+        .wait_group = .init,
         .ttyconf = ttyconf,
         .root_prog_node = root_prog_node,
         .prog_node = .none,

@@ -131,29 +128,26 @@ pub fn init(
 }

 pub fn start(fuzz: *Fuzz) void {
+    const io = fuzz.io;
     fuzz.prog_node = fuzz.root_prog_node.start("Fuzzing", fuzz.run_steps.len);

     if (fuzz.mode == .forever) {
         // For polling messages and sending updates to subscribers.
-        fuzz.wait_group.start();
-        _ = std.Thread.spawn(.{}, coverageRun, .{fuzz}) catch |err| {
-            fuzz.wait_group.finish();
-            fatal("unable to spawn coverage thread: {s}", .{@errorName(err)});
-        };
+        fuzz.wait_group.concurrent(io, coverageRun, .{fuzz}) catch |err|
+            fatal("unable to spawn coverage thread: {t}", .{err});
     }

     for (fuzz.run_steps) |run| {
         for (run.fuzz_tests.items) |unit_test_index| {
             assert(run.rebuilt_executable != null);
-            fuzz.thread_pool.spawnWg(&fuzz.wait_group, fuzzWorkerRun, .{
-                fuzz, run, unit_test_index,
-            });
+            fuzz.wait_group.async(io, fuzzWorkerRun, .{ fuzz, run, unit_test_index });
         }
     }
 }

 pub fn deinit(fuzz: *Fuzz) void {
-    if (!fuzz.wait_group.isDone()) @panic("TODO: terminate the fuzzer processes");
+    const io = fuzz.io;
+    fuzz.wait_group.cancel(io);
     fuzz.prog_node.end();
     fuzz.gpa.free(fuzz.run_steps);
 }

@@ -490,8 +484,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
     assert(fuzz.mode == .limit);
     const io = fuzz.io;

-    fuzz.wait_group.wait();
-    fuzz.wait_group.reset();
+    fuzz.wait_group.wait(io);
+    fuzz.wait_group = .init;

     std.debug.print("======= FUZZING REPORT =======\n", .{});
     for (fuzz.msg_queue.items) |msg| {


@@ -110,7 +110,6 @@ pub const TestResults = struct {
 pub const MakeOptions = struct {
     progress_node: std.Progress.Node,
-    thread_pool: *std.Thread.Pool,
     watch: bool,
     web_server: switch (builtin.target.cpu.arch) {
         else => ?*Build.WebServer,


@@ -1122,7 +1122,6 @@ pub fn rerunInFuzzMode(
     const tmp_dir_path = "tmp" ++ fs.path.sep_str ++ std.fmt.hex(rand_int);
     try runCommand(run, argv_list.items, has_side_effects, tmp_dir_path, .{
         .progress_node = prog_node,
-        .thread_pool = undefined, // not used by `runCommand`
         .watch = undefined, // not used by `runCommand`
         .web_server = null, // only needed for time reports
         .unit_test_timeout_ns = null, // don't time out fuzz tests for now


@@ -1,5 +1,4 @@
 gpa: Allocator,
-thread_pool: *std.Thread.Pool,
 graph: *const Build.Graph,
 all_steps: []const *Build.Step,
 listen_address: net.IpAddress,

@@ -53,7 +52,6 @@ pub fn notifyUpdate(ws: *WebServer) void {
 pub const Options = struct {
     gpa: Allocator,
-    thread_pool: *std.Thread.Pool,
     graph: *const std.Build.Graph,
     all_steps: []const *Build.Step,
     ttyconf: Io.tty.Config,

@@ -100,7 +98,6 @@ pub fn init(opts: Options) WebServer {
     return .{
         .gpa = opts.gpa,
-        .thread_pool = opts.thread_pool,
         .graph = opts.graph,
         .all_steps = all_steps,
         .listen_address = opts.listen_address,

@@ -235,7 +232,6 @@ pub fn finishBuild(ws: *WebServer, opts: struct {
     ws.fuzz = Fuzz.init(
         ws.gpa,
         ws.graph.io,
-        ws.thread_pool,
         ws.all_steps,
         ws.root_prog_node,
         ws.ttyconf,


@@ -653,6 +653,16 @@ pub const VTable = struct {
         context_alignment: std.mem.Alignment,
         start: *const fn (*Group, context: *const anyopaque) void,
     ) void,
+    groupConcurrent: *const fn (
+        /// Corresponds to `Io.userdata`.
+        userdata: ?*anyopaque,
+        /// Owner of the spawned async task.
+        group: *Group,
+        /// Copied and then passed to `start`.
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        start: *const fn (*Group, context: *const anyopaque) void,
+    ) ConcurrentError!void,
     groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
     groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,

@@ -1037,6 +1047,32 @@ pub const Group = struct {
         io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
     }

+    pub fn concurrent(
+        g: *Group,
+        io: Io,
+        function: anytype,
+        args: std.meta.ArgsTuple(@TypeOf(function)),
+    ) ConcurrentError!void {
+        const Args = @TypeOf(args);
+        const TypeErased = struct {
+            fn start(group: *Group, context: *const anyopaque) void {
+                _ = group;
+                const args_casted: *const Args = @ptrCast(@alignCast(context));
+                @call(.auto, function, args_casted.*);
+            }
+        };
+        return io.vtable.groupConcurrent(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
+    }
+
+    pub fn eager(
+        g: *Group,
+        io: Io,
+        function: anytype,
+        args: std.meta.ArgsTuple(@TypeOf(function)),
+    ) void {
+        return Group.concurrent(g, io, function, args) catch @call(.auto, function, args);
+    }
+
     /// Blocks until all tasks of the group finish. During this time,
     /// cancellation requests propagate to all members of the group.
     ///
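
The three `Group` spawn flavors differ only in their scheduling guarantee. A sketch of the contract, with `task` hypothetical (`eager`'s fallback chain is verbatim from its one-line body above):

const std = @import("std");
const Io = std.Io;

fn task(n: usize) void {
    std.debug.print("task {d}\n", .{n});
}

fn demo(io: Io) void {
    var group: Io.Group = .init;
    defer group.wait(io);

    // async: best effort; may run `task` inline on this thread, never fails.
    group.async(io, task, .{1});

    // concurrent: guarantees a separate execution context, or fails
    // (e.g. error.ConcurrencyUnavailable on single-threaded targets).
    group.concurrent(io, task, .{2}) catch {};

    // eager: concurrent if possible, otherwise calls task(3) right here.
    group.eager(io, task, .{3});
}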


@@ -13,6 +13,7 @@ const net = std.Io.net;
 const HostName = std.Io.net.HostName;
 const IpAddress = std.Io.net.IpAddress;
 const Allocator = std.mem.Allocator;
+const Alignment = std.mem.Alignment;
 const assert = std.debug.assert;
 const posix = std.posix;

@@ -24,7 +25,8 @@ run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 threads: std.ArrayListUnmanaged(std.Thread),
 stack_size: usize,
-cpu_count: std.Thread.CpuCountError!usize,
+thread_capacity: std.atomic.Value(ThreadCapacity),
+thread_capacity_error: ?std.Thread.CpuCountError,
 concurrent_count: usize,
 wsa: if (is_windows) Wsa else struct {} = .{},

@@ -33,6 +35,21 @@ have_signal_handler: bool,
 old_sig_io: if (have_sig_io) posix.Sigaction else void,
 old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void,

+pub const ThreadCapacity = enum(usize) {
+    unknown = 0,
+    _,
+
+    pub fn init(n: usize) ThreadCapacity {
+        assert(n != 0);
+        return @enumFromInt(n);
+    }
+
+    pub fn get(tc: ThreadCapacity) ?usize {
+        if (tc == .unknown) return null;
+        return @intFromEnum(tc);
+    }
+};
+
 threadlocal var current_closure: ?*Closure = null;

 const max_iovecs_len = 8;

@@ -103,18 +120,21 @@ pub fn init(
     /// here.
     gpa: Allocator,
 ) Threaded {
+    const cpu_count = std.Thread.getCpuCount();
     var t: Threaded = .{
         .allocator = gpa,
         .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount(),
+        .thread_capacity = .init(if (cpu_count) |n| .init(n) else |_| .unknown),
+        .thread_capacity_error = if (cpu_count) |_| null else |e| e,
         .concurrent_count = 0,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
-    if (t.cpu_count) |n| {
+    if (cpu_count) |n| {
         t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
     } else |_| {}

@@ -144,7 +164,8 @@ pub const init_single_threaded: Threaded = .{
     .allocator = .failing,
     .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
-    .cpu_count = 1,
+    .thread_capacity = .init(.init(1)),
+    .thread_capacity_error = null,
     .concurrent_count = 0,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,

@@ -165,6 +186,18 @@ pub fn deinit(t: *Threaded) void {
     t.* = undefined;
 }

+pub fn setThreadCapacity(t: *Threaded, n: usize) void {
+    t.thread_capacity.store(.init(n), .monotonic);
+}
+
+pub fn getThreadCapacity(t: *Threaded) ?usize {
+    return t.thread_capacity.load(.monotonic).get();
+}
+
+pub fn getCurrentThreadId() usize {
+    @panic("TODO");
+}
+
 fn join(t: *Threaded) void {
     if (builtin.single_threaded) return;
     {

@@ -208,6 +241,7 @@ pub fn io(t: *Threaded) Io {
         .select = select,

         .groupAsync = groupAsync,
+        .groupConcurrent = groupConcurrent,
         .groupWait = groupWait,
         .groupCancel = groupCancel,

@@ -304,6 +338,7 @@ pub fn ioBasic(t: *Threaded) Io {
         .select = select,

         .groupAsync = groupAsync,
+        .groupConcurrent = groupConcurrent,
         .groupWait = groupWait,
         .groupCancel = groupCancel,

@@ -387,7 +422,7 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     reset_event: ResetEvent,
     select_condition: ?*ResetEvent,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     result_offset: usize,

     const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

@@ -443,9 +478,9 @@ const AsyncClosure = struct {
 fn async(
     userdata: ?*anyopaque,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
     if (builtin.single_threaded) {

@@ -453,7 +488,7 @@ fn async(
         return null;
     }
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch {
+    const cpu_count = t.getThreadCapacity() orelse {
         return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
             start(context.ptr, result.ptr);
             return null;

@@ -521,15 +556,15 @@ fn async(
 fn concurrent(
     userdata: ?*anyopaque,
     result_len: usize,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) Io.ConcurrentError!*Io.AnyFuture {
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;

     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
+    const cpu_count = t.getThreadCapacity() orelse 1;
     const gpa = t.allocator;
     const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
     const result_offset = result_alignment.forward(context_offset + context.len);

@@ -587,7 +622,7 @@ const GroupClosure = struct {
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
     func: *const fn (*Io.Group, context: *anyopaque) void,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     context_len: usize,

     fn start(closure: *Closure) void {

@@ -621,11 +656,11 @@ const GroupClosure = struct {
         gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
     }

-    fn contextOffset(context_alignment: std.mem.Alignment) usize {
+    fn contextOffset(context_alignment: Alignment) usize {
         return context_alignment.forward(@sizeOf(GroupClosure));
     }

-    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
+    fn contextEnd(context_alignment: Alignment, context_len: usize) usize {
         return contextOffset(context_alignment) + context_len;
     }

@@ -642,12 +677,12 @@ fn groupAsync(
     userdata: ?*anyopaque,
     group: *Io.Group,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
     if (builtin.single_threaded) return start(group, context.ptr);

     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
+    const cpu_count = t.getThreadCapacity() orelse 1;
     const gpa = t.allocator;
     const n = GroupClosure.contextEnd(context_alignment, context.len);
     const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {

@@ -704,6 +739,73 @@ fn groupAsync(
     t.cond.signal();
 }

+fn groupConcurrent(
+    userdata: ?*anyopaque,
+    group: *Io.Group,
+    context: []const u8,
+    context_alignment: Alignment,
+    start: *const fn (*Io.Group, context: *const anyopaque) void,
+) Io.ConcurrentError!void {
+    if (builtin.single_threaded) return error.ConcurrencyUnavailable;
+
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    const cpu_count = t.getThreadCapacity() orelse 1;
+    const gpa = t.allocator;
+    const n = GroupClosure.contextEnd(context_alignment, context.len);
+    const gc_bytes = gpa.alignedAlloc(u8, .of(GroupClosure), n) catch
+        return error.ConcurrencyUnavailable;
+    const gc: *GroupClosure = @ptrCast(@alignCast(gc_bytes));
+    gc.* = .{
+        .closure = .{
+            .cancel_tid = .none,
+            .start = GroupClosure.start,
+            .is_concurrent = false,
+        },
+        .t = t,
+        .group = group,
+        .node = undefined,
+        .func = start,
+        .context_alignment = context_alignment,
+        .context_len = context.len,
+    };
+    @memcpy(gc.contextPointer()[0..context.len], context);
+
+    t.mutex.lock();
+
+    // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
+    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
+    group.token = &gc.node;
+
+    t.concurrent_count += 1;
+    const thread_capacity = cpu_count - 1 + t.concurrent_count;
+    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+        t.mutex.unlock();
+        gc.free(gpa);
+        return error.ConcurrencyUnavailable;
+    };
+
+    t.run_queue.prepend(&gc.closure.node);
+
+    if (t.threads.items.len < thread_capacity) {
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+            assert(t.run_queue.popFirst() == &gc.closure.node);
+            t.mutex.unlock();
+            gc.free(gpa);
+            return error.ConcurrencyUnavailable;
+        };
+        t.threads.appendAssumeCapacity(thread);
+    }
+
+    // This needs to be done before unlocking the mutex to avoid a race with
+    // the associated task finishing.
+    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
+    const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
+    assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));
+
+    t.mutex.unlock();
+    t.cond.signal();
+}
+
 fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const gpa = t.allocator;

@@ -771,7 +873,7 @@ fn await(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));

@@ -783,7 +885,7 @@ fn cancel(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
 ) void {
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
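
`ThreadCapacity` packs "count unknown" and a nonzero thread count into a single word so the capacity can be swapped atomically after init (for example by the build runner's `-j` flag above). A small round-trip sketch of the two methods shown in the hunk:

const std = @import("std");
const ThreadCapacity = std.Io.Threaded.ThreadCapacity;

test "ThreadCapacity round-trip" {
    // .unknown (the zero value) means getCpuCount() failed and nothing
    // has overridden it yet; get() surfaces that as null.
    try std.testing.expectEqual(@as(?usize, null), ThreadCapacity.unknown.get());

    // Any nonzero count round-trips through the enum unchanged.
    const cap: ThreadCapacity = .init(8);
    try std.testing.expectEqual(@as(?usize, 8), cap.get());
}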


@@ -17,8 +17,6 @@ pub const Mutex = @import("Thread/Mutex.zig");
 pub const Semaphore = @import("Thread/Semaphore.zig");
 pub const Condition = @import("Thread/Condition.zig");
 pub const RwLock = @import("Thread/RwLock.zig");
-pub const Pool = @import("Thread/Pool.zig");
-pub const WaitGroup = @import("Thread/WaitGroup.zig");

 pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc;

@@ -1756,7 +1754,6 @@ test {
     _ = Semaphore;
     _ = Condition;
     _ = RwLock;
-    _ = Pool;
 }

 fn testIncrementNotify(value: *usize, event: *ResetEvent) void {


@@ -1,326 +0,0 @@
const std = @import("std");
const builtin = @import("builtin");
const Pool = @This();
const WaitGroup = @import("WaitGroup.zig");

mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
is_running: bool = true,
allocator: std.mem.Allocator,
threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread,
ids: if (builtin.single_threaded) struct {
    inline fn deinit(_: @This(), _: std.mem.Allocator) void {}
    fn getIndex(_: @This(), _: std.Thread.Id) usize {
        return 0;
    }
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),

const Runnable = struct {
    runFn: RunProto,
    node: std.SinglyLinkedList.Node = .{},
};

const RunProto = *const fn (*Runnable, id: ?usize) void;

pub const Options = struct {
    allocator: std.mem.Allocator,
    n_jobs: ?usize = null,
    track_ids: bool = false,
    stack_size: usize = std.Thread.SpawnConfig.default_stack_size,
};

pub fn init(pool: *Pool, options: Options) !void {
    const allocator = options.allocator;

    pool.* = .{
        .allocator = allocator,
        .threads = if (builtin.single_threaded) .{} else &.{},
        .ids = .{},
    };

    if (builtin.single_threaded) {
        return;
    }

    const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1);
    if (options.track_ids) {
        try pool.ids.ensureTotalCapacity(allocator, 1 + thread_count);
        pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
    }

    // kill and join any threads we spawned and free memory on error.
    pool.threads = try allocator.alloc(std.Thread, thread_count);
    var spawned: usize = 0;
    errdefer pool.join(spawned);

    for (pool.threads) |*thread| {
        thread.* = try std.Thread.spawn(.{
            .stack_size = options.stack_size,
            .allocator = allocator,
        }, worker, .{pool});
        spawned += 1;
    }
}

pub fn deinit(pool: *Pool) void {
    pool.join(pool.threads.len); // kill and join all threads.
    pool.ids.deinit(pool.allocator);
    pool.* = undefined;
}

fn join(pool: *Pool, spawned: usize) void {
    if (builtin.single_threaded) {
        return;
    }

    {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        // ensure future worker threads exit the dequeue loop
        pool.is_running = false;
    }

    // wake up any sleeping threads (this can be done outside the mutex)
    // then wait for all the threads we know are spawned to complete.
    pool.cond.broadcast();
    for (pool.threads[0..spawned]) |thread| {
        thread.join();
    }

    pool.allocator.free(pool.threads);
}

/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
    wait_group.start();

    if (builtin.single_threaded) {
        @call(.auto, func, args);
        wait_group.finish();
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *Pool,
        runnable: Runnable = .{ .runFn = runFn },
        wait_group: *WaitGroup,

        fn runFn(runnable: *Runnable, _: ?usize) void {
            const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
            @call(.auto, func, closure.arguments);
            closure.wait_group.finish();

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        pool.mutex.lock();

        const closure = pool.allocator.create(Closure) catch {
            pool.mutex.unlock();
            @call(.auto, func, args);
            wait_group.finish();
            return;
        };
        closure.* = .{
            .arguments = args,
            .pool = pool,
            .wait_group = wait_group,
        };

        pool.run_queue.prepend(&closure.runnable.node);
        pool.mutex.unlock();
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    pool.cond.signal();
}

/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// The first argument passed to `func` is a dense `usize` thread id, the rest
/// of the arguments are passed from `args`. Requires the pool to have been
/// initialized with `.track_ids = true`.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
    wait_group.start();

    if (builtin.single_threaded) {
        @call(.auto, func, .{0} ++ args);
        wait_group.finish();
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *Pool,
        runnable: Runnable = .{ .runFn = runFn },
        wait_group: *WaitGroup,

        fn runFn(runnable: *Runnable, id: ?usize) void {
            const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
            @call(.auto, func, .{id.?} ++ closure.arguments);
            closure.wait_group.finish();

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        pool.mutex.lock();

        const closure = pool.allocator.create(Closure) catch {
            const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId());
            pool.mutex.unlock();
            @call(.auto, func, .{id.?} ++ args);
            wait_group.finish();
            return;
        };
        closure.* = .{
            .arguments = args,
            .pool = pool,
            .wait_group = wait_group,
        };

        pool.run_queue.prepend(&closure.runnable.node);
        pool.mutex.unlock();
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    pool.cond.signal();
}

pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void {
    if (builtin.single_threaded) {
        @call(.auto, func, args);
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *Pool,
        runnable: Runnable = .{ .runFn = runFn },

        fn runFn(runnable: *Runnable, _: ?usize) void {
            const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
            @call(.auto, func, closure.arguments);

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        const closure = try pool.allocator.create(Closure);
        closure.* = .{
            .arguments = args,
            .pool = pool,
        };

        pool.run_queue.prepend(&closure.runnable.node);
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    pool.cond.signal();
}

test spawn {
    const TestFn = struct {
        fn checkRun(completed: *bool) void {
            completed.* = true;
        }
    };

    var completed: bool = false;

    {
        var pool: Pool = undefined;
        try pool.init(.{
            .allocator = std.testing.allocator,
        });
        defer pool.deinit();
        try pool.spawn(TestFn.checkRun, .{&completed});
    }

    try std.testing.expectEqual(true, completed);
}

fn worker(pool: *Pool) void {
    pool.mutex.lock();
    defer pool.mutex.unlock();

    const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null;
    if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});

    while (true) {
        while (pool.run_queue.popFirst()) |run_node| {
            // Temporarily unlock the mutex in order to execute the run_node
            pool.mutex.unlock();
            defer pool.mutex.lock();

            const runnable: *Runnable = @fieldParentPtr("node", run_node);
            runnable.runFn(runnable, id);
        }

        // Stop executing instead of waiting if the thread pool is no longer running.
        if (pool.is_running) {
            pool.cond.wait(&pool.mutex);
        } else {
            break;
        }
    }
}

pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void {
    var id: ?usize = null;

    while (!wait_group.isDone()) {
        pool.mutex.lock();
        if (pool.run_queue.popFirst()) |run_node| {
            id = id orelse pool.ids.getIndex(std.Thread.getCurrentId());
            pool.mutex.unlock();
            const runnable: *Runnable = @fieldParentPtr("node", run_node);
            runnable.runFn(runnable, id);
            continue;
        }

        pool.mutex.unlock();
        wait_group.wait();
        return;
    }
}

pub fn getIdCount(pool: *Pool) usize {
    return @intCast(1 + pool.threads.len);
}
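
For downstream code, the commit's own substitutions give the migration recipe. A hedged sketch (hypothetical `step` worker, `io` obtained from a `std.Io.Threaded` instance):

const std = @import("std");
const Io = std.Io;

fn step(i: usize) void {
    _ = i;
}

// Before: pool.spawnWg(&wg, step, .{i}) with a std.Thread.WaitGroup,
// wg.spawnManager(step, .{i}) for work that mostly blocks, then wg.wait().
// After, one Io.Group carries both roles:
fn migrated(io: Io) void {
    var group: Io.Group = .init;
    defer group.wait(io); // was: wg.wait()
    for (0..4) |i| group.async(io, step, .{i}); // was: pool.spawnWg
    group.eager(io, step, .{99}); // was: wg.spawnManager
}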


@@ -1,83 +0,0 @@
const builtin = @import("builtin");
const std = @import("std");
const assert = std.debug.assert;
const WaitGroup = @This();

const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;

state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
event: std.Thread.ResetEvent = .unset,

pub fn start(self: *WaitGroup) void {
    return startStateless(&self.state);
}

pub fn startStateless(state: *std.atomic.Value(usize)) void {
    const prev_state = state.fetchAdd(one_pending, .monotonic);
    assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending));
}

pub fn startMany(self: *WaitGroup, n: usize) void {
    const state = self.state.fetchAdd(one_pending * n, .monotonic);
    assert((state / one_pending) < (std.math.maxInt(usize) / one_pending));
}

pub fn finish(self: *WaitGroup) void {
    const state = self.state.fetchSub(one_pending, .acq_rel);
    assert((state / one_pending) > 0);

    if (state == (one_pending | is_waiting)) {
        self.event.set();
    }
}

pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    const prev_state = state.fetchSub(one_pending, .acq_rel);
    assert((prev_state / one_pending) > 0);
    if (prev_state == (one_pending | is_waiting)) event.set();
}

pub fn wait(wg: *WaitGroup) void {
    return waitStateless(&wg.state, &wg.event);
}

pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    const prev_state = state.fetchAdd(is_waiting, .acquire);
    assert(prev_state & is_waiting == 0);

    if ((prev_state / one_pending) > 0) event.wait();
}

pub fn reset(self: *WaitGroup) void {
    self.state.store(0, .monotonic);
    self.event.reset();
}

pub fn isDone(wg: *WaitGroup) bool {
    const state = wg.state.load(.acquire);
    assert(state & is_waiting == 0);

    return (state / one_pending) == 0;
}

// Spawns a new thread for the task. This is appropriate when the callee
// delegates all work.
pub fn spawnManager(
    wg: *WaitGroup,
    comptime func: anytype,
    args: anytype,
) void {
    if (builtin.single_threaded) {
        @call(.auto, func, args);
        return;
    }
    const Manager = struct {
        fn run(wg_inner: *WaitGroup, args_inner: @TypeOf(args)) void {
            defer wg_inner.finish();
            @call(.auto, func, args_inner);
        }
    };

    wg.start();
    const t = std.Thread.spawn(.{}, Manager.run, .{ wg, args }) catch return Manager.run(wg, args);
    t.detach();
}
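
The deleted WaitGroup packed a waiter flag (bit 0) and a pending-task counter (the remaining bits) into one atomic word. A worked example of the arithmetic the asserts above rely on, using the constants from the file:

const std = @import("std");

test "WaitGroup state word encoding" {
    const is_waiting: usize = 1 << 0; // copied from the deleted file
    const one_pending: usize = 1 << 1;

    // Two start() calls and then a wait(): counter is 2, waiter bit set.
    var state: usize = 2 * one_pending + is_waiting;
    try std.testing.expectEqual(@as(usize, 2), state / one_pending);

    // The finish() that brings the counter to zero observes exactly
    // (one_pending | is_waiting) as the previous value and wakes the waiter.
    state -= one_pending;
    try std.testing.expectEqual(one_pending | is_waiting, state);
    state -= one_pending;
    try std.testing.expectEqual(is_waiting, state);
}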


@ -10,8 +10,6 @@ const Allocator = std.mem.Allocator;
const assert = std.debug.assert; const assert = std.debug.assert;
const log = std.log.scoped(.compilation); const log = std.log.scoped(.compilation);
const Target = std.Target; const Target = std.Target;
const ThreadPool = std.Thread.Pool;
const WaitGroup = std.Thread.WaitGroup;
const ErrorBundle = std.zig.ErrorBundle; const ErrorBundle = std.zig.ErrorBundle;
const fatal = std.process.fatal; const fatal = std.process.fatal;
@ -197,7 +195,6 @@ libc_include_dir_list: []const []const u8,
libc_framework_dir_list: []const []const u8, libc_framework_dir_list: []const []const u8,
rc_includes: RcIncludes, rc_includes: RcIncludes,
mingw_unicode_entry_point: bool, mingw_unicode_entry_point: bool,
thread_pool: *ThreadPool,
/// Populated when we build the libc++ static library. A Job to build this is placed in the queue /// Populated when we build the libc++ static library. A Job to build this is placed in the queue
/// and resolved before calling linker.flush(). /// and resolved before calling linker.flush().
@ -252,11 +249,11 @@ mutex: if (builtin.single_threaded) struct {
pub inline fn tryLock(_: @This()) void {} pub inline fn tryLock(_: @This()) void {}
pub inline fn lock(_: @This()) void {} pub inline fn lock(_: @This()) void {}
pub inline fn unlock(_: @This()) void {} pub inline fn unlock(_: @This()) void {}
} else std.Thread.Mutex = .{}, } else Io.Mutex = .init,
test_filters: []const []const u8, test_filters: []const []const u8,
link_task_wait_group: WaitGroup = .{}, link_task_wait_group: Io.Group = .init,
link_prog_node: std.Progress.Node = .none, link_prog_node: std.Progress.Node = .none,
link_const_prog_node: std.Progress.Node = .none, link_const_prog_node: std.Progress.Node = .none,
link_synth_prog_node: std.Progress.Node = .none, link_synth_prog_node: std.Progress.Node = .none,
@ -1579,7 +1576,7 @@ pub const CacheMode = enum {
pub const ParentWholeCache = struct { pub const ParentWholeCache = struct {
manifest: *Cache.Manifest, manifest: *Cache.Manifest,
mutex: *std.Thread.Mutex, mutex: *Io.Mutex,
prefix_map: [4]u8, prefix_map: [4]u8,
}; };
@ -1607,7 +1604,7 @@ const CacheUse = union(CacheMode) {
lf_open_opts: link.File.OpenOptions, lf_open_opts: link.File.OpenOptions,
/// This is a pointer to a local variable inside `update`. /// This is a pointer to a local variable inside `update`.
cache_manifest: ?*Cache.Manifest, cache_manifest: ?*Cache.Manifest,
cache_manifest_mutex: std.Thread.Mutex, cache_manifest_mutex: Io.Mutex,
/// This is non-`null` for most of the body of `update`. It is the temporary directory which /// This is non-`null` for most of the body of `update`. It is the temporary directory which
/// we initially emit our artifacts to. After the main part of the update is done, it will /// we initially emit our artifacts to. After the main part of the update is done, it will
/// be closed and moved to its final location, and this field set to `null`. /// be closed and moved to its final location, and this field set to `null`.
@ -1647,7 +1644,6 @@ const CacheUse = union(CacheMode) {
pub const CreateOptions = struct { pub const CreateOptions = struct {
dirs: Directories, dirs: Directories,
thread_pool: *ThreadPool,
self_exe_path: ?[]const u8 = null, self_exe_path: ?[]const u8 = null,
/// Options that have been resolved by calling `resolveDefaults`. /// Options that have been resolved by calling `resolveDefaults`.
@ -2223,7 +2219,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
.analysis_roots_buffer = undefined, .analysis_roots_buffer = undefined,
.analysis_roots_len = 0, .analysis_roots_len = 0,
}; };
try zcu.init(options.thread_pool.getIdCount()); try zcu.init((@import("main.zig").threaded_io.getThreadCapacity() orelse 1) + 1);
break :blk zcu; break :blk zcu;
} else blk: { } else blk: {
if (options.emit_h != .no) return diag.fail(.emit_h_without_zcu); if (options.emit_h != .no) return diag.fail(.emit_h_without_zcu);
@ -2252,7 +2248,6 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
.libc_framework_dir_list = libc_dirs.libc_framework_dir_list, .libc_framework_dir_list = libc_dirs.libc_framework_dir_list,
.rc_includes = options.rc_includes, .rc_includes = options.rc_includes,
.mingw_unicode_entry_point = options.mingw_unicode_entry_point, .mingw_unicode_entry_point = options.mingw_unicode_entry_point,
.thread_pool = options.thread_pool,
.clang_passthrough_mode = options.clang_passthrough_mode, .clang_passthrough_mode = options.clang_passthrough_mode,
.clang_preprocessor_mode = options.clang_preprocessor_mode, .clang_preprocessor_mode = options.clang_preprocessor_mode,
.verbose_cc = options.verbose_cc, .verbose_cc = options.verbose_cc,
@ -2478,7 +2473,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
whole.* = .{ whole.* = .{
.lf_open_opts = lf_open_opts, .lf_open_opts = lf_open_opts,
.cache_manifest = null, .cache_manifest = null,
.cache_manifest_mutex = .{}, .cache_manifest_mutex = .init,
.tmp_artifact_directory = null, .tmp_artifact_directory = null,
.lock = null, .lock = null,
}; };
@ -2874,8 +2869,10 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
const tracy_trace = trace(@src()); const tracy_trace = trace(@src());
defer tracy_trace.end(); defer tracy_trace.end();
// This arena is scoped to this one update.
const gpa = comp.gpa; const gpa = comp.gpa;
const io = comp.io;
// This arena is scoped to this one update.
var arena_allocator = std.heap.ArenaAllocator.init(gpa); var arena_allocator = std.heap.ArenaAllocator.init(gpa);
defer arena_allocator.deinit(); defer arena_allocator.deinit();
const arena = arena_allocator.allocator(); const arena = arena_allocator.allocator();
@ -2954,8 +2951,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
// In this case the cache hit contains the full set of file system inputs. Nice! // In this case the cache hit contains the full set of file system inputs. Nice!
if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf); if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf);
if (comp.parent_whole_cache) |pwc| { if (comp.parent_whole_cache) |pwc| {
pwc.mutex.lock(); pwc.mutex.lockUncancelable(io);
defer pwc.mutex.unlock(); defer pwc.mutex.unlock(io);
try man.populateOtherManifest(pwc.manifest, pwc.prefix_map); try man.populateOtherManifest(pwc.manifest, pwc.prefix_map);
} }
@ -3153,8 +3150,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
.whole => |whole| { .whole => |whole| {
if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf); if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf);
if (comp.parent_whole_cache) |pwc| { if (comp.parent_whole_cache) |pwc| {
pwc.mutex.lock(); pwc.mutex.lockUncancelable(io);
defer pwc.mutex.unlock(); defer pwc.mutex.unlock(io);
try man.populateOtherManifest(pwc.manifest, pwc.prefix_map); try man.populateOtherManifest(pwc.manifest, pwc.prefix_map);
} }
@ -3321,6 +3318,7 @@ fn flush(
arena: Allocator, arena: Allocator,
tid: Zcu.PerThread.Id, tid: Zcu.PerThread.Id,
) Allocator.Error!void { ) Allocator.Error!void {
const io = comp.io;
if (comp.zcu) |zcu| { if (comp.zcu) |zcu| {
if (zcu.llvm_object) |llvm_object| { if (zcu.llvm_object) |llvm_object| {
const pt: Zcu.PerThread = .activate(zcu, tid); const pt: Zcu.PerThread = .activate(zcu, tid);
@ -3333,8 +3331,8 @@ fn flush(
var timer = comp.startTimer(); var timer = comp.startTimer();
defer if (timer.finish()) |ns| { defer if (timer.finish()) |ns| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.real_ns_llvm_emit = ns; comp.time_report.?.stats.real_ns_llvm_emit = ns;
}; };
@ -3378,8 +3376,8 @@ fn flush(
if (comp.bin_file) |lf| { if (comp.bin_file) |lf| {
var timer = comp.startTimer(); var timer = comp.startTimer();
defer if (timer.finish()) |ns| { defer if (timer.finish()) |ns| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.real_ns_link_flush = ns; comp.time_report.?.stats.real_ns_link_flush = ns;
}; };
// This is needed before reading the error flags. // This is needed before reading the error flags.
@ -4587,10 +4585,8 @@ pub fn unableToLoadZcuFile(
}); });
} }
fn performAllTheWork( fn performAllTheWork(comp: *Compilation, main_progress_node: std.Progress.Node) JobError!void {
comp: *Compilation, const io = comp.io;
main_progress_node: std.Progress.Node,
) JobError!void {
// Regardless of errors, `comp.zcu` needs to update its generation number. // Regardless of errors, `comp.zcu` needs to update its generation number.
defer if (comp.zcu) |zcu| { defer if (comp.zcu) |zcu| {
zcu.generation += 1; zcu.generation += 1;
@ -4602,8 +4598,8 @@ fn performAllTheWork(
defer commit_timer: { defer commit_timer: {
const t = &(decl_work_timer orelse break :commit_timer); const t = &(decl_work_timer orelse break :commit_timer);
const ns = t.finish() orelse break :commit_timer; const ns = t.finish() orelse break :commit_timer;
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.real_ns_decls = ns; comp.time_report.?.stats.real_ns_decls = ns;
} }
@ -4612,11 +4608,11 @@ fn performAllTheWork(
// (at least for now) single-threaded main work queue. However, C object compilation // (at least for now) single-threaded main work queue. However, C object compilation
// only needs to be finished by the end of this function. // only needs to be finished by the end of this function.
var work_queue_wait_group: WaitGroup = .{}; var work_queue_wait_group: Io.Group = .init;
defer work_queue_wait_group.wait(); defer work_queue_wait_group.wait(io);
comp.link_task_wait_group.reset(); comp.link_task_wait_group = .init;
defer comp.link_task_wait_group.wait(); defer comp.link_task_wait_group.wait(io);
// Already-queued prelink tasks // Already-queued prelink tasks
comp.link_prog_node.increaseEstimatedTotalItems(comp.link_task_queue.queued_prelink.items.len); comp.link_prog_node.increaseEstimatedTotalItems(comp.link_task_queue.queued_prelink.items.len);
@ -4624,8 +4620,8 @@ fn performAllTheWork(
if (comp.emit_docs != null) { if (comp.emit_docs != null) {
dev.check(.docs_emit); dev.check(.docs_emit);
comp.thread_pool.spawnWg(&work_queue_wait_group, workerDocsCopy, .{comp}); work_queue_wait_group.async(io, workerDocsCopy, .{comp});
work_queue_wait_group.spawnManager(workerDocsWasm, .{ comp, main_progress_node }); work_queue_wait_group.eager(io, workerDocsWasm, .{ comp, main_progress_node });
} }
// In case it failed last time, try again. `clearMiscFailures` was already // In case it failed last time, try again. `clearMiscFailures` was already
@ -4636,7 +4632,7 @@ fn performAllTheWork(
// //
// https://github.com/llvm/llvm-project/issues/43698#issuecomment-2542660611 // https://github.com/llvm/llvm-project/issues/43698#issuecomment-2542660611
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"compiler_rt.zig", "compiler_rt.zig",
"compiler_rt", "compiler_rt",
@ -4654,7 +4650,7 @@ fn performAllTheWork(
if (comp.queued_jobs.compiler_rt_obj and comp.compiler_rt_obj == null) { if (comp.queued_jobs.compiler_rt_obj and comp.compiler_rt_obj == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"compiler_rt.zig", "compiler_rt.zig",
"compiler_rt", "compiler_rt",
@ -4673,7 +4669,7 @@ fn performAllTheWork(
// hack for stage2_x86_64 + coff // hack for stage2_x86_64 + coff
if (comp.queued_jobs.compiler_rt_dyn_lib and comp.compiler_rt_dyn_lib == null) { if (comp.queued_jobs.compiler_rt_dyn_lib and comp.compiler_rt_dyn_lib == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"compiler_rt.zig", "compiler_rt.zig",
"compiler_rt", "compiler_rt",
@ -4691,7 +4687,7 @@ fn performAllTheWork(
if (comp.queued_jobs.fuzzer_lib and comp.fuzzer_lib == null) { if (comp.queued_jobs.fuzzer_lib and comp.fuzzer_lib == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"fuzzer.zig", "fuzzer.zig",
"fuzzer", "fuzzer",
@ -4706,7 +4702,7 @@ fn performAllTheWork(
if (comp.queued_jobs.ubsan_rt_lib and comp.ubsan_rt_lib == null) { if (comp.queued_jobs.ubsan_rt_lib and comp.ubsan_rt_lib == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"ubsan_rt.zig", "ubsan_rt.zig",
"ubsan_rt", "ubsan_rt",
@ -4723,7 +4719,7 @@ fn performAllTheWork(
if (comp.queued_jobs.ubsan_rt_obj and comp.ubsan_rt_obj == null) { if (comp.queued_jobs.ubsan_rt_obj and comp.ubsan_rt_obj == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildRt, .{ comp.link_task_wait_group.eager(io, buildRt, .{
comp, comp,
"ubsan_rt.zig", "ubsan_rt.zig",
"ubsan_rt", "ubsan_rt",
@ -4740,49 +4736,49 @@ fn performAllTheWork(
if (comp.queued_jobs.glibc_shared_objects) { if (comp.queued_jobs.glibc_shared_objects) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildGlibcSharedObjects, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildGlibcSharedObjects, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.freebsd_shared_objects) { if (comp.queued_jobs.freebsd_shared_objects) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildFreeBSDSharedObjects, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildFreeBSDSharedObjects, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.netbsd_shared_objects) { if (comp.queued_jobs.netbsd_shared_objects) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildNetBSDSharedObjects, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildNetBSDSharedObjects, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.libunwind) { if (comp.queued_jobs.libunwind) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildLibUnwind, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildLibUnwind, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.libcxx) { if (comp.queued_jobs.libcxx) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildLibCxx, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildLibCxx, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.libcxxabi) { if (comp.queued_jobs.libcxxabi) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildLibCxxAbi, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildLibCxxAbi, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.libtsan) { if (comp.queued_jobs.libtsan) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildLibTsan, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildLibTsan, .{ comp, main_progress_node });
} }
if (comp.queued_jobs.zigc_lib and comp.zigc_static_lib == null) { if (comp.queued_jobs.zigc_lib and comp.zigc_static_lib == null) {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildLibZigC, .{ comp, main_progress_node }); comp.link_task_wait_group.eager(io, buildLibZigC, .{ comp, main_progress_node });
} }
for (0..@typeInfo(musl.CrtFile).@"enum".fields.len) |i| { for (0..@typeInfo(musl.CrtFile).@"enum".fields.len) |i| {
if (comp.queued_jobs.musl_crt_file[i]) { if (comp.queued_jobs.musl_crt_file[i]) {
const tag: musl.CrtFile = @enumFromInt(i); const tag: musl.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildMuslCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildMuslCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4790,7 +4786,7 @@ fn performAllTheWork(
if (comp.queued_jobs.glibc_crt_file[i]) { if (comp.queued_jobs.glibc_crt_file[i]) {
const tag: glibc.CrtFile = @enumFromInt(i); const tag: glibc.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildGlibcCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildGlibcCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4798,7 +4794,7 @@ fn performAllTheWork(
if (comp.queued_jobs.freebsd_crt_file[i]) { if (comp.queued_jobs.freebsd_crt_file[i]) {
const tag: freebsd.CrtFile = @enumFromInt(i); const tag: freebsd.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildFreeBSDCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildFreeBSDCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4806,7 +4802,7 @@ fn performAllTheWork(
if (comp.queued_jobs.netbsd_crt_file[i]) { if (comp.queued_jobs.netbsd_crt_file[i]) {
const tag: netbsd.CrtFile = @enumFromInt(i); const tag: netbsd.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildNetBSDCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildNetBSDCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4814,7 +4810,7 @@ fn performAllTheWork(
if (comp.queued_jobs.wasi_libc_crt_file[i]) { if (comp.queued_jobs.wasi_libc_crt_file[i]) {
const tag: wasi_libc.CrtFile = @enumFromInt(i); const tag: wasi_libc.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildWasiLibcCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildWasiLibcCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4822,7 +4818,7 @@ fn performAllTheWork(
if (comp.queued_jobs.mingw_crt_file[i]) { if (comp.queued_jobs.mingw_crt_file[i]) {
const tag: mingw.CrtFile = @enumFromInt(i); const tag: mingw.CrtFile = @enumFromInt(i);
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.link_task_wait_group.spawnManager(buildMingwCrtFile, .{ comp, tag, main_progress_node }); comp.link_task_wait_group.eager(io, buildMingwCrtFile, .{ comp, tag, main_progress_node });
} }
} }
@ -4835,13 +4831,13 @@ fn performAllTheWork(
var timer = comp.startTimer(); var timer = comp.startTimer();
defer if (timer.finish()) |ns| { defer if (timer.finish()) |ns| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.real_ns_files = ns; comp.time_report.?.stats.real_ns_files = ns;
}; };
var astgen_wait_group: WaitGroup = .{}; var astgen_wait_group: Io.Group = .init;
defer astgen_wait_group.wait(); defer astgen_wait_group.wait(io);
if (comp.zcu) |zcu| { if (comp.zcu) |zcu| {
const gpa = zcu.gpa; const gpa = zcu.gpa;
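The AstGen fan-out shows the general `WaitGroup` replacement: `Io.Group` is both the counter and the spawner, so no pool handle has to be threaded through; only the `io` value does. A sketch of the lifecycle under that assumed API, with the worker stubbed:

```zig
fn astGenFanOut(io: std.Io) void {
    // Old shape: var wg: std.Thread.WaitGroup = .{};
    //            defer wg.wait();
    //            comp.thread_pool.spawnWg(&wg, worker, .{ ... });
    var group: std.Io.Group = .init;
    defer group.wait(io); // completes once every `async` task has returned

    group.async(io, fileWorker, .{1});
    group.async(io, fileWorker, .{2});
}

// Placeholder: the real workers run AstGen over one file each.
fn fileWorker(file_id: u32) void {
    _ = file_id;
}
```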
@ -4865,7 +4861,7 @@ fn performAllTheWork(
// sure the file contents are still correct on disk, since it improves the // sure the file contents are still correct on disk, since it improves the
// debugging experience. That job only needs `file`, so we can kick it // debugging experience. That job only needs `file`, so we can kick it
// off right now. // off right now.
comp.thread_pool.spawnWg(&astgen_wait_group, workerUpdateBuiltinFile, .{ comp, file }); astgen_wait_group.async(io, workerUpdateBuiltinFile, .{ comp, file });
continue; continue;
} }
astgen_work_items.appendAssumeCapacity(.{ astgen_work_items.appendAssumeCapacity(.{
@ -4876,7 +4872,7 @@ fn performAllTheWork(
// Now that we're not going to touch `zcu.import_table` again, we can spawn `workerUpdateFile` jobs. // Now that we're not going to touch `zcu.import_table` again, we can spawn `workerUpdateFile` jobs.
for (astgen_work_items.items(.file_index), astgen_work_items.items(.file)) |file_index, file| { for (astgen_work_items.items(.file_index), astgen_work_items.items(.file)) |file_index, file| {
comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateFile, .{ astgen_wait_group.async(io, workerUpdateFile, .{
comp, file, file_index, zir_prog_node, &astgen_wait_group, comp, file, file_index, zir_prog_node, &astgen_wait_group,
}); });
} }
@ -4886,22 +4882,20 @@ fn performAllTheWork(
// `@embedFile` can't trigger analysis of a new `@embedFile`! // `@embedFile` can't trigger analysis of a new `@embedFile`!
for (0.., zcu.embed_table.keys()) |ef_index_usize, ef| { for (0.., zcu.embed_table.keys()) |ef_index_usize, ef| {
const ef_index: Zcu.EmbedFile.Index = @enumFromInt(ef_index_usize); const ef_index: Zcu.EmbedFile.Index = @enumFromInt(ef_index_usize);
comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateEmbedFile, .{ astgen_wait_group.async(io, workerUpdateEmbedFile, .{ comp, ef_index, ef });
comp, ef_index, ef,
});
} }
} }
while (comp.c_object_work_queue.popFront()) |c_object| { while (comp.c_object_work_queue.popFront()) |c_object| {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateCObject, .{ comp.link_task_wait_group.async(io, workerUpdateCObject, .{
comp, c_object, main_progress_node, comp, c_object, main_progress_node,
}); });
} }
while (comp.win32_resource_work_queue.popFront()) |win32_resource| { while (comp.win32_resource_work_queue.popFront()) |win32_resource| {
comp.link_task_queue.startPrelinkItem(); comp.link_task_queue.startPrelinkItem();
comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateWin32Resource, .{ comp.link_task_wait_group.async(io, workerUpdateWin32Resource, .{
comp, win32_resource, main_progress_node, comp, win32_resource, main_progress_node,
}); });
} }
@ -4935,8 +4929,8 @@ fn performAllTheWork(
defer gpa.free(path); defer gpa.free(path);
const result = res: { const result = res: {
whole.cache_manifest_mutex.lock(); whole.cache_manifest_mutex.lockUncancelable(io);
defer whole.cache_manifest_mutex.unlock(); defer whole.cache_manifest_mutex.unlock(io);
if (file.source) |source| { if (file.source) |source| {
break :res man.addFilePostContents(path, source, file.stat); break :res man.addFilePostContents(path, source, file.stat);
} else { } else {
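Shared compiler state is still guarded by `comp.mutex` and `cache_manifest_mutex`, but the lock calls now take the `io` handle, and the blocking acquire is spelled `lockUncancelable`. Presumably that means "block until acquired even if the surrounding task was cancelled"; the reading is an assumption from the name, and so is the mutex type in this sketch:

```zig
// Assumed shape of the guarded-update pattern repeated across this commit.
fn recordFailure(io: std.Io, mutex: *std.Io.Mutex, counter: *u32) void {
    mutex.lockUncancelable(io); // was: mutex.lock();
    defer mutex.unlock(io); //     was: defer mutex.unlock();
    counter.* += 1; // stand-in for touching failure maps / cache manifests
}
```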
@ -5008,8 +5002,8 @@ fn performAllTheWork(
if (!comp.separateCodegenThreadOk()) { if (!comp.separateCodegenThreadOk()) {
// Waits until all input files have been parsed. // Waits until all input files have been parsed.
comp.link_task_wait_group.wait(); comp.link_task_wait_group.wait(io);
comp.link_task_wait_group.reset(); comp.link_task_wait_group = .init;
std.log.scoped(.link).debug("finished waiting for link_task_wait_group", .{}); std.log.scoped(.link).debug("finished waiting for link_task_wait_group", .{});
} }
@ -5056,6 +5050,8 @@ pub fn queueJobs(comp: *Compilation, jobs: []const Job) !void {
} }
fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void { fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void {
const io = comp.io;
switch (job) { switch (job) {
.codegen_func => |func| { .codegen_func => |func| {
const zcu = comp.zcu.?; const zcu = comp.zcu.?;
@ -5087,7 +5083,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void {
const air_bytes: u32 = @intCast(air.instructions.len * 5 + air.extra.items.len * 4); const air_bytes: u32 = @intCast(air.instructions.len * 5 + air.extra.items.len * 4);
if (comp.separateCodegenThreadOk()) { if (comp.separateCodegenThreadOk()) {
// `workerZcuCodegen` takes ownership of `air`. // `workerZcuCodegen` takes ownership of `air`.
comp.thread_pool.spawnWgId(&comp.link_task_wait_group, workerZcuCodegen, .{ comp, func.func, air, shared_mir }); comp.link_task_wait_group.async(io, workerZcuCodegen, .{ comp, func.func, air, shared_mir });
comp.dispatchZcuLinkTask(tid, .{ .link_func = .{ comp.dispatchZcuLinkTask(tid, .{ .link_func = .{
.func = func.func, .func = func.func,
.mir = shared_mir, .mir = shared_mir,
@ -5463,7 +5459,6 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU
.entry = .disabled, .entry = .disabled,
.cache_mode = .whole, .cache_mode = .whole,
.root_name = root_name, .root_name = root_name,
.thread_pool = comp.thread_pool,
.libc_installation = comp.libc_installation, .libc_installation = comp.libc_installation,
.emit_bin = .yes_cache, .emit_bin = .yes_cache,
.verbose_cc = comp.verbose_cc, .verbose_cc = comp.verbose_cc,
@ -5517,13 +5512,15 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU
} }
fn workerUpdateFile( fn workerUpdateFile(
tid: usize,
comp: *Compilation, comp: *Compilation,
file: *Zcu.File, file: *Zcu.File,
file_index: Zcu.File.Index, file_index: Zcu.File.Index,
prog_node: std.Progress.Node, prog_node: std.Progress.Node,
wg: *WaitGroup, wg: *Io.Group,
) void { ) void {
const io = comp.io;
const tid: usize = std.Io.Threaded.getCurrentThreadId();
const child_prog_node = prog_node.start(fs.path.basename(file.path.sub_path), 0); const child_prog_node = prog_node.start(fs.path.basename(file.path.sub_path), 0);
defer child_prog_node.end(); defer child_prog_node.end();
@ -5532,8 +5529,8 @@ fn workerUpdateFile(
pt.updateFile(file_index, file) catch |err| { pt.updateFile(file_index, file) catch |err| {
pt.reportRetryableFileError(file_index, "unable to load '{s}': {s}", .{ fs.path.basename(file.path.sub_path), @errorName(err) }) catch |oom| switch (oom) { pt.reportRetryableFileError(file_index, "unable to load '{s}': {s}", .{ fs.path.basename(file.path.sub_path), @errorName(err) }) catch |oom| switch (oom) {
error.OutOfMemory => { error.OutOfMemory => {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.setAllocFailure(); comp.setAllocFailure();
}, },
}; };
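`spawnWgId` used to inject a `tid: usize` as the worker's leading parameter; an `Io.Group` task instead asks the threaded `Io` implementation for its own id, as the new `workerUpdateFile` does above. A self-contained sketch of that move, where the per-thread slice stands in for `Zcu.PerThread` state:

```zig
const std = @import("std");

fn worker(per_thread_counters: []u64) void {
    // Was the first parameter under spawnWgId; now derived in the worker.
    // (Assumes the id is bounded by the configured thread capacity.)
    const tid: usize = std.Io.Threaded.getCurrentThreadId();
    per_thread_counters[tid] += 1;
}
```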
@ -5563,14 +5560,14 @@ fn workerUpdateFile(
if (pt.discoverImport(file.path, import_path)) |res| switch (res) { if (pt.discoverImport(file.path, import_path)) |res| switch (res) {
.module, .existing_file => {}, .module, .existing_file => {},
.new_file => |new| { .new_file => |new| {
comp.thread_pool.spawnWgId(wg, workerUpdateFile, .{ wg.async(io, workerUpdateFile, .{
comp, new.file, new.index, prog_node, wg, comp, new.file, new.index, prog_node, wg,
}); });
}, },
} else |err| switch (err) { } else |err| switch (err) {
error.OutOfMemory => { error.OutOfMemory => {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.setAllocFailure(); comp.setAllocFailure();
}, },
} }
@ -5586,17 +5583,20 @@ fn workerUpdateBuiltinFile(comp: *Compilation, file: *Zcu.File) void {
); );
} }
fn workerUpdateEmbedFile(tid: usize, comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void { fn workerUpdateEmbedFile(comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void {
const tid: usize = std.Io.Threaded.getCurrentThreadId();
const io = comp.io;
comp.detectEmbedFileUpdate(@enumFromInt(tid), ef_index, ef) catch |err| switch (err) { comp.detectEmbedFileUpdate(@enumFromInt(tid), ef_index, ef) catch |err| switch (err) {
error.OutOfMemory => { error.OutOfMemory => {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.setAllocFailure(); comp.setAllocFailure();
}, },
}; };
} }
fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) !void { fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) !void {
const io = comp.io;
const zcu = comp.zcu.?; const zcu = comp.zcu.?;
const pt: Zcu.PerThread = .activate(zcu, tid); const pt: Zcu.PerThread = .activate(zcu, tid);
defer pt.deactivate(); defer pt.deactivate();
@ -5609,8 +5609,8 @@ fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zc
if (ef.val != .none and ef.val == old_val) return; // success, value unchanged if (ef.val != .none and ef.val == old_val) return; // success, value unchanged
if (ef.val == .none and old_val == .none and ef.err == old_err) return; // failure, error unchanged if (ef.val == .none and old_val == .none and ef.err == old_err) return; // failure, error unchanged
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try zcu.markDependeeOutdated(.not_marked_po, .{ .embed_file = ef_index }); try zcu.markDependeeOutdated(.not_marked_po, .{ .embed_file = ef_index });
} }
@ -5753,8 +5753,8 @@ pub fn translateC(
switch (comp.cache_use) { switch (comp.cache_use) {
.whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| { .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| {
whole.cache_manifest_mutex.lock(); whole.cache_manifest_mutex.lockUncancelable(io);
defer whole.cache_manifest_mutex.unlock(); defer whole.cache_manifest_mutex.unlock(io);
try whole_cache_manifest.addDepFilePost(cache_tmp_dir, dep_basename); try whole_cache_manifest.addDepFilePost(cache_tmp_dir, dep_basename);
}, },
.incremental, .none => {}, .incremental, .none => {},
@ -5892,12 +5892,12 @@ pub const RtOptions = struct {
}; };
fn workerZcuCodegen( fn workerZcuCodegen(
tid: usize,
comp: *Compilation, comp: *Compilation,
func_index: InternPool.Index, func_index: InternPool.Index,
orig_air: Air, orig_air: Air,
out: *link.ZcuTask.LinkFunc.SharedMir, out: *link.ZcuTask.LinkFunc.SharedMir,
) void { ) void {
const tid: usize = std.Io.Threaded.getCurrentThreadId();
var air = orig_air; var air = orig_air;
// We own `air` now, so we are responsible for freeing it. // We own `air` now, so we are responsible for freeing it.
defer air.deinit(comp.gpa); defer air.deinit(comp.gpa);
@ -6119,6 +6119,7 @@ fn reportRetryableWin32ResourceError(
win32_resource: *Win32Resource, win32_resource: *Win32Resource,
err: anyerror, err: anyerror,
) error{OutOfMemory}!void { ) error{OutOfMemory}!void {
const io = comp.io;
win32_resource.status = .failure_retryable; win32_resource.status = .failure_retryable;
var bundle: ErrorBundle.Wip = undefined; var bundle: ErrorBundle.Wip = undefined;
@ -6140,8 +6141,8 @@ fn reportRetryableWin32ResourceError(
}); });
const finished_bundle = try bundle.toOwnedBundle(""); const finished_bundle = try bundle.toOwnedBundle("");
{ {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, finished_bundle); try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, finished_bundle);
} }
} }
@ -6166,8 +6167,8 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr
if (c_object.clearStatus(gpa)) { if (c_object.clearStatus(gpa)) {
// There was previous failure. // There was previous failure.
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
// If the failure was OOM, there will not be an entry here, so we do // If the failure was OOM, there will not be an entry here, so we do
// not assert discard. // not assert discard.
_ = comp.failed_c_objects.swapRemove(c_object); _ = comp.failed_c_objects.swapRemove(c_object);
@ -6459,6 +6460,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr
} }
fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32_resource_prog_node: std.Progress.Node) !void { fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32_resource_prog_node: std.Progress.Node) !void {
const io = comp.io;
if (!std.process.can_spawn) { if (!std.process.can_spawn) {
return comp.failWin32Resource(win32_resource, "{s} does not support spawning a child process", .{@tagName(builtin.os.tag)}); return comp.failWin32Resource(win32_resource, "{s} does not support spawning a child process", .{@tagName(builtin.os.tag)});
} }
@ -6483,8 +6485,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32
if (win32_resource.clearStatus(comp.gpa)) { if (win32_resource.clearStatus(comp.gpa)) {
// There was previous failure. // There was previous failure.
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
// If the failure was OOM, there will not be an entry here, so we do // If the failure was OOM, there will not be an entry here, so we do
// not assert discard. // not assert discard.
_ = comp.failed_win32_resources.swapRemove(win32_resource); _ = comp.failed_win32_resources.swapRemove(win32_resource);
@ -6658,8 +6660,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32
try man.addFilePost(dep_file_path); try man.addFilePost(dep_file_path);
switch (comp.cache_use) { switch (comp.cache_use) {
.whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| { .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| {
whole.cache_manifest_mutex.lock(); whole.cache_manifest_mutex.lockUncancelable(io);
defer whole.cache_manifest_mutex.unlock(); defer whole.cache_manifest_mutex.unlock(io);
try whole_cache_manifest.addFilePost(dep_file_path); try whole_cache_manifest.addFilePost(dep_file_path);
}, },
.incremental, .none => {}, .incremental, .none => {},
@ -7375,10 +7377,11 @@ fn failCObjWithOwnedDiagBundle(
diag_bundle: *CObject.Diag.Bundle, diag_bundle: *CObject.Diag.Bundle,
) SemaError { ) SemaError {
@branchHint(.cold); @branchHint(.cold);
const io = comp.io;
assert(diag_bundle.diags.len > 0); assert(diag_bundle.diags.len > 0);
{ {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
{ {
errdefer diag_bundle.destroy(comp.gpa); errdefer diag_bundle.destroy(comp.gpa);
try comp.failed_c_objects.ensureUnusedCapacity(comp.gpa, 1); try comp.failed_c_objects.ensureUnusedCapacity(comp.gpa, 1);
@ -7418,9 +7421,10 @@ fn failWin32ResourceWithOwnedBundle(
err_bundle: ErrorBundle, err_bundle: ErrorBundle,
) SemaError { ) SemaError {
@branchHint(.cold); @branchHint(.cold);
const io = comp.io;
{ {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, err_bundle); try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, err_bundle);
} }
win32_resource.status = .failure; win32_resource.status = .failure;
@ -7744,8 +7748,9 @@ pub fn lockAndSetMiscFailure(
comptime format: []const u8, comptime format: []const u8,
args: anytype, args: anytype,
) void { ) void {
comp.mutex.lock(); const io = comp.io;
defer comp.mutex.unlock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(io);
return setMiscFailure(comp, tag, format, args); return setMiscFailure(comp, tag, format, args);
} }
@ -7775,6 +7780,7 @@ pub fn updateSubCompilation(
misc_task: MiscTask, misc_task: MiscTask,
prog_node: std.Progress.Node, prog_node: std.Progress.Node,
) SubUpdateError!void { ) SubUpdateError!void {
const io = parent_comp.io;
{ {
const sub_node = prog_node.start(@tagName(misc_task), 0); const sub_node = prog_node.start(@tagName(misc_task), 0);
defer sub_node.end(); defer sub_node.end();
@ -7789,8 +7795,8 @@ pub fn updateSubCompilation(
defer errors.deinit(gpa); defer errors.deinit(gpa);
if (errors.errorMessageCount() > 0) { if (errors.errorMessageCount() > 0) {
parent_comp.mutex.lock(); parent_comp.mutex.lockUncancelable(io);
defer parent_comp.mutex.unlock(); defer parent_comp.mutex.unlock(io);
try parent_comp.misc_failures.ensureUnusedCapacity(gpa, 1); try parent_comp.misc_failures.ensureUnusedCapacity(gpa, 1);
parent_comp.misc_failures.putAssumeCapacityNoClobber(misc_task, .{ parent_comp.misc_failures.putAssumeCapacityNoClobber(misc_task, .{
.msg = try std.fmt.allocPrint(gpa, "sub-compilation of {t} failed", .{misc_task}), .msg = try std.fmt.allocPrint(gpa, "sub-compilation of {t} failed", .{misc_task}),
@ -7898,7 +7904,6 @@ fn buildOutputFromZig(
.config = config, .config = config,
.root_mod = root_mod, .root_mod = root_mod,
.root_name = root_name, .root_name = root_name,
.thread_pool = comp.thread_pool,
.libc_installation = comp.libc_installation, .libc_installation = comp.libc_installation,
.emit_bin = .yes_cache, .emit_bin = .yes_cache,
.function_sections = true, .function_sections = true,
@ -8034,7 +8039,6 @@ pub fn build_crt_file(
.config = config, .config = config,
.root_mod = root_mod, .root_mod = root_mod,
.root_name = root_name, .root_name = root_name,
.thread_pool = comp.thread_pool,
.libc_installation = comp.libc_installation, .libc_installation = comp.libc_installation,
.emit_bin = .yes_cache, .emit_bin = .yes_cache,
.function_sections = options.function_sections orelse false, .function_sections = options.function_sections orelse false,
@ -8066,8 +8070,8 @@ pub fn build_crt_file(
comp.queuePrelinkTaskMode(crt_file.full_object_path, &config); comp.queuePrelinkTaskMode(crt_file.full_object_path, &config);
{ {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try comp.crt_files.ensureUnusedCapacity(gpa, 1); try comp.crt_files.ensureUnusedCapacity(gpa, 1);
comp.crt_files.putAssumeCapacityNoClobber(basename, crt_file); comp.crt_files.putAssumeCapacityNoClobber(basename, crt_file);
} }
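Sub-compilations (`workerDocsWasmFallible`, `buildOutputFromZig`, `build_crt_file`) simply drop the `.thread_pool` field from their creation options: concurrency now reaches them through the parent's `io` handle rather than a shared pool pointer. A trimmed, hypothetical mirror of the option fields touched in these hunks:

```zig
// Hypothetical reduction of the options involved; field names other than
// those shown in the hunks above are illustrative.
const SubOptions = struct {
    root_name: []const u8,
    emit_bin: enum { no, yes_cache } = .yes_cache,
    // thread_pool: *std.Thread.Pool, // deleted by this commit
};

fn exampleOptions(root_name: []const u8) SubOptions {
    return .{ .root_name = root_name }; // no pool pointer to forward
}
```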


@ -38,15 +38,12 @@ const assert = std.debug.assert;
const ascii = std.ascii; const ascii = std.ascii;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const Cache = std.Build.Cache; const Cache = std.Build.Cache;
const ThreadPool = std.Thread.Pool;
const WaitGroup = std.Thread.WaitGroup;
const git = @import("Fetch/git.zig"); const git = @import("Fetch/git.zig");
const Package = @import("../Package.zig"); const Package = @import("../Package.zig");
const Manifest = Package.Manifest; const Manifest = Package.Manifest;
const ErrorBundle = std.zig.ErrorBundle; const ErrorBundle = std.zig.ErrorBundle;
arena: std.heap.ArenaAllocator, arena: std.heap.ArenaAllocator,
io: Io,
location: Location, location: Location,
location_tok: std.zig.Ast.TokenIndex, location_tok: std.zig.Ast.TokenIndex,
hash_tok: std.zig.Ast.OptionalTokenIndex, hash_tok: std.zig.Ast.OptionalTokenIndex,
@ -104,6 +101,7 @@ pub const LazyStatus = enum {
/// Contains shared state among all `Fetch` tasks. /// Contains shared state among all `Fetch` tasks.
pub const JobQueue = struct { pub const JobQueue = struct {
io: Io,
mutex: std.Thread.Mutex = .{}, mutex: std.Thread.Mutex = .{},
/// It's an array hash map so that it can be sorted before rendering the /// It's an array hash map so that it can be sorted before rendering the
/// dependencies.zig source file. /// dependencies.zig source file.
@ -115,8 +113,7 @@ pub const JobQueue = struct {
all_fetches: std.ArrayListUnmanaged(*Fetch) = .empty, all_fetches: std.ArrayListUnmanaged(*Fetch) = .empty,
http_client: *std.http.Client, http_client: *std.http.Client,
thread_pool: *ThreadPool, wait_group: Io.Group = .init,
wait_group: WaitGroup = .{},
global_cache: Cache.Directory, global_cache: Cache.Directory,
/// If true then, no fetching occurs, and: /// If true then, no fetching occurs, and:
/// * The `global_cache` directory is assumed to be the direct parent /// * The `global_cache` directory is assumed to be the direct parent
@ -326,7 +323,7 @@ pub const RunError = error{
}; };
pub fn run(f: *Fetch) RunError!void { pub fn run(f: *Fetch) RunError!void {
const io = f.io; const io = f.job_queue.io;
const eb = &f.error_bundle; const eb = &f.error_bundle;
const arena = f.arena.allocator(); const arena = f.arena.allocator();
const gpa = f.arena.child_allocator; const gpa = f.arena.child_allocator;
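On the package-manager side, `Fetch` loses its per-instance `io` field; the handle lives on the shared `JobQueue`, next to the `Io.Group` that replaces the `thread_pool` + `WaitGroup` pair. A sketch of the resulting ownership, with unrelated fields elided:

```zig
const std = @import("std");
const Io = std.Io;

pub const JobQueue = struct {
    io: Io, // moved here from each Fetch
    mutex: std.Thread.Mutex = .{}, // unchanged: still a plain thread mutex
    wait_group: Io.Group = .init, // replaces thread_pool + wait_group
};

pub const Fetch = struct {
    job_queue: *JobQueue,

    pub fn run(f: *Fetch) void {
        const io = f.job_queue.io; // was: f.io
        _ = io;
    }
};
```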
@ -488,7 +485,7 @@ fn runResource(
resource: *Resource, resource: *Resource,
remote_hash: ?Package.Hash, remote_hash: ?Package.Hash,
) RunError!void { ) RunError!void {
const io = f.io; const io = f.job_queue.io;
defer resource.deinit(io); defer resource.deinit(io);
const arena = f.arena.allocator(); const arena = f.arena.allocator();
const eb = &f.error_bundle; const eb = &f.error_bundle;
@ -702,7 +699,7 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void {
} }
fn queueJobsForDeps(f: *Fetch) RunError!void { fn queueJobsForDeps(f: *Fetch) RunError!void {
const io = f.io; const io = f.job_queue.io;
assert(f.job_queue.recursive); assert(f.job_queue.recursive);
// If the package does not have a build.zig.zon file then there are no dependencies. // If the package does not have a build.zig.zon file then there are no dependencies.
@ -792,7 +789,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
f.job_queue.all_fetches.appendAssumeCapacity(new_fetch); f.job_queue.all_fetches.appendAssumeCapacity(new_fetch);
} }
new_fetch.* = .{ new_fetch.* = .{
.io = io,
.arena = std.heap.ArenaAllocator.init(gpa), .arena = std.heap.ArenaAllocator.init(gpa),
.location = location, .location = location,
.location_tok = dep.location_tok, .location_tok = dep.location_tok,
@ -831,10 +827,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
}; };
// Now it's time to give tasks to the thread pool. // Now it's time to kick off the fetch tasks.
const thread_pool = f.job_queue.thread_pool;
for (new_fetches, prog_names) |*new_fetch, prog_name| { for (new_fetches, prog_names) |*new_fetch, prog_name| {
thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name }); f.job_queue.wait_group.async(io, workerRun, .{ new_fetch, prog_name });
} }
} }
@ -992,7 +986,7 @@ const FileType = enum {
const init_resource_buffer_size = git.Packet.max_data_length; const init_resource_buffer_size = git.Packet.max_data_length;
fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void { fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void {
const io = f.io; const io = f.job_queue.io;
const arena = f.arena.allocator(); const arena = f.arena.allocator();
const eb = &f.error_bundle; const eb = &f.error_bundle;
@ -1286,7 +1280,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
// must be processed back to front and they could be too large to // must be processed back to front and they could be too large to
// load into memory. // load into memory.
const io = f.io; const io = f.job_queue.io;
const cache_root = f.job_queue.global_cache; const cache_root = f.job_queue.global_cache;
const prefix = "tmp/"; const prefix = "tmp/";
const suffix = ".zip"; const suffix = ".zip";
@ -1348,7 +1342,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
} }
fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult { fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
const io = f.io; const io = f.job_queue.io;
const arena = f.arena.allocator(); const arena = f.arena.allocator();
// TODO don't try to get a gpa from an arena. expose this dependency higher up // TODO don't try to get a gpa from an arena. expose this dependency higher up
// because the backing of arena could be page allocator // because the backing of arena could be page allocator
@ -1486,11 +1480,11 @@ const ComputedHash = struct {
/// hashed* and must not be present on the file system when calling this /// hashed* and must not be present on the file system when calling this
/// function. /// function.
fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash { fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
const io = f.job_queue.io;
// All the path name strings need to be in memory for sorting. // All the path name strings need to be in memory for sorting.
const arena = f.arena.allocator(); const arena = f.arena.allocator();
const gpa = f.arena.child_allocator; const gpa = f.arena.child_allocator;
const eb = &f.error_bundle; const eb = &f.error_bundle;
const thread_pool = f.job_queue.thread_pool;
const root_dir = pkg_path.root_dir.handle; const root_dir = pkg_path.root_dir.handle;
// Collect all files, recursively, then sort. // Collect all files, recursively, then sort.
@ -1514,15 +1508,15 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
{ {
// The final hash will be a hash of each file hashed independently. This // The final hash will be a hash of each file hashed independently. This
// allows hashing in parallel. // allows hashing in parallel.
var wait_group: WaitGroup = .{}; var wait_group: Io.Group = .init;
// `computeHash` is called from a worker thread so there must not be // TODO `computeHash` is called from a worker thread so there must not be
// any waiting without working or a deadlock could occur. // any waiting without working or a deadlock could occur.
defer thread_pool.waitAndWork(&wait_group); defer wait_group.wait(io);
while (walker.next() catch |err| { while (walker.next() catch |err| {
try eb.addRootErrorMessage(.{ .msg = try eb.printString( try eb.addRootErrorMessage(.{ .msg = try eb.printString(
"unable to walk temporary directory '{f}': {s}", "unable to walk temporary directory '{f}': {t}",
.{ pkg_path, @errorName(err) }, .{ pkg_path, err },
) }); ) });
return error.FetchFailed; return error.FetchFailed;
}) |entry| { }) |entry| {
@ -1542,7 +1536,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
.fs_path = fs_path, .fs_path = fs_path,
.failure = undefined, // to be populated by the worker .failure = undefined, // to be populated by the worker
}; };
thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file }); wait_group.async(io, workerDeleteFile, .{ root_dir, deleted_file });
try deleted_files.append(deleted_file); try deleted_files.append(deleted_file);
continue; continue;
} }
@ -1570,7 +1564,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
.failure = undefined, // to be populated by the worker .failure = undefined, // to be populated by the worker
.size = undefined, // to be populated by the worker .size = undefined, // to be populated by the worker
}; };
thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file }); wait_group.async(io, workerHashFile, .{ root_dir, hashed_file });
try all_files.append(hashed_file); try all_files.append(hashed_file);
} }
} }
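`computeHash` keeps its fan-out shape: one task per file (hash or delete), all tracked by a single group that is awaited before the per-file hashes are combined. Note the new TODO above: `wait(io)` no longer steals work the way `waitAndWork` did, so the deadlock caveat becomes an open question rather than a solved one. A self-contained sketch of the hashing fan-out, with the worker stubbed:

```zig
const std = @import("std");

const HashedFile = struct {
    fs_path: []const u8,
    hash: [32]u8 = undefined,
};

// Stub: the real worker opens and hashes one file.
fn workerHashFile(dir: std.fs.Dir, f: *HashedFile) void {
    _ = dir;
    @memset(&f.hash, 0);
}

fn hashAllFiles(io: std.Io, root_dir: std.fs.Dir, files: []*HashedFile) void {
    var group: std.Io.Group = .init;
    defer group.wait(io); // every per-file hash lands before we return
    for (files) |f| group.async(io, workerHashFile, .{ root_dir, f });
}
```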
@ -2241,7 +2235,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void {
// Builds Fetch with required dependencies, clears dependencies on deinit(). // Builds Fetch with required dependencies, clears dependencies on deinit().
const TestFetchBuilder = struct { const TestFetchBuilder = struct {
thread_pool: ThreadPool,
http_client: std.http.Client, http_client: std.http.Client,
global_cache_directory: Cache.Directory, global_cache_directory: Cache.Directory,
job_queue: Fetch.JobQueue, job_queue: Fetch.JobQueue,
@ -2256,13 +2249,11 @@ const TestFetchBuilder = struct {
) !*Fetch { ) !*Fetch {
const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{}); const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{});
try self.thread_pool.init(.{ .allocator = allocator });
self.http_client = .{ .allocator = allocator, .io = io }; self.http_client = .{ .allocator = allocator, .io = io };
self.global_cache_directory = .{ .handle = cache_dir, .path = null }; self.global_cache_directory = .{ .handle = cache_dir, .path = null };
self.job_queue = .{ self.job_queue = .{
.http_client = &self.http_client, .http_client = &self.http_client,
.thread_pool = &self.thread_pool,
.global_cache = self.global_cache_directory, .global_cache = self.global_cache_directory,
.recursive = false, .recursive = false,
.read_only = false, .read_only = false,
@ -2309,7 +2300,6 @@ const TestFetchBuilder = struct {
self.fetch.prog_node.end(); self.fetch.prog_node.end();
self.global_cache_directory.handle.close(); self.global_cache_directory.handle.close();
self.http_client.deinit(); self.http_client.deinit();
self.thread_pool.deinit();
} }
fn packageDir(self: *TestFetchBuilder) !fs.Dir { fn packageDir(self: *TestFetchBuilder) !fs.Dir {


@ -4566,9 +4566,10 @@ pub fn codegenFail(
/// Takes ownership of `msg`, even on OOM. /// Takes ownership of `msg`, even on OOM.
pub fn codegenFailMsg(zcu: *Zcu, nav_index: InternPool.Nav.Index, msg: *ErrorMsg) CodegenFailError { pub fn codegenFailMsg(zcu: *Zcu, nav_index: InternPool.Nav.Index, msg: *ErrorMsg) CodegenFailError {
const gpa = zcu.gpa; const gpa = zcu.gpa;
const io = zcu.comp.io;
{ {
zcu.comp.mutex.lock(); zcu.comp.mutex.lockUncancelable(io);
defer zcu.comp.mutex.unlock(); defer zcu.comp.mutex.unlock(io);
errdefer msg.deinit(gpa); errdefer msg.deinit(gpa);
try zcu.failed_codegen.putNoClobber(gpa, nav_index, msg); try zcu.failed_codegen.putNoClobber(gpa, nav_index, msg);
} }
@ -4577,8 +4578,9 @@ pub fn codegenFailMsg(zcu: *Zcu, nav_index: InternPool.Nav.Index, msg: *ErrorMsg
/// Asserts that `zcu.failed_codegen` contains the key `nav`, with the necessary lock held. /// Asserts that `zcu.failed_codegen` contains the key `nav`, with the necessary lock held.
pub fn assertCodegenFailed(zcu: *Zcu, nav: InternPool.Nav.Index) void { pub fn assertCodegenFailed(zcu: *Zcu, nav: InternPool.Nav.Index) void {
zcu.comp.mutex.lock(); const io = zcu.comp.io;
defer zcu.comp.mutex.unlock(); zcu.comp.mutex.lockUncancelable(io);
defer zcu.comp.mutex.unlock(io);
assert(zcu.failed_codegen.contains(nav)); assert(zcu.failed_codegen.contains(nav));
} }
@ -4729,6 +4731,7 @@ const TrackedUnitSema = struct {
analysis_timer_decl: ?InternPool.TrackedInst.Index, analysis_timer_decl: ?InternPool.TrackedInst.Index,
pub fn end(tus: TrackedUnitSema, zcu: *Zcu) void { pub fn end(tus: TrackedUnitSema, zcu: *Zcu) void {
const comp = zcu.comp; const comp = zcu.comp;
const io = comp.io;
if (tus.old_name) |old_name| { if (tus.old_name) |old_name| {
zcu.sema_prog_node.completeOne(); // we're just renaming, but it's effectively completion zcu.sema_prog_node.completeOne(); // we're just renaming, but it's effectively completion
zcu.cur_sema_prog_node.setName(&old_name); zcu.cur_sema_prog_node.setName(&old_name);
@ -4739,8 +4742,8 @@ const TrackedUnitSema = struct {
report_time: { report_time: {
const sema_ns = zcu.cur_analysis_timer.?.finish() orelse break :report_time; const sema_ns = zcu.cur_analysis_timer.?.finish() orelse break :report_time;
const zir_decl = tus.analysis_timer_decl orelse break :report_time; const zir_decl = tus.analysis_timer_decl orelse break :report_time;
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.cpu_ns_sema += sema_ns; comp.time_report.?.stats.cpu_ns_sema += sema_ns;
const gop = comp.time_report.?.decl_sema_info.getOrPut(comp.gpa, zir_decl) catch |err| switch (err) { const gop = comp.time_report.?.decl_sema_info.getOrPut(comp.gpa, zir_decl) catch |err| switch (err) {
error.OutOfMemory => { error.OutOfMemory => {


@ -267,8 +267,8 @@ pub fn updateFile(
// Any potential AST errors are converted to ZIR errors when we run AstGen/ZonGen. // Any potential AST errors are converted to ZIR errors when we run AstGen/ZonGen.
file.tree = try Ast.parse(gpa, source, file.getMode()); file.tree = try Ast.parse(gpa, source, file.getMode());
if (timer.finish()) |ns_parse| { if (timer.finish()) |ns_parse| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.cpu_ns_parse += ns_parse; comp.time_report.?.stats.cpu_ns_parse += ns_parse;
} }
@ -293,8 +293,8 @@ pub fn updateFile(
}, },
} }
if (timer.finish()) |ns_astgen| { if (timer.finish()) |ns_astgen| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.cpu_ns_astgen += ns_astgen; comp.time_report.?.stats.cpu_ns_astgen += ns_astgen;
} }
@ -313,8 +313,8 @@ pub fn updateFile(
switch (file.getMode()) { switch (file.getMode()) {
.zig => { .zig => {
if (file.zir.?.hasCompileErrors()) { if (file.zir.?.hasCompileErrors()) {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try zcu.failed_files.putNoClobber(gpa, file_index, null); try zcu.failed_files.putNoClobber(gpa, file_index, null);
} }
if (file.zir.?.loweringFailed()) { if (file.zir.?.loweringFailed()) {
@ -326,8 +326,8 @@ pub fn updateFile(
.zon => { .zon => {
if (file.zoir.?.hasCompileErrors()) { if (file.zoir.?.hasCompileErrors()) {
file.status = .astgen_failure; file.status = .astgen_failure;
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try zcu.failed_files.putNoClobber(gpa, file_index, null); try zcu.failed_files.putNoClobber(gpa, file_index, null);
} else { } else {
file.status = .success; file.status = .success;
@ -1910,6 +1910,7 @@ pub fn discoverImport(
} { } {
const zcu = pt.zcu; const zcu = pt.zcu;
const gpa = zcu.gpa; const gpa = zcu.gpa;
const io = zcu.comp.io;
if (!mem.endsWith(u8, import_string, ".zig") and !mem.endsWith(u8, import_string, ".zon")) { if (!mem.endsWith(u8, import_string, ".zig") and !mem.endsWith(u8, import_string, ".zon")) {
return .module; return .module;
@ -1919,8 +1920,8 @@ pub fn discoverImport(
errdefer new_path.deinit(gpa); errdefer new_path.deinit(gpa);
// We're about to do a GOP on `import_table`, so we need the mutex. // We're about to do a GOP on `import_table`, so we need the mutex.
zcu.comp.mutex.lock(); zcu.comp.mutex.lockUncancelable(io);
defer zcu.comp.mutex.unlock(); defer zcu.comp.mutex.unlock(io);
const gop = try zcu.import_table.getOrPutAdapted(gpa, new_path, Zcu.ImportTableAdapter{ .zcu = zcu }); const gop = try zcu.import_table.getOrPutAdapted(gpa, new_path, Zcu.ImportTableAdapter{ .zcu = zcu });
errdefer _ = zcu.import_table.pop(); errdefer _ = zcu.import_table.pop();
@ -2502,12 +2503,10 @@ fn updateEmbedFileInner(
} }
/// Assumes that `path` is allocated into `gpa`. Takes ownership of `path` on success. /// Assumes that `path` is allocated into `gpa`. Takes ownership of `path` on success.
fn newEmbedFile( fn newEmbedFile(pt: Zcu.PerThread, path: Compilation.Path) !*Zcu.EmbedFile {
pt: Zcu.PerThread,
path: Compilation.Path,
) !*Zcu.EmbedFile {
const zcu = pt.zcu; const zcu = pt.zcu;
const comp = zcu.comp; const comp = zcu.comp;
const io = comp.io;
const gpa = zcu.gpa; const gpa = zcu.gpa;
const ip = &zcu.intern_pool; const ip = &zcu.intern_pool;
@ -2541,8 +2540,8 @@ fn newEmbedFile(
const path_str = try path.toAbsolute(comp.dirs, gpa); const path_str = try path.toAbsolute(comp.dirs, gpa);
defer gpa.free(path_str); defer gpa.free(path_str);
whole.cache_manifest_mutex.lock(); whole.cache_manifest_mutex.lockUncancelable(io);
defer whole.cache_manifest_mutex.unlock(); defer whole.cache_manifest_mutex.unlock(io);
man.addFilePostContents(path_str, contents, new_file.stat) catch |err| switch (err) { man.addFilePostContents(path_str, contents, new_file.stat) catch |err| switch (err) {
error.Unexpected => unreachable, error.Unexpected => unreachable,
@ -3049,6 +3048,8 @@ pub fn getErrorValueFromSlice(pt: Zcu.PerThread, name: []const u8) Allocator.Err
/// Removes any entry from `Zcu.failed_files` associated with `file`. Acquires `Compilation.mutex` as needed. /// Removes any entry from `Zcu.failed_files` associated with `file`. Acquires `Compilation.mutex` as needed.
/// `file.zir` must be unchanged from the last update, as it is used to determine if there is such an entry. /// `file.zir` must be unchanged from the last update, as it is used to determine if there is such an entry.
fn lockAndClearFileCompileError(pt: Zcu.PerThread, file_index: Zcu.File.Index, file: *Zcu.File) void { fn lockAndClearFileCompileError(pt: Zcu.PerThread, file_index: Zcu.File.Index, file: *Zcu.File) void {
const io = pt.zcu.comp.io;
const maybe_has_error = switch (file.status) { const maybe_has_error = switch (file.status) {
.never_loaded => false, .never_loaded => false,
.retryable_failure => true, .retryable_failure => true,
@ -3070,8 +3071,8 @@ fn lockAndClearFileCompileError(pt: Zcu.PerThread, file_index: Zcu.File.Index, f
return; return;
} }
pt.zcu.comp.mutex.lock(); pt.zcu.comp.mutex.lockUncancelable(io);
defer pt.zcu.comp.mutex.unlock(); defer pt.zcu.comp.mutex.unlock(io);
if (pt.zcu.failed_files.fetchSwapRemove(file_index)) |kv| { if (pt.zcu.failed_files.fetchSwapRemove(file_index)) |kv| {
assert(maybe_has_error); // the runtime safety case above assert(maybe_has_error); // the runtime safety case above
if (kv.value) |msg| pt.zcu.gpa.free(msg); // delete previous error message if (kv.value) |msg| pt.zcu.gpa.free(msg); // delete previous error message
@ -3400,6 +3401,7 @@ pub fn reportRetryableFileError(
) error{OutOfMemory}!void { ) error{OutOfMemory}!void {
const zcu = pt.zcu; const zcu = pt.zcu;
const gpa = zcu.gpa; const gpa = zcu.gpa;
const io = zcu.comp.io;
const file = zcu.fileByIndex(file_index); const file = zcu.fileByIndex(file_index);
@ -3409,8 +3411,8 @@ pub fn reportRetryableFileError(
errdefer gpa.free(msg); errdefer gpa.free(msg);
const old_msg: ?[]u8 = old_msg: { const old_msg: ?[]u8 = old_msg: {
zcu.comp.mutex.lock(); zcu.comp.mutex.lockUncancelable(io);
defer zcu.comp.mutex.unlock(); defer zcu.comp.mutex.unlock(io);
const gop = try zcu.failed_files.getOrPut(gpa, file_index); const gop = try zcu.failed_files.getOrPut(gpa, file_index);
const old: ?[]u8 = if (gop.found_existing) old: { const old: ?[]u8 = if (gop.found_existing) old: {
@ -4391,6 +4393,7 @@ pub fn addDependency(pt: Zcu.PerThread, unit: AnalUnit, dependee: InternPool.Dep
/// codegen thread, depending on whether the backend supports `Zcu.Feature.separate_thread`. /// codegen thread, depending on whether the backend supports `Zcu.Feature.separate_thread`.
pub fn runCodegen(pt: Zcu.PerThread, func_index: InternPool.Index, air: *Air, out: *@import("../link.zig").ZcuTask.LinkFunc.SharedMir) void { pub fn runCodegen(pt: Zcu.PerThread, func_index: InternPool.Index, air: *Air, out: *@import("../link.zig").ZcuTask.LinkFunc.SharedMir) void {
const zcu = pt.zcu; const zcu = pt.zcu;
const io = zcu.comp.io;
crash_report.CodegenFunc.start(zcu, func_index); crash_report.CodegenFunc.start(zcu, func_index);
defer crash_report.CodegenFunc.stop(func_index); defer crash_report.CodegenFunc.stop(func_index);
@ -4422,8 +4425,8 @@ pub fn runCodegen(pt: Zcu.PerThread, func_index: InternPool.Index, air: *Air, ou
const ip = &zcu.intern_pool; const ip = &zcu.intern_pool;
const nav = ip.indexToKey(func_index).func.owner_nav; const nav = ip.indexToKey(func_index).func.owner_nav;
const zir_decl = ip.getNav(nav).srcInst(ip); const zir_decl = ip.getNav(nav).srcInst(ip);
zcu.comp.mutex.lock(); zcu.comp.mutex.lockUncancelable(io);
defer zcu.comp.mutex.unlock(); defer zcu.comp.mutex.unlock(io);
const tr = &zcu.comp.time_report.?; const tr = &zcu.comp.time_report.?;
tr.stats.cpu_ns_codegen += ns_codegen; tr.stats.cpu_ns_codegen += ns_codegen;
const gop = tr.decl_codegen_ns.getOrPut(zcu.gpa, zir_decl) catch |err| switch (err) { const gop = tr.decl_codegen_ns.getOrPut(zcu.gpa, zir_decl) catch |err| switch (err) {


@ -281,8 +281,8 @@ pub fn buildImportLib(comp: *Compilation, lib_name: []const u8) !void {
const sub_path = try std.fs.path.join(gpa, &.{ "o", &digest, final_lib_basename }); const sub_path = try std.fs.path.join(gpa, &.{ "o", &digest, final_lib_basename });
errdefer gpa.free(sub_path); errdefer gpa.free(sub_path);
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try comp.crt_files.ensureUnusedCapacity(gpa, 1); try comp.crt_files.ensureUnusedCapacity(gpa, 1);
comp.crt_files.putAssumeCapacityNoClobber(final_lib_basename, .{ comp.crt_files.putAssumeCapacityNoClobber(final_lib_basename, .{
.full_object_path = .{ .full_object_path = .{
@ -388,8 +388,8 @@ pub fn buildImportLib(comp: *Compilation, lib_name: []const u8) !void {
log.warn("failed to write cache manifest for DLL import {s}.lib: {s}", .{ lib_name, @errorName(err) }); log.warn("failed to write cache manifest for DLL import {s}.lib: {s}", .{ lib_name, @errorName(err) });
}; };
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
try comp.crt_files.putNoClobber(gpa, final_lib_basename, .{ try comp.crt_files.putNoClobber(gpa, final_lib_basename, .{
.full_object_path = .{ .full_object_path = .{
.root_dir = comp.dirs.global_cache, .root_dir = comp.dirs.global_cache,


@ -1344,6 +1344,7 @@ pub const ZcuTask = union(enum) {
}; };
pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void { pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void {
const io = comp.io;
const diags = &comp.link_diags; const diags = &comp.link_diags;
const base = comp.bin_file orelse { const base = comp.bin_file orelse {
comp.link_prog_node.completeOne(); comp.link_prog_node.completeOne();
@ -1352,8 +1353,8 @@ pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void {
var timer = comp.startTimer(); var timer = comp.startTimer();
defer if (timer.finish()) |ns| { defer if (timer.finish()) |ns| {
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
comp.time_report.?.stats.cpu_ns_link += ns; comp.time_report.?.stats.cpu_ns_link += ns;
}; };
@ -1478,6 +1479,7 @@ pub fn doPrelinkTask(comp: *Compilation, task: PrelinkTask) void {
} }
} }
pub fn doZcuTask(comp: *Compilation, tid: usize, task: ZcuTask) void { pub fn doZcuTask(comp: *Compilation, tid: usize, task: ZcuTask) void {
const io = comp.io;
const diags = &comp.link_diags; const diags = &comp.link_diags;
const zcu = comp.zcu.?; const zcu = comp.zcu.?;
const ip = &zcu.intern_pool; const ip = &zcu.intern_pool;
@ -1567,8 +1569,8 @@ pub fn doZcuTask(comp: *Compilation, tid: usize, task: ZcuTask) void {
.link_nav => |nav| ip.getNav(nav).srcInst(ip), .link_nav => |nav| ip.getNav(nav).srcInst(ip),
.link_func => |f| ip.getNav(ip.indexToKey(f.func).func.owner_nav).srcInst(ip), .link_func => |f| ip.getNav(ip.indexToKey(f.func).func.owner_nav).srcInst(ip),
}; };
comp.mutex.lock(); comp.mutex.lockUncancelable(io);
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
const tr = &zcu.comp.time_report.?; const tr = &zcu.comp.time_report.?;
tr.stats.cpu_ns_link += ns_link; tr.stats.cpu_ns_link += ns_link;
if (zir_decl) |inst| { if (zir_decl) |inst| {


@ -713,6 +713,7 @@ pub fn allocateChunk(self: *Elf, args: struct {
pub fn loadInput(self: *Elf, input: link.Input) !void { pub fn loadInput(self: *Elf, input: link.Input) !void {
const comp = self.base.comp; const comp = self.base.comp;
const gpa = comp.gpa; const gpa = comp.gpa;
const io = comp.io;
const diags = &comp.link_diags; const diags = &comp.link_diags;
const target = self.getTarget(); const target = self.getTarget();
const debug_fmt_strip = comp.config.debug_format == .strip; const debug_fmt_strip = comp.config.debug_format == .strip;
@ -720,8 +721,8 @@ pub fn loadInput(self: *Elf, input: link.Input) !void {
const is_static_lib = self.base.isStaticLib(); const is_static_lib = self.base.isStaticLib();
if (comp.verbose_link) { if (comp.verbose_link) {
comp.mutex.lock(); // protect comp.arena comp.mutex.lockUncancelable(io); // protect comp.arena
defer comp.mutex.unlock(); defer comp.mutex.unlock(io);
const argv = &self.dump_argv_list; const argv = &self.dump_argv_list;
switch (input) { switch (input) {


@ -267,6 +267,7 @@ pub fn writeAdhocSignature(
defer tracy.end(); defer tracy.end();
const allocator = macho_file.base.comp.gpa; const allocator = macho_file.base.comp.gpa;
const io = macho_file.base.comp.io;
var header: macho.SuperBlob = .{ var header: macho.SuperBlob = .{
.magic = macho.CSMAGIC_EMBEDDED_SIGNATURE, .magic = macho.CSMAGIC_EMBEDDED_SIGNATURE,
@ -289,7 +290,7 @@ pub fn writeAdhocSignature(
self.code_directory.inner.nCodeSlots = total_pages; self.code_directory.inner.nCodeSlots = total_pages;
// Calculate hash for each page (in file) and write it to the buffer // Calculate hash for each page (in file) and write it to the buffer
var hasher = Hasher(Sha256){ .allocator = allocator, .thread_pool = macho_file.base.comp.thread_pool }; var hasher: Hasher(Sha256) = .{ .allocator = allocator, .io = io };
try hasher.hash(opts.file, self.code_directory.code_slots.items, .{ try hasher.hash(opts.file, self.code_directory.code_slots.items, .{
.chunk_size = self.page_size, .chunk_size = self.page_size,
.max_file_size = opts.file_size, .max_file_size = opts.file_size,


@ -3,7 +3,7 @@ pub fn ParallelHasher(comptime Hasher: type) type {
return struct { return struct {
allocator: Allocator, allocator: Allocator,
thread_pool: *ThreadPool, io: Io,
pub fn hash(self: Self, file: fs.File, out: [][hash_size]u8, opts: struct { pub fn hash(self: Self, file: fs.File, out: [][hash_size]u8, opts: struct {
chunk_size: u64 = 0x4000, chunk_size: u64 = 0x4000,
@ -12,7 +12,8 @@ pub fn ParallelHasher(comptime Hasher: type) type {
const tracy = trace(@src()); const tracy = trace(@src());
defer tracy.end(); defer tracy.end();
var wg: WaitGroup = .{}; const io = self.io;
const gpa = self.allocator;
const file_size = blk: { const file_size = blk: {
const file_size = opts.max_file_size orelse try file.getEndPos(); const file_size = opts.max_file_size orelse try file.getEndPos();
@ -20,15 +21,15 @@ pub fn ParallelHasher(comptime Hasher: type) type {
}; };
const chunk_size = std.math.cast(usize, opts.chunk_size) orelse return error.Overflow; const chunk_size = std.math.cast(usize, opts.chunk_size) orelse return error.Overflow;
const buffer = try self.allocator.alloc(u8, chunk_size * out.len); const buffer = try gpa.alloc(u8, chunk_size * out.len);
defer self.allocator.free(buffer); defer gpa.free(buffer);
const results = try self.allocator.alloc(fs.File.PReadError!usize, out.len); const results = try gpa.alloc(fs.File.PReadError!usize, out.len);
defer self.allocator.free(results); defer gpa.free(results);
{ {
wg.reset(); var wg: Io.Group = .init;
defer wg.wait(); defer wg.wait(io);
for (out, results, 0..) |*out_buf, *result, i| { for (out, results, 0..) |*out_buf, *result, i| {
const fstart = i * chunk_size; const fstart = i * chunk_size;
@ -36,7 +37,8 @@ pub fn ParallelHasher(comptime Hasher: type) type {
file_size - fstart file_size - fstart
else else
chunk_size; chunk_size;
self.thread_pool.spawnWg(&wg, worker, .{
wg.async(io, worker, .{
file, file,
fstart, fstart,
buffer[fstart..][0..fsize], buffer[fstart..][0..fsize],
@ -65,12 +67,11 @@ pub fn ParallelHasher(comptime Hasher: type) type {
}; };
} }
const std = @import("std");
const Io = std.Io;
const assert = std.debug.assert; const assert = std.debug.assert;
const fs = std.fs; const fs = std.fs;
const mem = std.mem; const mem = std.mem;
const std = @import("std");
const trace = @import("../../tracy.zig").trace;
const Allocator = mem.Allocator; const Allocator = mem.Allocator;
const ThreadPool = std.Thread.Pool;
const WaitGroup = std.Thread.WaitGroup; const trace = @import("../../tracy.zig").trace;
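`ParallelHasher` follows the same recipe: the `thread_pool` field becomes an `io` field, the shared `WaitGroup` becomes a function-local `Io.Group`, and each chunk is hashed by an `async` task. A standalone sketch of chunked parallel hashing in that style, using the real `std.crypto` Sha256 API but the assumed group API:

```zig
const std = @import("std");
const Sha256 = std.crypto.hash.sha2.Sha256;

fn hashChunk(bytes: []const u8, out: *[Sha256.digest_length]u8) void {
    Sha256.hash(bytes, out, .{});
}

fn hashChunks(
    io: std.Io,
    data: []const u8,
    chunk_size: usize,
    out: [][Sha256.digest_length]u8,
) void {
    var group: std.Io.Group = .init;
    defer group.wait(io); // all chunk digests are written before returning
    for (out, 0..) |*digest, i| {
        const start = i * chunk_size;
        const end = @min(start + chunk_size, data.len);
        group.async(io, hashChunk, .{ data[start..end], digest });
    }
}
```

Construction at the two call sites in this commit becomes `var hasher: Hasher(Sha256) = .{ .allocator = gpa, .io = io };`, as shown in the MachO code-signature and UUID hunks.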


@ -773,7 +773,6 @@ fn writeHeader(macho_file: *MachO, ncmds: usize, sizeofcmds: usize) !void {
const std = @import("std"); const std = @import("std");
const Path = std.Build.Cache.Path; const Path = std.Build.Cache.Path;
const WaitGroup = std.Thread.WaitGroup;
const assert = std.debug.assert; const assert = std.debug.assert;
const log = std.log.scoped(.link); const log = std.log.scoped(.link);
const macho = std.macho; const macho = std.macho;


@ -15,7 +15,7 @@ pub fn calcUuid(comp: *const Compilation, file: fs.File, file_size: u64, out: *[
const hashes = try comp.gpa.alloc([Md5.digest_length]u8, actual_num_chunks); const hashes = try comp.gpa.alloc([Md5.digest_length]u8, actual_num_chunks);
defer comp.gpa.free(hashes); defer comp.gpa.free(hashes);
var hasher = Hasher(Md5){ .allocator = comp.gpa, .thread_pool = comp.thread_pool }; var hasher: Hasher(Md5) = .{ .allocator = comp.gpa, .io = comp.io };
try hasher.hash(file, hashes, .{ try hasher.hash(file, hashes, .{
.chunk_size = chunk_size, .chunk_size = chunk_size,
.max_file_size = file_size, .max_file_size = file_size,
@ -46,4 +46,3 @@ const trace = @import("../../tracy.zig").trace;
const Compilation = @import("../../Compilation.zig"); const Compilation = @import("../../Compilation.zig");
const Md5 = std.crypto.hash.Md5; const Md5 = std.crypto.hash.Md5;
const Hasher = @import("hasher.zig").ParallelHasher; const Hasher = @import("hasher.zig").ParallelHasher;
const ThreadPool = std.Thread.Pool;


@ -102,6 +102,7 @@ pub fn deinit(q: *Queue, comp: *Compilation) void {
/// This is expected to be called exactly once, after which the caller must not directly access /// This is expected to be called exactly once, after which the caller must not directly access
/// `queued_prelink` any longer. This will spawn the link thread if necessary. /// `queued_prelink` any longer. This will spawn the link thread if necessary.
pub fn start(q: *Queue, comp: *Compilation) void { pub fn start(q: *Queue, comp: *Compilation) void {
const io = comp.io;
assert(q.state == .finished); assert(q.state == .finished);
assert(q.queued_zcu.items.len == 0); assert(q.queued_zcu.items.len == 0);
     // Reset this to 1. We can't init it to 1 in `empty`, because it would fall to 0 on successive
@@ -109,7 +110,7 @@ pub fn start(q: *Queue, comp: *Compilation) void {
     q.prelink_wait_count = 1;
     if (q.queued_prelink.items.len != 0) {
         q.state = .running;
-        comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+        comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp });
     }
 }
 
@@ -124,6 +125,7 @@ pub fn startPrelinkItem(q: *Queue) void {
 /// indicates that we have finished calling `startPrelinkItem`, so once all pending items finish,
 /// we are ready to move on to ZCU tasks.
 pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void {
+    const io = comp.io;
     {
         q.mutex.lock();
         defer q.mutex.unlock();
@@ -140,12 +142,13 @@ pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void {
         // that `link.File.prelink()` is called.
         q.state = .running;
     }
-    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+    comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp });
 }
 
 /// Called by codegen workers after they have populated a `ZcuTask.LinkFunc.SharedMir`. If the link
 /// thread was waiting for this MIR, it can resume.
 pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir: *ZcuTask.LinkFunc.SharedMir) void {
+    const io = comp.io;
     // We would like to assert that `mir` is not pending, but that would race with a worker thread
     // potentially freeing it.
     {
@@ -159,12 +162,13 @@ pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir
         q.state = .running;
     }
     assert(mir.status.load(.acquire) != .pending);
-    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+    comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp });
 }
 
 /// Enqueues all prelink tasks in `tasks`. Asserts that they were expected, i.e. that
 /// `prelink_wait_count` is not yet 0. Also asserts that `tasks.len` is not 0.
 pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void {
+    const io = comp.io;
     {
         q.mutex.lock();
         defer q.mutex.unlock();
@@ -178,10 +182,11 @@ pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask)
         // Restart the linker thread, because it was waiting for a task
         q.state = .running;
     }
-    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+    comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp });
 }
 
 pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void {
+    const io = comp.io;
     assert(comp.separateCodegenThreadOk());
     {
         q.mutex.lock();
@@ -208,10 +213,11 @@ pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!
         }
         q.state = .running;
     }
-    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+    comp.link_task_wait_group.async(io, flushTaskQueue, .{ q, comp });
 }
 
-fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
+fn flushTaskQueue(q: *Queue, comp: *Compilation) void {
+    const tid: usize = std.Io.Threaded.getCurrentThreadId();
     q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex`
     if (std.debug.runtime_safety) {
         q.mutex.lock();
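Every `Thread.Pool.spawnWgId` call in this file becomes the same pattern: work is scheduled through the wait group itself, against an `std.Io` instance, and the worker id is queried rather than passed in. A minimal self-contained sketch of that usage follows; the `async`/`wait` calls and `std.Io.Threaded.getCurrentThreadId()` come straight from this diff, while the exact wait-group type name (`std.Io.WaitGroup`) and its zero-initialization are assumptions.

    const std = @import("std");

    fn task(n: usize, sum: *std.atomic.Value(usize)) void {
        // Workers no longer receive a `tid` parameter from the pool; they
        // ask the threaded Io implementation which thread is running them.
        _ = std.Io.Threaded.getCurrentThreadId();
        _ = sum.fetchAdd(n, .monotonic);
    }

    pub fn main() !void {
        var debug_alloc: std.heap.DebugAllocator(.{}) = .init;
        defer _ = debug_alloc.deinit();
        const gpa = debug_alloc.allocator();

        var threaded: std.Io.Threaded = .init(gpa);
        defer threaded.deinit();
        const io = threaded.io();

        var sum: std.atomic.Value(usize) = .init(0);
        var wg: std.Io.WaitGroup = .{}; // assumed type and default init
        for (0..4) |n| wg.async(io, task, .{ n, &sum });
        wg.wait(io); // blocks until all four tasks have completed
        std.debug.assert(sum.load(.monotonic) == 0 + 1 + 2 + 3);
    }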

View file

@@ -3393,10 +3393,11 @@ pub fn updateExports(
 pub fn loadInput(wasm: *Wasm, input: link.Input) !void {
     const comp = wasm.base.comp;
     const gpa = comp.gpa;
+    const io = comp.io;
 
     if (comp.verbose_link) {
-        comp.mutex.lock(); // protect comp.arena
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io); // protect comp.arena
+        defer comp.mutex.unlock(io);
 
         const argv = &wasm.dump_argv_list;
         switch (input) {
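This hunk shows the Io-aware mutex surface: blocking operations now take the `io` handle so they can participate in task cancelation, and `lockUncancelable` is the variant for critical sections that must run to completion regardless. A sketch of the pattern; only `lockUncancelable`/`unlock` appear in the diff, so the `std.Io.Mutex` type name and its default initialization are assumptions.

    const std = @import("std");

    const Shared = struct {
        mutex: std.Io.Mutex = .{}, // assumed type and default init
        count: u64 = 0,

        fn bump(s: *Shared, io: std.Io) void {
            // Must complete even if the calling task is being canceled,
            // hence the uncancelable lock; unlock also takes `io`.
            s.mutex.lockUncancelable(io);
            defer s.mutex.unlock(io);
            s.count += 1;
        }
    };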

View file

@@ -11,7 +11,6 @@ const Allocator = mem.Allocator;
 const Ast = std.zig.Ast;
 const Color = std.zig.Color;
 const warn = std.log.warn;
-const ThreadPool = std.Thread.Pool;
 const cleanExit = std.process.cleanExit;
 const Cache = std.Build.Cache;
 const Path = std.Build.Cache.Path;
@@ -166,6 +165,8 @@ var debug_allocator: std.heap.DebugAllocator(.{
     .stack_trace_frames = build_options.mem_leak_frames,
 }) = .init;
 
+pub var threaded_io: Io.Threaded = undefined;
+
 pub fn main() anyerror!void {
     const gpa, const is_debug = gpa: {
         if (build_options.debug_gpa) break :gpa .{ debug_allocator.allocator(), true };
@@ -247,9 +248,13 @@ fn mainArgs(gpa: Allocator, arena: Allocator, args: []const []const u8) !void {
         }
     }
 
-    var threaded: Io.Threaded = .init(gpa);
-    defer threaded.deinit();
-    const io = threaded.io();
+    threaded_io = .init(gpa);
+    defer threaded_io.deinit();
+    if (threaded_io.getThreadCapacity()) |n| {
+        threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking)));
+    }
+    threaded_io.stack_size = thread_stack_size;
+    const io = threaded_io.io();
 
     const cmd = args[1];
     const cmd_args = args[2..];
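With the pool gone, `Io.Threaded` becomes the single concurrency knob: thread capacity replaces `n_jobs`, `stack_size` moves onto the struct, and the `@min` clamp keeps every worker id representable in `Zcu.PerThread.IdBacking`. A standalone sketch of the lifecycle, using only the calls visible in this hunk, with `u8` standing in for the id type (which is defined elsewhere in the compiler):

    const std = @import("std");

    pub fn main() !void {
        var debug_alloc: std.heap.DebugAllocator(.{}) = .init;
        defer _ = debug_alloc.deinit();
        const gpa = debug_alloc.allocator();

        var threaded: std.Io.Threaded = .init(gpa);
        defer threaded.deinit();

        // Clamp the default capacity so every worker id fits the per-thread
        // id integer; `u8` stands in for Zcu.PerThread.IdBacking here.
        if (threaded.getThreadCapacity()) |n| {
            threaded.setThreadCapacity(@min(n, std.math.maxInt(u8)));
        }
        threaded.stack_size = 16 * 1024 * 1024; // stands in for thread_stack_size

        const io = threaded.io();
        _ = io; // pass `io` wherever a *Thread.Pool used to be threaded through
    }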
@@ -1124,15 +1129,10 @@ fn buildOutputType(
                 fatal("expected [auto|on|off] after --color, found '{s}'", .{next_arg});
             };
         } else if (mem.cutPrefix(u8, arg, "-j")) |str| {
-            const num = std.fmt.parseUnsigned(u32, str, 10) catch |err| {
-                fatal("unable to parse jobs count '{s}': {s}", .{
-                    str, @errorName(err),
-                });
-            };
-            if (num < 1) {
-                fatal("number of jobs must be at least 1\n", .{});
-            }
-            n_jobs = num;
+            const n = std.fmt.parseUnsigned(u32, str, 10) catch |err|
+                fatal("unable to parse jobs count '{s}': {t}", .{ str, err });
+            if (n < 1) fatal("number of jobs must be at least 1", .{});
+            n_jobs = n;
         } else if (mem.eql(u8, arg, "--subsystem")) {
             subsystem = try parseSubSystem(args_iter.nextOrFatal());
         } else if (mem.eql(u8, arg, "-O")) {
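Besides tightening the control flow, the rewritten `-j` handling switches the error message from `{s}` plus `@errorName(err)` to the `{t}` specifier, which formats an error (or enum) value by its tag name. A small illustration:

    const std = @import("std");

    pub fn main() void {
        const err: anyerror = error.InvalidCharacter;
        // `{t}` prints the name of the error, making @errorName redundant:
        std.debug.print("unable to parse jobs count '{s}': {t}\n", .{ "abc", err });
        // prints: unable to parse jobs count 'abc': InvalidCharacter
    }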
@@ -3282,14 +3282,10 @@ fn buildOutputType(
         },
     };
 
-    var thread_pool: ThreadPool = undefined;
-    try thread_pool.init(.{
-        .allocator = gpa,
-        .n_jobs = @min(@max(n_jobs orelse std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)),
-        .track_ids = true,
-        .stack_size = thread_stack_size,
-    });
-    defer thread_pool.deinit();
+    if (n_jobs) |n| {
+        assert(n >= 1);
+        threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking)));
+    }
 
     for (create_module.c_source_files.items) |*src| {
         dev.check(.c_compiler);
@@ -3378,7 +3374,6 @@ fn buildOutputType(
     var create_diag: Compilation.CreateDiagnostic = undefined;
     const comp = Compilation.create(gpa, arena, io, &create_diag, .{
         .dirs = dirs,
-        .thread_pool = &thread_pool,
         .self_exe_path = switch (native_os) {
             .wasi => null,
             else => self_exe_path,
@@ -4956,15 +4951,10 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
             try child_argv.appendSlice(&.{ arg, args[i] });
             continue;
         } else if (mem.cutPrefix(u8, arg, "-j")) |str| {
-            const num = std.fmt.parseUnsigned(u32, str, 10) catch |err| {
-                fatal("unable to parse jobs count '{s}': {s}", .{
-                    str, @errorName(err),
-                });
-            };
-            if (num < 1) {
-                fatal("number of jobs must be at least 1\n", .{});
-            }
-            n_jobs = num;
+            const n = std.fmt.parseUnsigned(u32, str, 10) catch |err|
+                fatal("unable to parse jobs count '{s}': {t}", .{ str, err });
+            if (n < 1) fatal("number of jobs must be at least 1", .{});
+            n_jobs = n;
         } else if (mem.eql(u8, arg, "--seed")) {
             if (i + 1 >= args.len) fatal("expected argument after '{s}'", .{arg});
             i += 1;
@@ -5049,14 +5039,10 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
     child_argv.items[argv_index_global_cache_dir] = dirs.global_cache.path orelse cwd_path;
     child_argv.items[argv_index_cache_dir] = dirs.local_cache.path orelse cwd_path;
 
-    var thread_pool: ThreadPool = undefined;
-    try thread_pool.init(.{
-        .allocator = gpa,
-        .n_jobs = @min(@max(n_jobs orelse std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)),
-        .track_ids = true,
-        .stack_size = thread_stack_size,
-    });
-    defer thread_pool.deinit();
+    if (n_jobs) |n| {
+        assert(n >= 1);
+        threaded_io.setThreadCapacity(@min(n, std.math.maxInt(Zcu.PerThread.IdBacking)));
+    }
 
     // Dummy http client that is not actually used when fetch_command is unsupported.
     // Prevents bootstrap from depending on a bunch of unnecessary stuff.
@@ -5122,8 +5108,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
     defer fetch_prog_node.end();
 
     var job_queue: Package.Fetch.JobQueue = .{
+        .io = io,
         .http_client = &http_client,
-        .thread_pool = &thread_pool,
         .global_cache = dirs.global_cache,
         .read_only = false,
         .recursive = true,
@@ -5156,7 +5142,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
 
     var fetch: Package.Fetch = .{
         .arena = std.heap.ArenaAllocator.init(gpa),
-        .io = io,
         .location = .{ .relative_path = phantom_package_root },
         .location_tok = 0,
         .hash_tok = .none,
@@ -5190,10 +5175,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
         &fetch,
     );
 
-    job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{
-        &fetch, "root",
-    });
-    job_queue.wait_group.wait();
+    job_queue.wait_group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
+    job_queue.wait_group.wait(io);
 
     try job_queue.consolidateErrors();
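Since `JobQueue` now carries `io` directly, there is no pool pointer to thread through the fetch machinery: the wait group both schedules `workerRun` and joins it, along with any nested fetches spawned on the same group. A self-contained sketch of that fan-out-and-join shape, under the same `std.Io.WaitGroup` naming assumption as the earlier sketch:

    const std = @import("std");

    fn leaf(x: *std.atomic.Value(u32)) void {
        _ = x.fetchAdd(1, .monotonic);
    }

    fn root(io: std.Io, wg: *std.Io.WaitGroup, x: *std.atomic.Value(u32)) void {
        // A worker may schedule more work on the same wait group, much like
        // Fetch.workerRun queueing nested package fetches.
        for (0..3) |_| wg.async(io, leaf, .{x});
    }

    pub fn main() !void {
        var debug_alloc: std.heap.DebugAllocator(.{}) = .init;
        defer _ = debug_alloc.deinit();
        var threaded: std.Io.Threaded = .init(debug_alloc.allocator());
        defer threaded.deinit();
        const io = threaded.io();

        var x: std.atomic.Value(u32) = .init(0);
        var wg: std.Io.WaitGroup = .{}; // assumed type and default init
        wg.async(io, root, .{ io, &wg, &x });
        wg.wait(io); // joins root and the three leaves it spawned
        std.debug.assert(x.load(.monotonic) == 3);
    }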
@@ -5288,7 +5271,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
     .main_mod = build_mod,
     .emit_bin = .yes_cache,
     .self_exe_path = self_exe_path,
-    .thread_pool = &thread_pool,
     .verbose_cc = verbose_cc,
     .verbose_link = verbose_link,
     .verbose_air = verbose_air,
@@ -5460,15 +5442,6 @@ fn jitCmd(
     );
     defer dirs.deinit();
 
-    var thread_pool: ThreadPool = undefined;
-    try thread_pool.init(.{
-        .allocator = gpa,
-        .n_jobs = @min(@max(std.Thread.getCpuCount() catch 1, 1), std.math.maxInt(Zcu.PerThread.IdBacking)),
-        .track_ids = true,
-        .stack_size = thread_stack_size,
-    });
-    defer thread_pool.deinit();
-
     var child_argv: std.ArrayListUnmanaged([]const u8) = .empty;
     try child_argv.ensureUnusedCapacity(arena, args.len + 4);
@@ -5531,7 +5504,6 @@ fn jitCmd(
     .main_mod = root_mod,
     .emit_bin = .yes_cache,
     .self_exe_path = self_exe_path,
-    .thread_pool = &thread_pool,
     .cache_mode = .whole,
 }) catch |err| switch (err) {
     error.CreateFail => fatal("failed to create compilation: {f}", .{create_diag}),
@@ -6875,10 +6847,6 @@ fn cmdFetch(
     const path_or_url = opt_path_or_url orelse fatal("missing url or path parameter", .{});
 
-    var thread_pool: ThreadPool = undefined;
-    try thread_pool.init(.{ .allocator = gpa });
-    defer thread_pool.deinit();
-
     var http_client: std.http.Client = .{ .allocator = gpa, .io = io };
     defer http_client.deinit();
@@ -6899,8 +6867,8 @@ fn cmdFetch(
     defer global_cache_directory.handle.close();
 
     var job_queue: Package.Fetch.JobQueue = .{
+        .io = io,
         .http_client = &http_client,
-        .thread_pool = &thread_pool,
         .global_cache = global_cache_directory,
         .recursive = false,
         .read_only = false,
@@ -6912,7 +6880,6 @@ fn cmdFetch(
 
     var fetch: Package.Fetch = .{
         .arena = std.heap.ArenaAllocator.init(gpa),
-        .io = io,
         .location = .{ .path_or_url = path_or_url },
         .location_tok = 0,
         .hash_tok = .none,