From 7d4e06b6d9c3b530505f62ecac98adb23d39b07b Mon Sep 17 00:00:00 2001 From: Matthew Lugg Date: Fri, 5 Dec 2025 11:46:13 +0000 Subject: [PATCH] DO NOT MERGE: jobserver usage example with `std.Thread.Pool` Co-authored-by: Jacob Young --- lib/std/Thread/Pool.zig | 11 +++++++++++ server.zig | 32 ++++++++++++++++++++++++++++++++ worker.zig | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 server.zig create mode 100644 worker.zig diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index e836665d70..bf9ab01955 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -15,6 +15,7 @@ ids: if (builtin.single_threaded) struct { return 0; } } else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void), +job_client: ?*std.job.Client, const Runnable = struct { runFn: RunProto, @@ -28,6 +29,7 @@ pub const Options = struct { n_jobs: ?usize = null, track_ids: bool = false, stack_size: usize = std.Thread.SpawnConfig.default_stack_size, + job_client: ?*std.job.Client = null, }; pub fn init(pool: *Pool, options: Options) !void { @@ -37,6 +39,7 @@ pub fn init(pool: *Pool, options: Options) !void { .allocator = allocator, .threads = if (builtin.single_threaded) .{} else &.{}, .ids = .{}, + .job_client = options.job_client, }; if (builtin.single_threaded) { @@ -284,6 +287,13 @@ fn worker(pool: *Pool) void { if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {}); while (true) { + const job_token: ?std.job.Client.Token = if (pool.job_client) |job_client| token: { + pool.mutex.unlock(); + defer pool.mutex.lock(); + break :token job_client.acquire() catch |err| { + std.debug.panic("failed to acquire job token: {t}", .{err}); + }; + } else null; while (pool.run_queue.popFirst()) |run_node| { // Temporarily unlock the mutex in order to execute the run_node pool.mutex.unlock(); @@ -292,6 +302,7 @@ fn worker(pool: *Pool) void { const runnable: *Runnable = @fieldParentPtr("node", run_node); runnable.runFn(runnable, id); } + if (job_token) |t| t.release(); // Stop executing instead of waiting if the thread pool is no longer running. if (pool.is_running) { diff --git a/server.zig b/server.zig new file mode 100644 index 0000000000..ab1e948f43 --- /dev/null +++ b/server.zig @@ -0,0 +1,32 @@ +const num_children = 25; + +pub fn main() !void { + var arena_instance: std.heap.ArenaAllocator = .init(std.heap.page_allocator); + defer arena_instance.deinit(); + const arena = arena_instance.allocator(); + + const cpu_count = try std.Thread.getCpuCount(); + var env = try std.process.getEnvMap(arena); + + var job_server: std.job.Server = try .init(arena, @intCast(cpu_count), &env); + defer job_server.deinit(); + + var children: [num_children]std.process.Child = undefined; + for (&children, 0..) |*c, child_num| { + const child_num_str = try std.fmt.allocPrint(arena, "{d}", .{child_num}); + const argv = try arena.dupe([]const u8, &.{ switch (builtin.os.tag) { + else => "./worker", + .windows => ".\\worker.exe", + }, child_num_str }); + c.* = .init(argv, arena); + c.env_map = &env; + } + + std.log.info("Spawning {d} workers on {d} CPUs", .{ children.len, cpu_count }); + for (&children) |*c| try c.spawn(); + for (&children) |*c| _ = try c.wait(); + std.log.info("All {d} workers exited", .{children.len}); +} + +const builtin = @import("builtin"); +const std = @import("std"); diff --git a/worker.zig b/worker.zig new file mode 100644 index 0000000000..f95aa957d7 --- /dev/null +++ b/worker.zig @@ -0,0 +1,41 @@ +const num_tasks = 25; + +pub fn main() !void { + var arena_instance: std.heap.ArenaAllocator = .init(std.heap.page_allocator); + defer arena_instance.deinit(); + const arena = arena_instance.allocator(); + + const args = try std.process.argsAlloc(arena); + const env = try std.process.getEnvMap(arena); + + const process_index = try std.fmt.parseInt(usize, args[1], 10); + + var job_client: std.job.Client = try .init(arena, &env); + defer job_client.deinit(); + + var thread_pool: std.Thread.Pool = undefined; + try thread_pool.init(.{ .allocator = arena, .job_client = &job_client }); + defer thread_pool.deinit(); + + // Spawn a bunch of tasks with a variable amount of CPU-intensive work + var rng: std.Random.DefaultPrng = .init(process_index); + const r = rng.random(); + for (0..num_tasks) |_| { + try thread_pool.spawn(doWork, .{ process_index, r.intRangeAtMost(u32, 300_000, 1_000_000) }); + } +} + +fn doWork( + process_index: usize, + work_amount: u32, +) void { + std.log.info("[process {d}] start", .{process_index}); + defer std.log.info("[process {d}] stop", .{process_index}); + + // Badly simulate single-threaded CPU-intensive work + for (0..work_amount) |_| { + for (0..1000) |_| asm volatile ("nop"); + } +} + +const std = @import("std");