std.job: add Robust Jobserver implementation

This commit is contained in:
Matthew Lugg 2025-12-05 10:41:14 +00:00
parent 279e3bdde5
commit 5997ed2dd5
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
5 changed files with 598 additions and 0 deletions

View file

@ -449,6 +449,9 @@ set(ZIG_STAGE2_SOURCES
lib/std/heap.zig lib/std/heap.zig
lib/std/heap/arena_allocator.zig lib/std/heap/arena_allocator.zig
lib/std/json.zig lib/std/json.zig
lib/std/job.zig
lib/std/job/Client.zig
lib/std/job/Server.zig
lib/std/leb128.zig lib/std/leb128.zig
lib/std/log.zig lib/std/log.zig
lib/std/macho.zig lib/std/macho.zig

116
lib/std/job.zig Normal file
View file

@ -0,0 +1,116 @@
//! This namespace provides an implementation of the Robust Jobserver protocol:
//! https://codeberg.org/mlugg/robust-jobserver/
//!
//! `Client` and `Server` currently both support the `sysvsem` and `win32pipe`
//! communication methods, meaning this implementation is usable on most POSIX
//! targets and on Windows.
pub const Client = @import("job/Client.zig");
pub const Server = @import("job/Server.zig");
/// Communication methods known to this implementation. The tag name is the
/// method prefix of the `ROBUST_JOBSERVER` value: `Client.init` parses it with
/// `stringToEnum` and `Server.init` writes it with `{t}`.
pub const Method = enum { sysvsem, win32pipe };
/// Thin wrappers over the System V semaphore calls (`semget`, `semctl`,
/// `semop`) used by the `sysvsem` communication method. Every wrapper operates
/// on semaphore number 0 of the given set.
pub const sysv_sem = struct {
    /// Whether this namespace is usable on the target operating system.
    pub const supported = switch (builtin.os.tag) {
        // Linux, illumos and Haiku.
        .linux, .illumos, .haiku => true,
        // The BSD family.
        .freebsd, .netbsd, .openbsd, .dragonfly => true,
        // Apple platforms.
        .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => true,
        else => false,
    };

    /// `semget(IPC_PRIVATE, 1, 0o777)`
    ///
    /// Returns the identifier of the newly created single-semaphore set.
    pub fn create() error{ SystemResources, Unexpected }!i32 {
        const rc = system.create();
        switch (std.posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .NOMEM, .NOSPC => return error.SystemResources,
            else => |e| return std.posix.unexpectedErrno(e),
        }
    }

    /// `semctl(id, 0, SETVAL, n)`
    pub fn setValue(id: i32, n: u32) error{Unexpected}!void {
        const code = std.posix.errno(system.setValue(id, n));
        if (code != .SUCCESS) return std.posix.unexpectedErrno(code);
    }

    /// `semop(id, &.{.{ .sem_num = 0, .sem_op = delta, .sem_flg = SEM_UNDO }})`
    ///
    /// The call is issued exactly once; an interrupted call is reported to the
    /// caller rather than retried.
    pub fn modify(id: i32, delta: i16) error{
        InvalidSemaphore,
        AccessDenied,
        SystemResources,
        /// A signal interrupted a blocked call to `modify`.
        /// This allows the caller to implement cancelation.
        Interrupted,
        Unexpected,
    }!void {
        const code = std.posix.errno(system.modify(id, delta));
        switch (code) {
            .SUCCESS => return,
            .INTR => return error.Interrupted,
            .ACCES => return error.AccessDenied,
            .NOMEM => return error.SystemResources,
            .FBIG, .IDRM, .INVAL, .RANGE => return error.InvalidSemaphore,
            else => |e| return std.posix.unexpectedErrno(e),
        }
    }

    /// Raw entry points: libc when it is linked, otherwise direct Linux syscalls.
    const system = if (builtin.link_libc) struct {
        fn create() c_int {
            return std.c.semget(.IPC_PRIVATE, 1, 0o777);
        }

        fn setValue(id: i32, n: u32) c_int {
            return std.c.semctl(id, 0, std.posix.SETVAL, n);
        }

        fn modify(id: i32, delta: i16) c_int {
            var op_list: [1]std.posix.sembuf = .{.{
                .sem_num = 0,
                .sem_op = delta,
                .sem_flg = std.posix.SEM_UNDO,
            }};
            return std.c.semop(id, &op_list, op_list.len);
        }
    } else switch (builtin.os.tag) {
        .linux => struct {
            fn create() usize {
                const private_key: std.posix.key_t = .IPC_PRIVATE;
                return std.os.linux.syscall3(.semget, @intFromEnum(private_key), 1, 0o777);
            }

            fn setValue(id: i32, n: u32) usize {
                return std.os.linux.syscall4(.semctl, @intCast(id), 0, std.posix.SETVAL, n);
            }

            fn modify(id: i32, delta: i16) usize {
                var op_list: [1]std.posix.sembuf = .{.{
                    .sem_num = 0,
                    .sem_op = delta,
                    .sem_flg = std.posix.SEM_UNDO,
                }};
                return std.os.linux.syscall3(.semop, @intCast(id), @intFromPtr(&op_list), op_list.len);
            }
        },
        else => unreachable,
    };
};
const builtin = @import("builtin");
const std = @import("std.zig");

196
lib/std/job/Client.zig Normal file
View file

@ -0,0 +1,196 @@
/// State for the communication method selected by `init`.
impl: Impl,
/// Errors `init` can return while locating and connecting to a job server.
pub const InitError = error{
/// An allocation from the allocator passed to `init` failed.
OutOfMemory,
/// There is no advertised job server.
NoServer,
/// The job server is advertising a communication method which is not known.
UnknownMethod,
/// The job server is advertising a communication method which is known but unsupported.
UnsupportedMethod,
/// A job server advertisement exists, but is malformed.
InvalidArgument,
/// The job server has shut down or is otherwise not available to connect to.
ServerFailed,
/// This process does not have permission to access the job server.
AccessDenied,
};
/// Connects to the job server advertised by the `ROBUST_JOBSERVER` entry of `env`.
///
/// The value is split at its first `:` into a method name and method-specific
/// data. The data is handed to the matching implementation's `init`, together
/// with `arena` for any allocations that implementation needs.
pub fn init(arena: Allocator, env: *const std.process.EnvMap) InitError!Client {
    const advertisement = env.get("ROBUST_JOBSERVER") orelse return error.NoServer;
    const colon = std.mem.findScalar(u8, advertisement, ':') orelse return error.InvalidArgument;
    const method_name = advertisement[0..colon];
    const method_data = advertisement[colon + 1 ..];
    const method = std.meta.stringToEnum(job.Method, method_name) orelse return error.UnknownMethod;
    switch (method) {
        inline else => |m| {
            const field_name = @tagName(m);
            // A `noreturn` payload marks a method that is unavailable on this target.
            if (@FieldType(Impl, field_name) == noreturn) return error.UnsupportedMethod;
            return .{ .impl = @unionInit(Impl, field_name, try .init(arena, method_data)) };
        },
    }
}
/// Releases the active implementation's resources and invalidates `c`.
pub fn deinit(c: *Client) void {
    // Runs after the implementation has been torn down.
    defer c.* = undefined;
    switch (c.impl) {
        inline else => |*active| active.deinit(),
    }
}
/// Errors `acquire` can return.
pub const AcquireError = error{
/// The job server has shut down or is otherwise not available to connect to.
ServerFailed,
/// This process does not have permission to access the job server.
AccessDenied,
/// Insufficient resources are available to acquire a token.
SystemResources,
/// The underlying system call failed in a way this implementation does not handle.
Unexpected,
};
/// Obtains one job token by forwarding to the active implementation's `acquire`.
/// Hand the token back with `Token.release`.
pub fn acquire(c: *const Client) AcquireError!Token {
    switch (c.impl) {
        inline else => |*active| return active.acquire(),
    }
}
/// Per-method client state. A payload type of `noreturn` marks a method that
/// is not available on the target; `init` reports such a method as
/// `error.UnsupportedMethod`.
const Impl = union(job.Method) {
sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn,
win32pipe: if (builtin.target.os.tag == .windows) Win32Pipe else noreturn,
};
/// A job token obtained from `acquire`. Return it to the server with `release`.
pub const Token = union(job.Method) {
    sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn,
    win32pipe: if (builtin.target.os.tag == .windows) windows.HANDLE else noreturn,

    /// Gives the token back to the job server. Failures are not reported.
    pub fn release(t: Token) void {
        switch (t) {
            .win32pipe => |handle| _ = windows.ntdll.NtClose(handle),
            .sysvsem => |sem| while (true) {
                job.sysv_sem.modify(sem.set_id, 1) catch |err| switch (err) {
                    // Releasing can't block; just retry.
                    error.Interrupted => continue,
                    // The undo structure was already allocated in `acquire`.
                    error.SystemResources => unreachable,
                    // The semaphore broke somehow, but that's not our problem!
                    // (...at least, not until we next call `acquire`.)
                    error.AccessDenied, error.InvalidSemaphore => {},
                    // Already warned, nothing more we can do.
                    error.Unexpected => {},
                };
                return;
            },
        }
    }
};
/// Client side of the `sysvsem` method: tokens are units of a System V semaphore.
const SysVSem = struct {
    /// Semaphore set identifier parsed from the server's advertisement.
    set_id: i32,

    /// Parses `arg` as the decimal semaphore set identifier. `arena` is unused.
    fn init(arena: Allocator, arg: []const u8) InitError!SysVSem {
        _ = arena;
        return .{
            .set_id = std.fmt.parseInt(i32, arg, 10) catch return error.InvalidArgument,
        };
    }

    /// Nothing to release on the client side.
    fn deinit(sem: SysVSem) void {
        _ = sem;
    }

    /// Decrements the semaphore by one, retrying whenever the call is
    /// interrupted by a signal.
    fn acquire(sem: SysVSem) AcquireError!Token {
        while (true) {
            job.sysv_sem.modify(sem.set_id, -1) catch |err| switch (err) {
                error.Interrupted => continue, // TODO: support cancelation
                error.InvalidSemaphore => return error.ServerFailed,
                error.AccessDenied, error.SystemResources, error.Unexpected => |e| return e,
            };
            return .{ .sysvsem = sem };
        }
    }
};
/// Client side of the `win32pipe` method: holding an open handle to the
/// server's named pipe is holding a token.
const Win32Pipe = struct {
    /// Handle to the `\??\pipe\` device, used to issue pipe wait requests.
    pipe_device: windows.HANDLE,
    /// Full NT path of the server's pipe (`\??\pipe\<name>`).
    pipe_path: [:0]const u16,

    /// NT path prefix shared by the pipe device and every named pipe.
    const device_prefix = std.unicode.wtf8ToWtf16LeStringLiteral("\\??\\pipe\\");

    /// Validates the advertised pipe name, builds its NT path in `arena`, and
    /// opens the pipe device.
    fn init(arena: Allocator, arg: []const u8) InitError!Win32Pipe {
        // The name must be non-empty and free of path separators and NUL bytes.
        const name_ok = arg.len != 0 and std.mem.findAny(u8, arg, "\\/\x00") == null;
        if (!name_ok) return error.InvalidArgument;

        const wtf8_path = try std.fmt.allocPrint(arena, "\\??\\pipe\\{s}", .{arg});
        const pipe_path = std.unicode.wtf8ToWtf16LeAllocZ(arena, wtf8_path) catch |err| switch (err) {
            error.InvalidWtf8 => return error.InvalidArgument,
            error.OutOfMemory => |e| return e,
        };

        const pipe_device = windows.OpenFile(device_prefix, .{
            .access_mask = windows.SYNCHRONIZE | windows.FILE_READ_ATTRIBUTES,
            .share_access = windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE,
            .creation = windows.FILE_OPEN,
            .case_sensitive = false,
        }) catch |err| {
            // This fixed path should always be accessible on Windows.
            std.debug.panic("unexpected error opening '\\??\\pipe\\': {t}", .{err});
        };

        return .{
            .pipe_device = pipe_device,
            .pipe_path = pipe_path,
        };
    }

    /// Closes the pipe device handle.
    fn deinit(wp: *const Win32Pipe) void {
        _ = windows.ntdll.NtClose(wp.pipe_device);
    }

    /// Opens the pipe, returning the handle as the token. When the open
    /// reports `PipeBusy` or `NoDevice`, issues an unbounded pipe wait on the
    /// pipe device and then tries again.
    fn acquire(wp: *const Win32Pipe) AcquireError!Token {
        const pipe_name = wp.pipe_path[device_prefix.len..];
        while (true) {
            if (windows.OpenFile(wp.pipe_path, .{
                .access_mask = windows.SYNCHRONIZE,
                .creation = windows.FILE_OPEN,
                .share_access = 0,
                .case_sensitive = false,
            })) |handle| {
                return .{ .win32pipe = handle };
            } else |err| switch (err) {
                // Fall through to the wait below.
                error.PipeBusy, error.NoDevice => {},

                error.AccessDenied,
                error.Unexpected,
                => |e| return e,

                error.IsDir,
                error.FileNotFound,
                error.NameTooLong,
                error.AntivirusInterference,
                error.BadPathName,
                => return error.ServerFailed,

                error.NotDir => unreachable, // we're not opening as a directory
                error.PathAlreadyExists => unreachable, // we're not trying to create the path
                error.WouldBlock => unreachable, // we're not using overlapped I/O
                error.NetworkNotFound => unreachable, // we're not accessing a network device
            }

            const wait_request: windows.FILE_PIPE_WAIT_FOR_BUFFER = .init(
                pipe_name,
                windows.FILE_PIPE_WAIT_FOR_BUFFER.WAIT_FOREVER,
            );
            windows.DeviceIoControl(
                wp.pipe_device,
                windows.FSCTL.PIPE.WAIT,
                .{ .in = wait_request.toBuffer() },
            ) catch |err| switch (err) {
                error.PipeClosing => return error.ServerFailed,
                error.Unexpected, error.AccessDenied => |e| return e,
                error.UnrecognizedVolume => unreachable, // not a volume
                error.Pending => unreachable, // not using overlapped I/O
                error.PipeAlreadyConnected => unreachable,
                error.PipeAlreadyListening => unreachable,
            };
        }
    }
};
const builtin = @import("builtin");
const std = @import("../std.zig");
const Allocator = std.mem.Allocator;
const job = std.job;
const windows = std.os.windows;
const Client = @This();

282
lib/std/job/Server.zig Normal file
View file

@ -0,0 +1,282 @@
/// The job server owned by this process.
/// `null` means we are inheriting a jobserver instance from a parent process:
/// `init` found `ROBUST_JOBSERVER` already set and created nothing.
impl: ?Impl,
/// Errors `init` can return while creating a job server.
pub const InitError = error{ OutOfMemory, SystemResources, Unexpected };
/// Starts a job server offering `job_limit` tokens and advertises it by
/// setting `ROBUST_JOBSERVER` in `env`.
///
/// If `env` already contains `ROBUST_JOBSERVER`, nothing is created and the
/// returned server has `impl == null`, so child processes keep using the
/// inherited server.
///
/// `job_limit` must be greater than zero. `arena` is used for allocations the
/// implementation and the advertisement string need.
///
/// If anything fails after the implementation has been started, it is shut
/// down again before the error is returned.
pub fn init(arena: Allocator, job_limit: u32, env: *std.process.EnvMap) InitError!Server {
    assert(job_limit > 0);
    if (env.get("ROBUST_JOBSERVER") != null) {
        return .{ .impl = null };
    }
    const method: job.Method = switch (builtin.target.os.tag) {
        .windows => .win32pipe,
        else => .sysvsem,
    };
    const impl: @FieldType(Impl, @tagName(method)) = try .init(arena, job_limit);
    // The steps below can still fail; without this, the started implementation
    // (on Windows: a running thread plus open handles) would never be torn down.
    errdefer impl.deinit();
    const env_val = try std.fmt.allocPrint(arena, "{t}:{f}", .{ method, std.fmt.alt(impl, .formatEnvData) });
    try env.put("ROBUST_JOBSERVER", env_val);
    return .{ .impl = @unionInit(Impl, @tagName(method), impl) };
}
/// Shuts down the job server if this process owns one, then invalidates `s`.
pub fn deinit(s: *Server) void {
    // Runs on every exit path, including the early return below.
    defer s.* = undefined;
    const owned = if (s.impl) |*impl| impl else return;
    switch (owned.*) {
        inline else => |*active| active.deinit(),
    }
}
/// Per-method server state. A payload type of `noreturn` marks a method that
/// is not available on the target.
const Impl = union(job.Method) {
sysvsem: if (job.sysv_sem.supported) SysVSem else noreturn,
win32pipe: if (builtin.target.os.tag == .windows) Win32Pipe else noreturn,
};
/// Server side of the `sysvsem` method: a single System V semaphore whose
/// value is the number of available tokens.
const SysVSem = struct {
    /// Identifier of the semaphore set created in `init`.
    set_id: i32,

    /// Creates the semaphore set and sets its value to `num_tokens`. `arena` is unused.
    fn init(arena: Allocator, num_tokens: u32) InitError!SysVSem {
        _ = arena;
        const sem: SysVSem = .{ .set_id = try job.sysv_sem.create() };
        try job.sysv_sem.setValue(sem.set_id, num_tokens);
        return sem;
    }

    /// Does nothing.
    /// NOTE(review): the semaphore set is never removed here or in `init`'s
    /// error path; confirm whether leaving it behind is intended.
    fn deinit(sem: SysVSem) void {
        _ = sem;
    }

    /// Writes the method-specific part of the advertisement: the decimal set identifier.
    pub fn formatEnvData(sem: SysVSem, w: *std.Io.Writer) std.Io.Writer.Error!void {
        return w.print("{d}", .{sem.set_id});
    }
};
const Win32Pipe = struct {
pipe_name: []const u8,
done_event: windows.HANDLE,
thread: std.Thread,
/// Server-side state for one token: a pipe instance plus the buffers its
/// asynchronous operations write into.
const Token = struct {
/// Pipe instance handle; `INVALID_HANDLE_VALUE` until `init` creates it.
handle: windows.HANDLE,
/// Status block passed to every operation `serveToken` issues on `handle`.
iosb: windows.IO_STATUS_BLOCK,
/// Destination of the one-byte read issued by `serveToken`; nothing in this file inspects its contents.
dummy_read_buf: [1]u8,
};
/// NOTE(review): not referenced by any code visible in this file; `init` builds
/// pipe names from the process ID and a random value instead. Confirm whether
/// this is still needed.
var pipe_name_counter: std.atomic.Value(u32) = .init(0);
/// Creates `num_tokens` instances of a uniquely named pipe, plus a "done"
/// event, and spawns the thread (`serve`) that services the pipe instances.
///
/// The pipe name and the token array are allocated from `arena`. The token
/// array is handed to the spawned thread, so `arena` must stay alive until
/// `deinit` has joined that thread.
///
/// On failure, every handle created so far is closed before returning.
fn init(arena: Allocator, num_tokens: u32) InitError!Win32Pipe {
    const pipe_name = try std.fmt.allocPrint(arena, "zig-jobserver-{d}-{x}", .{
        windows.GetCurrentProcessId(),
        std.crypto.random.int(u64),
    });
    const nt_path = std.unicode.wtf8ToWtf16LeAllocZ(
        arena,
        try std.fmt.allocPrint(arena, "\\??\\pipe\\{s}", .{pipe_name}),
    ) catch |err| switch (err) {
        error.InvalidWtf8 => unreachable, // the name is built from ASCII text and integers
        error.OutOfMemory => |e| return e,
    };

    const tokens = try arena.alloc(Token, num_tokens);
    @memset(tokens, .{
        .handle = windows.INVALID_HANDLE_VALUE,
        .iosb = undefined,
        .dummy_read_buf = undefined,
    });
    errdefer for (tokens) |*token| {
        if (token.handle != windows.INVALID_HANDLE_VALUE) {
            _ = windows.ntdll.NtClose(token.handle);
        }
    };

    // The object name is the same for every instance, so build it once.
    // `MaximumLength` is the size of the buffer backing the string and must
    // not be smaller than `Length`.
    const nt_path_bytes = @sizeOf(u16) * nt_path.len;
    var path: windows.UNICODE_STRING = .{
        .Buffer = nt_path.ptr,
        .Length = @intCast(nt_path_bytes),
        .MaximumLength = @intCast(nt_path_bytes),
    };
    var iosb: windows.IO_STATUS_BLOCK = undefined;
    for (tokens) |*t| {
        switch (windows.ntdll.NtCreateNamedPipeFile(
            &t.handle,
            windows.GENERIC_READ,
            &.{
                .Length = @sizeOf(windows.OBJECT_ATTRIBUTES),
                .RootDirectory = null,
                .ObjectName = &path,
                .Attributes = std.os.windows.OBJ_CASE_INSENSITIVE,
                .SecurityDescriptor = null,
                .SecurityQualityOfService = null,
            },
            &iosb,
            windows.FILE_SHARE_WRITE,
            windows.FILE_OPEN_IF,
            0, // no FILE_SYNCHRONOUS_IO_NONALERT, so the pipe is in overlapped mode
            windows.FILE_PIPE_BYTE_STREAM_TYPE,
            windows.FILE_PIPE_BYTE_STREAM_MODE,
            windows.FILE_PIPE_QUEUE_OPERATION,
            @intCast(num_tokens), // one pipe instance per token
            0,
            0,
            // Default timeout: 120 seconds in 100ns units; negative means relative.
            &((-120 * std.time.ns_per_s) / 100),
        )) {
            .SUCCESS => {},
            .INSUFFICIENT_RESOURCES => return error.SystemResources,
            else => |e| return windows.unexpectedStatus(e),
        }
    }

    var done_event: windows.HANDLE = undefined;
    switch (windows.ntdll.NtCreateEvent(
        &done_event,
        windows.EVENT_ALL_ACCESS,
        null,
        .Notification,
        windows.FALSE,
    )) {
        .SUCCESS => {},
        .INSUFFICIENT_RESOURCES => return error.SystemResources,
        else => |e| return windows.unexpectedStatus(e),
    }
    errdefer _ = windows.ntdll.NtClose(done_event);

    const thread = std.Thread.spawn(.{}, serve, .{ tokens, done_event }) catch |err| switch (err) {
        error.SystemResources,
        error.Unexpected,
        error.OutOfMemory,
        => |e| return e,
        error.ThreadQuotaExceeded,
        error.LockedMemoryLimitExceeded,
        => return error.SystemResources,
    };
    errdefer comptime unreachable; // the thread is now running and owns `tokens`

    return .{
        .pipe_name = pipe_name,
        .done_event = done_event,
        .thread = thread,
    };
}
/// Stops the server thread and closes the "done" event.
fn deinit(wp: *const Win32Pipe) void {
// Signal `serve` to leave its wait loop.
_ = windows.ntdll.NtSetEvent(wp.done_event, null);
// `serve` waits on `done_event`, so the thread must have exited before the event is closed.
wp.thread.join();
_ = windows.ntdll.NtClose(wp.done_event);
}
/// Writes the method-specific part of the advertisement: the bare pipe name.
pub fn formatEnvData(wp: Win32Pipe, w: *std.Io.Writer) std.Io.Writer.Error!void {
    return w.writeAll(wp.pipe_name);
}
/// Thread entry point. Starts a listen on every pipe instance, then waits
/// alertably on `done_event` until it is signalled. All pipe handles are
/// closed when the function exits.
fn serve(tokens: []Token, done_event: windows.HANDLE) void {
    defer for (tokens) |*token| {
        _ = windows.ntdll.NtClose(token.handle);
    };

    for (tokens) |*token| serveToken(token, .connect);

    while (true) {
        // Alertable wait: returning with `USER_APC` means completion routines ran.
        const status = windows.ntdll.NtWaitForSingleObject(done_event, windows.TRUE, null);
        switch (status) {
            windows.NTSTATUS.WAIT_0 => return,
            .USER_APC => {},
            windows.NTSTATUS.ABANDONED_WAIT_0 => unreachable, // not a mutex
            .TIMEOUT => unreachable, // no timeout
            else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}),
        }
    }
}
/// The next operation `serveToken` should start on a pipe instance.
const Action = enum { connect, read, disconnect };
/// Starts `first_action` on `token`'s pipe instance and keeps moving to the
/// next action for as long as operations finish immediately. Returns once an
/// operation has been left to its completion routine (`connectCompleted`,
/// `readCompleted`, `disconnectCompleted`), each of which calls back into this
/// function with the follow-up action.
fn serveToken(token: *Token, first_action: Action) void {
action: switch (first_action) {
// Listen for a client to connect to this pipe instance.
.connect => if (windows.DeviceIoControl(token.handle, windows.FSCTL.PIPE.LISTEN, .{
.apc_routine = &connectCompleted,
.apc_context = token,
.io_status_block = &token.iosb,
})) |_| {
return; // The APC has been queued and will continue the loop.
} else |err| switch (err) {
error.AccessDenied => unreachable, // we created the pipe
error.UnrecognizedVolume => unreachable, // it's not a volume
error.Pending => return,
error.PipeClosing => continue :action .disconnect,
error.PipeAlreadyConnected => continue :action .read,
error.PipeAlreadyListening => unreachable, // pipe is not in nonblocking mode
error.Unexpected => @panic("unexpected error in job server"),
},
// Issue a one-byte read into `dummy_read_buf`; `readCompleted` and the
// `PIPE_BROKEN` prong below both lead to a disconnect.
.read => switch (windows.ntdll.NtReadFile(
token.handle,
null,
&readCompleted,
token,
&token.iosb,
&token.dummy_read_buf,
token.dummy_read_buf.len,
null,
null,
)) {
.PENDING => return,
.PIPE_BROKEN => continue :action .disconnect,
.SUCCESS => {
// The client isn't meant to write to the pipe---disconnect them as punishment.
return; // The APC has been queued and will do that for us.
},
else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}),
},
// Disconnect the pipe instance; `disconnectCompleted` then starts a new listen.
.disconnect => if (windows.DeviceIoControl(token.handle, windows.FSCTL.PIPE.DISCONNECT, .{
.apc_routine = &disconnectCompleted,
.apc_context = token,
.io_status_block = &token.iosb,
})) |_| {
return; // The APC has been queued and will continue the loop.
} else |err| switch (err) {
error.AccessDenied => unreachable, // we created the pipe
error.UnrecognizedVolume => unreachable, // it's not a volume
error.Pending => return,
error.PipeClosing => unreachable,
error.PipeAlreadyConnected => unreachable,
error.PipeAlreadyListening => unreachable,
error.Unexpected => @panic("unexpected error in job server"),
},
}
}
/// Completion routine for the listen started by `serveToken`. `ctx` is the
/// `*Token` the listen was issued for. Continues with a read once a client is
/// connected, or with a disconnect if the pipe is closing.
fn connectCompleted(
    ctx: ?*anyopaque,
    iosb: *windows.IO_STATUS_BLOCK,
    _: windows.ULONG,
) callconv(.winapi) void {
    const token: *Token = @ptrCast(@alignCast(ctx));
    const next: Action = switch (iosb.u.Status) {
        .SUCCESS, .PIPE_CONNECTED => .read,
        .PIPE_CLOSING => .disconnect,
        else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}),
    };
    serveToken(token, next);
}
/// Completion routine for the read started by `serveToken`. `ctx` is the
/// `*Token` the read was issued for. Both a completed read and a broken pipe
/// lead to disconnecting the pipe instance.
fn readCompleted(
    ctx: ?*anyopaque,
    iosb: *windows.IO_STATUS_BLOCK,
    _: windows.ULONG,
) callconv(.winapi) void {
    const token: *Token = @ptrCast(@alignCast(ctx));
    const next: Action = switch (iosb.u.Status) {
        .SUCCESS, .PIPE_BROKEN => .disconnect,
        else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}),
    };
    serveToken(token, next);
}
/// Completion routine for the disconnect started by `serveToken`. `ctx` is the
/// `*Token` the disconnect was issued for. On success the pipe instance starts
/// listening for the next client.
fn disconnectCompleted(
    ctx: ?*anyopaque,
    iosb: *windows.IO_STATUS_BLOCK,
    _: windows.ULONG,
) callconv(.winapi) void {
    const token: *Token = @ptrCast(@alignCast(ctx));
    const next: Action = switch (iosb.u.Status) {
        .SUCCESS => .connect,
        else => |e| std.debug.panic("unexpected NTSTATUS=0x{x} in job server", .{@intFromEnum(e)}),
    };
    serveToken(token, next);
}
};
const builtin = @import("builtin");
const std = @import("std");
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const job = std.job;
const windows = std.os.windows;
const Server = @This();

View file

@ -78,6 +78,7 @@ pub const hash = @import("hash.zig");
pub const hash_map = @import("hash_map.zig"); pub const hash_map = @import("hash_map.zig");
pub const heap = @import("heap.zig"); pub const heap = @import("heap.zig");
pub const http = @import("http.zig"); pub const http = @import("http.zig");
pub const job = @import("job.zig");
pub const json = @import("json.zig"); pub const json = @import("json.zig");
pub const leb = @import("leb128.zig"); pub const leb = @import("leb128.zig");
pub const log = @import("log.zig"); pub const log = @import("log.zig");