From dc207da184087eec7693fbf17c412c4b67c2080c Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Fri, 21 Jun 2024 11:03:00 -0700 Subject: [PATCH] std.Thread.Pool: don't hold the job closure while blocking on IPC read --- lib/std/Thread/Pool.zig | 91 ++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 42 deletions(-) diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index 02ce6f2d70..c9bf164cc4 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -310,58 +310,65 @@ pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) void { pool.cond.signal(); } +fn acquireThreadToken(job_server_options: Options.JobServer, fd_ptr: *std.posix.fd_t) void { + if (fd_ptr.* >= 0) return; + + switch (job_server_options) { + .abstain => {}, + .connect, .host => |addr| { + const sockfd = std.posix.socket( + std.posix.AF.UNIX, + std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, + 0, + ) catch |err| { + std.log.debug("failed to make socket: {s}", .{@errorName(err)}); + return; + }; + fd_ptr.* = sockfd; + + std.posix.connect(sockfd, &addr.any, addr.getOsSockLen()) catch |err| { + std.log.debug("failed to connect: {s}", .{@errorName(err)}); + return; + }; + + var trash_buf: [1]u8 = undefined; + _ = std.posix.read(sockfd, &trash_buf) catch |err| { + std.log.debug("failed to read: {s}", .{@errorName(err)}); + return; + }; + }, + } +} + +fn releaseThreadToken(fd_ptr: *std.posix.fd_t) void { + const fd = fd_ptr.*; + if (fd >= 0) { + std.posix.close(fd); + fd_ptr.* = -1; + } +} + fn worker(pool: *Pool) void { - var trash_buf: [1]u8 = undefined; - var connection: ?std.posix.fd_t = null; - defer if (connection) |fd| std.posix.close(fd); + var connection: std.posix.fd_t = -1; + defer releaseThreadToken(&connection); pool.mutex.lock(); defer pool.mutex.unlock(); while (true) { - while (pool.run_queue.popFirst()) |run_node| { - // Temporarily unlock the mutex in order to execute the run_node. + const work_available = pool.run_queue.first != null; + if (work_available) { pool.mutex.unlock(); defer pool.mutex.lock(); - - if (connection == null) switch (pool.job_server_options) { - .abstain => {}, - .connect, .host => |addr| lock: { - const sockfd = std.posix.socket( - std.posix.AF.UNIX, - std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, - 0, - ) catch |err| { - std.log.debug("failed to make socket: {s}", .{@errorName(err)}); - break :lock; - }; - connection = sockfd; - - std.posix.connect(sockfd, &addr.any, addr.getOsSockLen()) catch |err| { - std.log.debug("failed to connect: {s}", .{@errorName(err)}); - break :lock; - }; - - _ = std.posix.read(sockfd, &trash_buf) catch |err| { - std.log.debug("failed to read: {s}", .{@errorName(err)}); - break :lock; - }; - }, - }; - - const runFn = run_node.data.runFn; - runFn(&run_node.data); + acquireThreadToken(pool.job_server_options, &connection); } - - // Stop executing instead of waiting if the thread pool is no longer running. - if (pool.end_flag) - break; - - if (connection) |fd| { - std.posix.close(fd); - connection = null; + while (pool.run_queue.popFirst()) |run_node| { + pool.mutex.unlock(); + defer pool.mutex.lock(); + run_node.data.runFn(&run_node.data); } - + if (pool.end_flag) return; + releaseThreadToken(&connection); pool.cond.wait(&pool.mutex); } }