diff --git a/CMakeLists.txt b/CMakeLists.txt index 7090f88527..ea1cd1bad6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -449,6 +449,9 @@ set(ZIG_STAGE2_SOURCES lib/std/heap.zig lib/std/heap/arena_allocator.zig lib/std/json.zig + lib/std/job.zig + lib/std/job/Client.zig + lib/std/job/Server.zig lib/std/leb128.zig lib/std/log.zig lib/std/macho.zig diff --git a/lib/std/job.zig b/lib/std/job.zig new file mode 100644 index 0000000000..0b1300fece --- /dev/null +++ b/lib/std/job.zig @@ -0,0 +1,116 @@ +//! This namespace provides an implementation of the Robust Jobserver protocol: +//! https://codeberg.org/mlugg/robust-jobserver/ +//! +//! `Client` and `Server` currently both support the `sysvsem` and `win32pipe` +//! communication methods, meaning this implementation is usable on most POSIX +//! targets and on Windows. + +pub const Client = @import("job/Client.zig"); +pub const Server = @import("job/Server.zig"); + +pub const Method = enum { sysvsem, win32pipe }; + +pub const sysv_sem = struct { + pub const supported = switch (builtin.os.tag) { + .linux, + .illumos, + .haiku, + + .freebsd, + .netbsd, + .openbsd, + .dragonfly, + + .driverkit, + .ios, + .maccatalyst, + .macos, + .tvos, + .visionos, + .watchos, + => true, + + else => false, + }; + + /// `semget(IPC_PRIVATE, 1, 0o777)` + pub fn create() error{ SystemResources, Unexpected }!i32 { + const res = system.create(); + switch (std.posix.errno(res)) { + .SUCCESS => return @intCast(res), + .NOMEM => return error.SystemResources, + .NOSPC => return error.SystemResources, + else => |e| return std.posix.unexpectedErrno(e), + } + } + /// `semctl(id, 0, SETVAL, n)` + pub fn setValue(id: i32, n: u32) error{Unexpected}!void { + switch (std.posix.errno(system.setValue(id, n))) { + .SUCCESS => return, + else => |e| return std.posix.unexpectedErrno(e), + } + } + /// `semop(id, &.{.{ .sem_num = 0, .sem_op = delta, .sem_flg = SEM_UNDO }})` + pub fn modify(id: i32, delta: i16) error{ + InvalidSemaphore, + AccessDenied, + SystemResources, + /// A signal interrupted a blocked call to `modify`. + /// This allows the caller to implement cancelation. + Interrupted, + Unexpected, + }!void { + while (true) { + switch (std.posix.errno(system.modify(id, delta))) { + .SUCCESS => return, + .ACCES => return error.AccessDenied, + .FBIG => return error.InvalidSemaphore, + .IDRM => return error.InvalidSemaphore, + .INTR => return error.Interrupted, + .INVAL => return error.InvalidSemaphore, + .NOMEM => return error.SystemResources, + .RANGE => return error.InvalidSemaphore, + else => |e| return std.posix.unexpectedErrno(e), + } + } + } + + const system = if (builtin.link_libc) struct { + fn create() c_int { + return std.c.semget(.IPC_PRIVATE, 1, 0o777); + } + fn setValue(id: i32, n: u32) c_int { + return std.c.semctl(id, 0, std.posix.SETVAL, n); + } + fn modify(id: i32, delta: i16) c_int { + var ops: [1]std.posix.sembuf = .{.{ + .sem_num = 0, + .sem_op = delta, + .sem_flg = std.posix.SEM_UNDO, + }}; + return std.c.semop(id, &ops, ops.len); + } + } else switch (builtin.os.tag) { + .linux => struct { + fn create() usize { + const key: std.posix.key_t = .IPC_PRIVATE; + return std.os.linux.syscall3(.semget, @intFromEnum(key), 1, 0o777); + } + fn setValue(id: i32, n: u32) usize { + return std.os.linux.syscall4(.semctl, @intCast(id), 0, std.posix.SETVAL, n); + } + fn modify(id: i32, delta: i16) usize { + var ops: [1]std.posix.sembuf = .{.{ + .sem_num = 0, + .sem_op = delta, + .sem_flg = std.posix.SEM_UNDO, + }}; + return std.os.linux.syscall3(.semop, @intCast(id), @intFromPtr(&ops), ops.len); + } + }, + else => unreachable, + }; +}; + +const builtin = @import("builtin"); +const std = @import("std.zig"); diff --git a/lib/std/job/Client.zig b/lib/std/job/Client.zig new file mode 100644 index 0000000000..92ae649161 --- /dev/null +++ b/lib/std/job/Client.zig @@ -0,0 +1,196 @@ +impl: Impl, + +pub const InitError = error{ + OutOfMemory, + /// There is no advertised job server. + NoServer, + /// The job server is advertising a communication method which is not known. + UnknownMethod, + /// The job server is advertising a communication method which is known but unsupported. + UnsupportedMethod, + /// A job server advertisement exists, but is malformed. + InvalidArgument, + /// The job server has shut down or is otherwise not available to connect to. + ServerFailed, + /// This process does not have permission to access the job server. + AccessDenied, +}; +pub fn init(arena: Allocator, env: *const std.process.EnvMap) InitError!Client { + const env_val = env.get("ROBUST_JOBSERVER") orelse return error.NoServer; + const idx = std.mem.findScalar(u8, env_val, ':') orelse return error.InvalidArgument; + const method = std.meta.stringToEnum(job.Method, env_val[0..idx]) orelse return error.UnknownMethod; + switch (method) { + inline else => |m| { + const ImplTy = @FieldType(Impl, @tagName(m)); + if (ImplTy == noreturn) return error.UnsupportedMethod; + return .{ .impl = @unionInit( + Impl, + @tagName(m), + try .init(arena, env_val[idx + 1 ..]), + ) }; + }, + } +} +pub fn deinit(c: *Client) void { + switch (c.impl) { + inline else => |*x| x.deinit(), + } + c.* = undefined; +} + +pub const AcquireError = error{ + /// The job server has shut down or is otherwise not available to connect to. + ServerFailed, + /// This process does not have permission to access the job server. + AccessDenied, + /// Insufficient resources are available to acquire a token. + SystemResources, + Unexpected, +}; +pub fn acquire(c: *const Client) AcquireError!Token { + return switch (c.impl) { + inline else => |*impl| impl.acquire(), + }; +} + +const Impl = union(job.Method) { + sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn, + win32pipe: if (builtin.target.os.tag == .windows) Win32Pipe else noreturn, +}; + +pub const Token = union(job.Method) { + sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn, + win32pipe: if (builtin.target.os.tag == .windows) windows.HANDLE else noreturn, + pub fn release(t: Token) void { + switch (t) { + .sysvsem => |sem| while (true) { + return job.sysv_sem.modify(sem.set_id, 1) catch |err| switch (err) { + error.AccessDenied, error.InvalidSemaphore => { + // The semaphore broke somehow, but that's not our problem! + // (...at least, not until we next call `acquire`.) + }, + error.SystemResources => unreachable, // the undo structure was already allocated in `acquire` + error.Interrupted => continue, // releasing can't block; just retry + error.Unexpected => {}, // already warned, nothing more we can do + }; + }, + .win32pipe => |handle| _ = windows.ntdll.NtClose(handle), + } + } +}; + +const SysVSem = struct { + set_id: i32, + fn init(arena: Allocator, arg: []const u8) InitError!SysVSem { + _ = arena; + const set_id = std.fmt.parseInt(i32, arg, 10) catch return error.InvalidArgument; + return .{ .set_id = set_id }; + } + fn deinit(sem: SysVSem) void { + _ = sem; + } + fn acquire(sem: SysVSem) AcquireError!Token { + while (true) { + break job.sysv_sem.modify(sem.set_id, -1) catch |err| switch (err) { + error.InvalidSemaphore => return error.ServerFailed, + error.Interrupted => continue, // TODO: support cancelation + error.AccessDenied, error.SystemResources, error.Unexpected => |e| return e, + }; + } + return .{ .sysvsem = sem }; + } +}; + +const Win32Pipe = struct { + pipe_device: windows.HANDLE, + pipe_path: [:0]const u16, + fn init(arena: Allocator, arg: []const u8) InitError!Win32Pipe { + if (arg.len == 0) return error.InvalidArgument; + if (std.mem.findAny(u8, arg, "\\/\x00") != null) return error.InvalidArgument; + const pipe_path = std.unicode.wtf8ToWtf16LeAllocZ( + arena, + try std.fmt.allocPrint(arena, "\\??\\pipe\\{s}", .{arg}), + ) catch |err| switch (err) { + error.InvalidWtf8 => return error.InvalidArgument, + error.OutOfMemory => |e| return e, + }; + const pipe_device = windows.OpenFile( + std.unicode.wtf8ToWtf16LeStringLiteral("\\??\\pipe\\"), + .{ + .access_mask = windows.SYNCHRONIZE | windows.FILE_READ_ATTRIBUTES, + .share_access = windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE, + .creation = windows.FILE_OPEN, + .case_sensitive = false, + }, + ) catch |err| { + // This fixed path should always be accessible on Windows. + std.debug.panic("unexpected error opening '\\??\\pipe\\': {t}", .{err}); + }; + errdefer _ = windows.ntdll.NtClose(pipe_device); + return .{ + .pipe_device = pipe_device, + .pipe_path = pipe_path, + }; + } + fn deinit(wp: *const Win32Pipe) void { + _ = windows.ntdll.NtClose(wp.pipe_device); + } + fn acquire(wp: *const Win32Pipe) AcquireError!Token { + const pipe_basename_offset = std.unicode.wtf8ToWtf16LeStringLiteral("\\??\\pipe\\").len; + const handle = while (true) { + if (windows.OpenFile(wp.pipe_path, .{ + .access_mask = windows.SYNCHRONIZE, + .creation = windows.FILE_OPEN, + .share_access = 0, + .case_sensitive = false, + })) |handle| { + return .{ .win32pipe = handle }; + } else |err| switch (err) { + error.PipeBusy, error.NoDevice => {}, + + error.IsDir, + error.FileNotFound, + error.NameTooLong, + error.AntivirusInterference, + error.BadPathName, + => return error.ServerFailed, + + error.AccessDenied, + error.Unexpected, + => |e| return e, + + error.NotDir => unreachable, // we're not opening as a directory + error.PathAlreadyExists => unreachable, // we're not trying to create the path + error.WouldBlock => unreachable, // we're not using overlapped I/O + error.NetworkNotFound => unreachable, // we're not accessing a network device + } + const fpwfb: windows.FILE_PIPE_WAIT_FOR_BUFFER = .init( + wp.pipe_path[pipe_basename_offset..], + windows.FILE_PIPE_WAIT_FOR_BUFFER.WAIT_FOREVER, + ); + windows.DeviceIoControl( + wp.pipe_device, + windows.FSCTL.PIPE.WAIT, + .{ .in = fpwfb.toBuffer() }, + ) catch |err| switch (err) { + error.UnrecognizedVolume => unreachable, // not a volume + error.Pending => unreachable, // not using overlapped I/O + error.PipeClosing => return error.ServerFailed, + error.PipeAlreadyConnected => unreachable, + error.PipeAlreadyListening => unreachable, + error.Unexpected, error.AccessDenied => |e| return e, + }; + continue; + }; + return .{ .win32pipe = handle }; + } +}; + +const builtin = @import("builtin"); + +const std = @import("../std.zig"); +const Allocator = std.mem.Allocator; +const job = std.job; +const windows = std.os.windows; + +const Client = @This(); diff --git a/lib/std/job/Server.zig b/lib/std/job/Server.zig new file mode 100644 index 0000000000..6586135268 --- /dev/null +++ b/lib/std/job/Server.zig @@ -0,0 +1,282 @@ +/// `null` means we are inheriting a jobserver instance from a parent process. +impl: ?Impl, + +pub const InitError = error{ OutOfMemory, SystemResources, Unexpected }; +pub fn init(arena: Allocator, job_limit: u32, env: *std.process.EnvMap) InitError!Server { + assert(job_limit > 0); + if (env.get("ROBUST_JOBSERVER") != null) { + return .{ .impl = null }; + } + const method: job.Method = switch (builtin.target.os.tag) { + .windows => .win32pipe, + else => .sysvsem, + }; + const impl: @FieldType(Impl, @tagName(method)) = try .init(arena, job_limit); + const env_val = try std.fmt.allocPrint(arena, "{t}:{f}", .{ method, std.fmt.alt(impl, .formatEnvData) }); + try env.put("ROBUST_JOBSERVER", env_val); + return .{ .impl = @unionInit(Impl, @tagName(method), impl) }; +} +pub fn deinit(s: *Server) void { + if (s.impl) |*impl| { + switch (impl.*) { + inline else => |*x| x.deinit(), + } + } + s.* = undefined; +} + +const Impl = union(job.Method) { + sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn, + win32pipe: if (builtin.target.os.tag == .windows) Win32Pipe else noreturn, +}; + +const SysVSem = struct { + set_id: i32, + fn init(arena: Allocator, num_tokens: u32) InitError!SysVSem { + _ = arena; + const id = try job.sysv_sem.create(); + try job.sysv_sem.setValue(id, num_tokens); + return .{ .set_id = id }; + } + fn deinit(sem: SysVSem) void { + _ = sem; + } + pub fn formatEnvData(sem: SysVSem, w: *std.Io.Writer) std.Io.Writer.Error!void { + try w.print("{d}", .{sem.set_id}); + } +}; + +const Win32Pipe = struct { + pipe_name: []const u8, + done_event: windows.HANDLE, + thread: std.Thread, + + const Token = struct { + handle: windows.HANDLE, + iosb: windows.IO_STATUS_BLOCK, + dummy_read_buf: [1]u8, + }; + + var pipe_name_counter: std.atomic.Value(u32) = .init(0); + fn init(arena: Allocator, num_tokens: u32) InitError!Win32Pipe { + const pipe_name = try std.fmt.allocPrint(arena, "zig-jobserver-{d}-{x}", .{ + windows.GetCurrentProcessId(), + std.crypto.random.int(u64), + }); + + const nt_path = std.unicode.wtf8ToWtf16LeAllocZ( + arena, + try std.fmt.allocPrint(arena, "\\??\\pipe\\{s}", .{pipe_name}), + ) catch |err| switch (err) { + error.InvalidWtf8 => unreachable, + error.OutOfMemory => |e| return e, + }; + + const tokens = try arena.alloc(Token, num_tokens); + @memset(tokens, .{ + .handle = windows.INVALID_HANDLE_VALUE, + .iosb = undefined, + .dummy_read_buf = undefined, + }); + errdefer for (tokens) |*token| { + if (token.handle != windows.INVALID_HANDLE_VALUE) { + _ = windows.ntdll.NtClose(token.handle); + } + }; + + for (tokens) |*t| { + var path: windows.UNICODE_STRING = .{ + .Buffer = nt_path.ptr, + .Length = @intCast(@sizeOf(u16) * nt_path.len), + .MaximumLength = 0, + }; + var iosb: windows.IO_STATUS_BLOCK = undefined; + switch (windows.ntdll.NtCreateNamedPipeFile( + &t.handle, + windows.GENERIC_READ, + &.{ + .Length = @sizeOf(windows.OBJECT_ATTRIBUTES), + .RootDirectory = null, + .ObjectName = &path, + .Attributes = std.os.windows.OBJ_CASE_INSENSITIVE, + .SecurityDescriptor = null, + .SecurityQualityOfService = null, + }, + &iosb, + windows.FILE_SHARE_WRITE, + windows.FILE_OPEN_IF, + 0, // no FILE_SYNCHRONOUS_IO_NONALERT, so the pipe is in overlapped mode + windows.FILE_PIPE_BYTE_STREAM_TYPE, + windows.FILE_PIPE_BYTE_STREAM_MODE, + windows.FILE_PIPE_QUEUE_OPERATION, + @intCast(num_tokens), + 0, + 0, + &((-120 * std.time.ns_per_s) / 100), + )) { + .SUCCESS => {}, + .INSUFFICIENT_RESOURCES => return error.SystemResources, + else => |e| return windows.unexpectedStatus(e), + } + } + + var done_event: windows.HANDLE = undefined; + switch (windows.ntdll.NtCreateEvent( + &done_event, + windows.EVENT_ALL_ACCESS, + null, + .Notification, + windows.FALSE, + )) { + .SUCCESS => {}, + .INSUFFICIENT_RESOURCES => return error.SystemResources, + else => |e| return windows.unexpectedStatus(e), + } + errdefer _ = windows.ntdll.NtClose(done_event); + + const thread = std.Thread.spawn(.{}, serve, .{ tokens, done_event }) catch |err| switch (err) { + error.SystemResources, + error.Unexpected, + error.OutOfMemory, + => |e| return e, + + error.ThreadQuotaExceeded, + error.LockedMemoryLimitExceeded, + => return error.SystemResources, + }; + errdefer comptime unreachable; // the thread is now running and owns `tokens` + + return .{ + .pipe_name = pipe_name, + .done_event = done_event, + .thread = thread, + }; + } + fn deinit(wp: *const Win32Pipe) void { + _ = windows.ntdll.NtSetEvent(wp.done_event, null); + wp.thread.join(); + _ = windows.ntdll.NtClose(wp.done_event); + } + pub fn formatEnvData(wp: Win32Pipe, w: *std.Io.Writer) std.Io.Writer.Error!void { + try w.writeAll(wp.pipe_name); + } + + fn serve(tokens: []Token, done_event: windows.HANDLE) void { + defer { + for (tokens) |*token| { + _ = windows.ntdll.NtClose(token.handle); + } + } + + for (tokens) |*t| serveToken(t, .connect); + + while (true) { + switch (windows.ntdll.NtWaitForSingleObject( + done_event, + windows.TRUE, + null, + )) { + windows.NTSTATUS.ABANDONED_WAIT_0 => unreachable, // not a mutex + .USER_APC => continue, + windows.NTSTATUS.WAIT_0 => break, + .TIMEOUT => unreachable, // no timeout + else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}), + } + } + } + const Action = enum { connect, read, disconnect }; + fn serveToken(token: *Token, first_action: Action) void { + action: switch (first_action) { + .connect => if (windows.DeviceIoControl(token.handle, windows.FSCTL.PIPE.LISTEN, .{ + .apc_routine = &connectCompleted, + .apc_context = token, + .io_status_block = &token.iosb, + })) |_| { + return; // The APC has been queued and will continue the loop. + } else |err| switch (err) { + error.AccessDenied => unreachable, // we created the pipe + error.UnrecognizedVolume => unreachable, // it's not a volume + error.Pending => return, + error.PipeClosing => continue :action .disconnect, + error.PipeAlreadyConnected => continue :action .read, + error.PipeAlreadyListening => unreachable, // pipe is not in nonblocking mode + error.Unexpected => @panic("unexpected error in job server"), + }, + .read => switch (windows.ntdll.NtReadFile( + token.handle, + null, + &readCompleted, + token, + &token.iosb, + &token.dummy_read_buf, + token.dummy_read_buf.len, + null, + null, + )) { + .PENDING => return, + .PIPE_BROKEN => continue :action .disconnect, + .SUCCESS => { + // The client isn't meant to write to the pipe---disconnect them as punishment. + return; // The APC has been queued and will do that for us. + }, + else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}), + }, + .disconnect => if (windows.DeviceIoControl(token.handle, windows.FSCTL.PIPE.DISCONNECT, .{ + .apc_routine = &disconnectCompleted, + .apc_context = token, + .io_status_block = &token.iosb, + })) |_| { + return; // The APC has been queued and will continue the loop. + } else |err| switch (err) { + error.AccessDenied => unreachable, // we created the pipe + error.UnrecognizedVolume => unreachable, // it's not a volume + error.Pending => return, + error.PipeClosing => unreachable, + error.PipeAlreadyConnected => unreachable, + error.PipeAlreadyListening => unreachable, + error.Unexpected => @panic("unexpected error in job server"), + }, + } + } + fn connectCompleted( + ctx: ?*anyopaque, + iosb: *windows.IO_STATUS_BLOCK, + _: windows.ULONG, + ) callconv(.winapi) void { + serveToken(@ptrCast(@alignCast(ctx)), switch (iosb.u.Status) { + .SUCCESS, .PIPE_CONNECTED => .read, + .PIPE_CLOSING => .disconnect, + else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}), + }); + } + fn readCompleted( + ctx: ?*anyopaque, + iosb: *windows.IO_STATUS_BLOCK, + _: windows.ULONG, + ) callconv(.winapi) void { + serveToken(@ptrCast(@alignCast(ctx)), switch (iosb.u.Status) { + .SUCCESS, .PIPE_BROKEN => .disconnect, + else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}), + }); + } + fn disconnectCompleted( + ctx: ?*anyopaque, + iosb: *windows.IO_STATUS_BLOCK, + _: windows.ULONG, + ) callconv(.winapi) void { + serveToken(@ptrCast(@alignCast(ctx)), switch (iosb.u.Status) { + .SUCCESS => .connect, + else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}), + }); + } +}; + +const builtin = @import("builtin"); + +const std = @import("std"); +const Allocator = std.mem.Allocator; +const assert = std.debug.assert; +const job = std.job; +const windows = std.os.windows; + +const Server = @This(); diff --git a/lib/std/std.zig b/lib/std/std.zig index 5c500d3f55..798785bed7 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -78,6 +78,7 @@ pub const hash = @import("hash.zig"); pub const hash_map = @import("hash_map.zig"); pub const heap = @import("heap.zig"); pub const http = @import("http.zig"); +pub const job = @import("job.zig"); pub const json = @import("json.zig"); pub const leb = @import("leb128.zig"); pub const log = @import("log.zig");