From a239e65b54a5030310ce721593a94a6289b625fe Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 1 Jul 2024 15:32:46 -0700 Subject: [PATCH] std.Thread.Pool: call shutdown on workers blocking on connect or read and increase kernel backlog to the maximum number. Without increasing the kernel backlog to the maximum number, I observed connect() blocking indefinitely, even when another thread called shutdown() on that socket file descriptor. --- lib/std/Thread/Pool.zig | 135 +++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 51 deletions(-) diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index c9bf164cc4..20f1231fab 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -9,7 +9,7 @@ cond: std.Thread.Condition, run_queue: RunQueue, end_flag: bool, allocator: std.mem.Allocator, -threads: []std.Thread, +workers: []Worker, job_server_options: Options.JobServer, job_server: ?*JobServer, @@ -20,6 +20,20 @@ const Runnable = struct { const RunProto = *const fn (*Runnable) void; +pub const Worker = struct { + thread: std.Thread, + /// This data is shared with the thread pool for deinitialization purposes: + /// calling shutdown on this file descriptor will wake up any workers + /// blocking on acquiring a thread token. + /// + /// Pointers to file descriptors are used so that the actual loads and + /// stores of file descriptors take place in thread-local data, avoiding + /// false sharing. + /// + /// Protected by the pool mutex. + connection: *const std.posix.fd_t, +}; + pub const Options = struct { /// Not required to be thread-safe; protected by the pool's mutex. 
allocator: std.mem.Allocator, @@ -59,7 +73,7 @@ pub fn init(pool: *Pool, options: Options) !void { .run_queue = .{}, .end_flag = false, .allocator = allocator, - .threads = &.{}, + .workers = &.{}, .job_server_options = options.job_server, .job_server = null, }; @@ -71,19 +85,26 @@ pub fn init(pool: *Pool, options: Options) !void { assert(thread_count > 0); // Kill and join any threads we spawned and free memory on error. - pool.threads = try allocator.alloc(std.Thread, thread_count); + pool.workers = try allocator.alloc(Worker, thread_count); var spawned: usize = 0; errdefer pool.join(spawned); - for (pool.threads) |*thread| { - thread.* = try std.Thread.spawn(.{}, worker, .{pool}); + const temporary_connection_memory: std.posix.fd_t = -1; + + for (pool.workers) |*worker| { + worker.* = .{ + .connection = &temporary_connection_memory, + .thread = try std.Thread.spawn(.{}, workerRun, .{ pool, spawned }), + }; spawned += 1; } switch (options.job_server) { .abstain, .connect => {}, .host => |addr| { - var server = try addr.listen(.{}); + var server = try addr.listen(.{ + .kernel_backlog = std.math.maxInt(u31), + }); errdefer server.deinit(); const pollfds = try allocator.alloc(std.posix.pollfd, thread_count + 1); @@ -104,7 +125,7 @@ pub fn init(pool: *Pool, options: Options) !void { } pub fn deinit(pool: *Pool) void { - pool.join(pool.threads.len); + pool.join(pool.workers.len); pool.* = undefined; } @@ -118,6 +139,12 @@ fn join(pool: *Pool, spawned: usize) void { // Ensure future worker threads exit the dequeue loop. pool.end_flag = true; + + // Wake up any workers blocking on connect or read. 
+ for (pool.workers[0..spawned]) |worker| { + const fd = worker.connection.*; + if (fd >= 0) std.posix.shutdown(fd, .both) catch {}; + } } // Wake up any sleeping threads (this can be done outside the mutex) then @@ -132,10 +159,10 @@ fn join(pool: *Pool, spawned: usize) void { job_server.thread.join(); } - for (pool.threads[0..spawned]) |thread| - thread.join(); + for (pool.workers[0..spawned]) |worker| + worker.thread.join(); - pool.allocator.free(pool.threads); + pool.allocator.free(pool.workers); } pub const JobServer = struct { @@ -310,36 +337,6 @@ pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) void { pool.cond.signal(); } -fn acquireThreadToken(job_server_options: Options.JobServer, fd_ptr: *std.posix.fd_t) void { - if (fd_ptr.* >= 0) return; - - switch (job_server_options) { - .abstain => {}, - .connect, .host => |addr| { - const sockfd = std.posix.socket( - std.posix.AF.UNIX, - std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, - 0, - ) catch |err| { - std.log.debug("failed to make socket: {s}", .{@errorName(err)}); - return; - }; - fd_ptr.* = sockfd; - - std.posix.connect(sockfd, &addr.any, addr.getOsSockLen()) catch |err| { - std.log.debug("failed to connect: {s}", .{@errorName(err)}); - return; - }; - - var trash_buf: [1]u8 = undefined; - _ = std.posix.read(sockfd, &trash_buf) catch |err| { - std.log.debug("failed to read: {s}", .{@errorName(err)}); - return; - }; - }, - } -} - fn releaseThreadToken(fd_ptr: *std.posix.fd_t) void { const fd = fd_ptr.*; if (fd >= 0) { @@ -348,29 +345,65 @@ fn releaseThreadToken(fd_ptr: *std.posix.fd_t) void { } } -fn worker(pool: *Pool) void { +fn workerRun(pool: *Pool, worker_index: usize) void { var connection: std.posix.fd_t = -1; - defer releaseThreadToken(&connection); pool.mutex.lock(); - defer pool.mutex.unlock(); + pool.workers[worker_index].connection = &connection; - while (true) { - const work_available = pool.run_queue.first != null; - if (work_available) { - pool.mutex.unlock(); - defer 
pool.mutex.lock(); - acquireThreadToken(pool.job_server_options, &connection); + while (!pool.end_flag) { + if (connection == -1 and pool.run_queue.first != null) token: { + switch (pool.job_server_options) { + .abstain => {}, + .connect, .host => |addr| { + pool.mutex.unlock(); + const sockfd = std.posix.socket( + std.posix.AF.UNIX, + std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC, + 0, + ) catch |err| { + std.log.debug("failed to make socket: {s}", .{@errorName(err)}); + pool.mutex.lock(); + if (pool.end_flag) break; + break :token; + }; + pool.mutex.lock(); + connection = sockfd; + if (pool.end_flag) break; + pool.mutex.unlock(); + + std.posix.connect(sockfd, &addr.any, addr.getOsSockLen()) catch |err| { + std.log.debug("failed to connect: {s}", .{@errorName(err)}); + pool.mutex.lock(); + if (pool.end_flag) break; + break :token; + }; + + var trash_buf: [1]u8 = undefined; + _ = std.posix.read(sockfd, &trash_buf) catch |err| { + std.log.debug("failed to read: {s}", .{@errorName(err)}); + pool.mutex.lock(); + if (pool.end_flag) break; + break :token; + }; + + pool.mutex.lock(); + if (pool.end_flag) break; + }, + } } while (pool.run_queue.popFirst()) |run_node| { pool.mutex.unlock(); - defer pool.mutex.lock(); run_node.data.runFn(&run_node.data); + pool.mutex.lock(); + if (pool.end_flag) break; } - if (pool.end_flag) return; releaseThreadToken(&connection); pool.cond.wait(&pool.mutex); } + + releaseThreadToken(&connection); + pool.mutex.unlock(); } pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void {