DO NOT MERGE: jobserver usage example with std.Thread.Pool

Co-authored-by: Jacob Young <jacobly0@users.noreply.github.com>
This commit is contained in:
Matthew Lugg 2025-12-05 11:46:13 +00:00
parent 5997ed2dd5
commit 7d4e06b6d9
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
3 changed files with 84 additions and 0 deletions

View file

@ -15,6 +15,7 @@ ids: if (builtin.single_threaded) struct {
return 0;
}
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),
job_client: ?*std.job.Client,
const Runnable = struct {
runFn: RunProto,
@ -28,6 +29,7 @@ pub const Options = struct {
n_jobs: ?usize = null,
track_ids: bool = false,
stack_size: usize = std.Thread.SpawnConfig.default_stack_size,
job_client: ?*std.job.Client = null,
};
pub fn init(pool: *Pool, options: Options) !void {
@ -37,6 +39,7 @@ pub fn init(pool: *Pool, options: Options) !void {
.allocator = allocator,
.threads = if (builtin.single_threaded) .{} else &.{},
.ids = .{},
.job_client = options.job_client,
};
if (builtin.single_threaded) {
@ -284,6 +287,13 @@ fn worker(pool: *Pool) void {
if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
while (true) {
const job_token: ?std.job.Client.Token = if (pool.job_client) |job_client| token: {
pool.mutex.unlock();
defer pool.mutex.lock();
break :token job_client.acquire() catch |err| {
std.debug.panic("failed to acquire job token: {t}", .{err});
};
} else null;
while (pool.run_queue.popFirst()) |run_node| {
// Temporarily unlock the mutex in order to execute the run_node
pool.mutex.unlock();
@ -292,6 +302,7 @@ fn worker(pool: *Pool) void {
const runnable: *Runnable = @fieldParentPtr("node", run_node);
runnable.runFn(runnable, id);
}
if (job_token) |t| t.release();
// Stop executing instead of waiting if the thread pool is no longer running.
if (pool.is_running) {

32
server.zig Normal file
View file

@ -0,0 +1,32 @@
const num_children = 25;
pub fn main() !void {
var arena_instance: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
defer arena_instance.deinit();
const arena = arena_instance.allocator();
const cpu_count = try std.Thread.getCpuCount();
var env = try std.process.getEnvMap(arena);
var job_server: std.job.Server = try .init(arena, @intCast(cpu_count), &env);
defer job_server.deinit();
var children: [num_children]std.process.Child = undefined;
for (&children, 0..) |*c, child_num| {
const child_num_str = try std.fmt.allocPrint(arena, "{d}", .{child_num});
const argv = try arena.dupe([]const u8, &.{ switch (builtin.os.tag) {
else => "./worker",
.windows => ".\\worker.exe",
}, child_num_str });
c.* = .init(argv, arena);
c.env_map = &env;
}
std.log.info("Spawning {d} workers on {d} CPUs", .{ children.len, cpu_count });
for (&children) |*c| try c.spawn();
for (&children) |*c| _ = try c.wait();
std.log.info("All {d} workers exited", .{children.len});
}
const builtin = @import("builtin");
const std = @import("std");

41
worker.zig Normal file
View file

@ -0,0 +1,41 @@
const num_tasks = 25;
pub fn main() !void {
var arena_instance: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
defer arena_instance.deinit();
const arena = arena_instance.allocator();
const args = try std.process.argsAlloc(arena);
const env = try std.process.getEnvMap(arena);
const process_index = try std.fmt.parseInt(usize, args[1], 10);
var job_client: std.job.Client = try .init(arena, &env);
defer job_client.deinit();
var thread_pool: std.Thread.Pool = undefined;
try thread_pool.init(.{ .allocator = arena, .job_client = &job_client });
defer thread_pool.deinit();
// Spawn a bunch of tasks with a variable amount of CPU-intensive work
var rng: std.Random.DefaultPrng = .init(process_index);
const r = rng.random();
for (0..num_tasks) |_| {
try thread_pool.spawn(doWork, .{ process_index, r.intRangeAtMost(u32, 300_000, 1_000_000) });
}
}
fn doWork(
process_index: usize,
work_amount: u32,
) void {
std.log.info("[process {d}] start", .{process_index});
defer std.log.info("[process {d}] stop", .{process_index});
// Badly simulate single-threaded CPU-intensive work
for (0..work_amount) |_| {
for (0..1000) |_| asm volatile ("nop");
}
}
const std = @import("std");