implement --watch for kqueue

it doesn't detect and remove no longer watched things yet

it also isn't aware of any file names reported by kqueue. I'm unsure if
that functionality exists.
This commit is contained in:
Andrew Kelley 2024-10-24 16:24:56 -07:00
parent 56996a2809
commit 7a46ba73ce
2 changed files with 219 additions and 50 deletions

View file

@ -447,10 +447,7 @@ pub fn main() !void {
if (!watch) return cleanExit(); if (!watch) return cleanExit();
switch (builtin.os.tag) { if (!Watch.have_impl) fatal("--watch not yet implemented for {s}", .{@tagName(builtin.os.tag)});
.linux, .windows => {},
else => fatal("--watch not yet implemented for {s}", .{@tagName(builtin.os.tag)}),
}
try w.update(gpa, run.step_stack.keys()); try w.update(gpa, run.step_stack.keys());

View file

@ -10,6 +10,8 @@ dir_table: DirTable,
os: Os, os: Os,
generation: Generation, generation: Generation,
pub const have_impl = Os != void;
/// Key is the directory to watch which contains one or more files we are /// Key is the directory to watch which contains one or more files we are
/// interested in noticing changes to. /// interested in noticing changes to.
/// ///
@ -236,6 +238,16 @@ const Os = switch (builtin.os.tag) {
w.generation +%= 1; w.generation +%= 1;
} }
} }
fn wait(w: *Watch, gpa: Allocator, timeout: Timeout) !WaitResult {
const events_len = try std.posix.poll(&w.os.poll_fds, timeout.to_i32_ms());
return if (events_len == 0)
.timeout
else if (try Os.markDirtySteps(w, gpa))
.dirty
else
.clean;
}
}, },
.windows => struct { .windows => struct {
const windows = std.os.windows; const windows = std.os.windows;
@ -509,6 +521,182 @@ const Os = switch (builtin.os.tag) {
w.generation +%= 1; w.generation +%= 1;
} }
} }
fn wait(w: *Watch, gpa: Allocator, timeout: Timeout) !WaitResult {
var bytes_transferred: std.os.windows.DWORD = undefined;
var key: usize = undefined;
var overlapped_ptr: ?*std.os.windows.OVERLAPPED = undefined;
return while (true) switch (std.os.windows.GetQueuedCompletionStatus(
w.os.io_cp.?,
&bytes_transferred,
&key,
&overlapped_ptr,
@bitCast(timeout.to_i32_ms()),
)) {
.Normal => {
if (bytes_transferred == 0)
break error.Unexpected;
// This 'orelse' detects a race condition that happens when we receive a
// completion notification for a directory that no longer exists in our list.
const dir = w.os.dir_list.get(key) orelse break .clean;
break if (try Os.markDirtySteps(w, gpa, dir))
.dirty
else
.clean;
},
.Timeout => break .timeout,
// This status is issued because CancelIo was called, skip and try again.
.Cancelled => continue,
else => break error.Unexpected,
};
}
},
.dragonfly, .freebsd, .netbsd, .openbsd, .ios, .macos, .tvos, .visionos, .watchos, .haiku => struct {
const posix = std.posix;
kq_fd: i32,
/// Indexes correspond 1:1 with `dir_table`.
reaction_sets: std.ArrayListUnmanaged(ReactionSet),
const dir_open_flags: posix.O = f: {
var f: posix.O = .{
.ACCMODE = .RDONLY,
.NOFOLLOW = false,
.DIRECTORY = true,
.CLOEXEC = true,
};
if (@hasField(posix.O, "EVTONLY")) f.EVTONLY = true;
if (@hasField(posix.O, "PATH")) f.PATH = true;
break :f f;
};
fn update(w: *Watch, gpa: Allocator, steps: []const *Step) !void {
for (steps) |step| {
for (step.inputs.table.keys(), step.inputs.table.values()) |path, *files| {
const reaction_set = rs: {
const gop = try w.dir_table.getOrPut(gpa, path);
if (!gop.found_existing) {
const dir_fd = if (path.sub_path.len == 0)
path.root_dir.handle.fd
else
posix.openat(path.root_dir.handle.fd, path.sub_path, dir_open_flags, 0) catch |err| {
fatal("failed to open directory {}: {s}", .{ path, @errorName(err) });
};
const EV = std.c.EV;
const NOTE = std.c.NOTE;
var changes = [1]posix.Kevent{.{
.ident = @bitCast(@as(isize, dir_fd)),
.filter = std.c.EVFILT.VNODE,
.flags = EV.ADD | EV.ENABLE | EV.CLEAR,
.fflags = NOTE.DELETE | NOTE.WRITE | NOTE.RENAME | NOTE.REVOKE,
.data = 0,
.udata = gop.index,
}};
_ = try posix.kevent(w.os.kq_fd, &changes, &.{}, null);
assert(w.os.reaction_sets.items.len == gop.index);
const reaction_set = try w.os.reaction_sets.addOne(gpa);
reaction_set.* = .{};
break :rs reaction_set;
}
break :rs &w.os.reaction_sets.items[gop.index];
};
for (files.items) |basename| {
const gop = try reaction_set.getOrPut(gpa, basename);
if (!gop.found_existing) gop.value_ptr.* = .{};
try gop.value_ptr.put(gpa, step, w.generation);
}
}
}
{
// Remove marks for files that are no longer inputs.
//var i: usize = 0;
//while (i < w.os.handle_table.entries.len) {
// {
// const reaction_set = &w.os.handle_table.values()[i];
// var step_set_i: usize = 0;
// while (step_set_i < reaction_set.entries.len) {
// const step_set = &reaction_set.values()[step_set_i];
// var dirent_i: usize = 0;
// while (dirent_i < step_set.entries.len) {
// const generations = step_set.values();
// if (generations[dirent_i] == w.generation) {
// dirent_i += 1;
// continue;
// }
// step_set.swapRemoveAt(dirent_i);
// }
// if (step_set.entries.len > 0) {
// step_set_i += 1;
// continue;
// }
// reaction_set.swapRemoveAt(step_set_i);
// }
// if (reaction_set.entries.len > 0) {
// i += 1;
// continue;
// }
// }
// const path = w.dir_table.keys()[i];
// posix.fanotify_mark(fan_fd, .{
// .REMOVE = true,
// .ONLYDIR = true,
// }, fan_mask, path.root_dir.handle.fd, path.subPathOrDot()) catch |err| switch (err) {
// error.FileNotFound => {}, // Expected, harmless.
// else => |e| std.log.warn("unable to unwatch '{}': {s}", .{ path, @errorName(e) }),
// };
// w.dir_table.swapRemoveAt(i);
// w.os.handle_table.swapRemoveAt(i);
//}
w.generation +%= 1;
}
}
fn wait(w: *Watch, gpa: Allocator, timeout: Timeout) !WaitResult {
var timespec_buffer: posix.timespec = undefined;
var event_buffer: [100]posix.Kevent = undefined;
var n = try posix.kevent(w.os.kq_fd, &.{}, &event_buffer, timeout.toTimespec(&timespec_buffer));
if (n == 0) return .timeout;
const reaction_sets = w.os.reaction_sets.items;
var any_dirty = markDirtySteps(gpa, reaction_sets, event_buffer[0..n], false);
timespec_buffer = .{ .sec = 0, .nsec = 0 };
while (n == event_buffer.len) {
n = try posix.kevent(w.os.kq_fd, &.{}, &event_buffer, &timespec_buffer);
if (n == 0) break;
any_dirty = markDirtySteps(gpa, reaction_sets, event_buffer[0..n], any_dirty);
}
return if (any_dirty) .dirty else .clean;
}
fn markDirtySteps(
gpa: Allocator,
reaction_sets: []ReactionSet,
events: []const std.c.Kevent,
start_any_dirty: bool,
) bool {
var any_dirty = start_any_dirty;
for (events) |event| {
const index: usize = @intCast(event.udata);
const reaction_set = &reaction_sets[index];
// If we knew the basename of the changed file, here we would
// mark only the step set dirty, and possibly the glob set:
//if (reaction_set.getPtr(".")) |glob_set|
// any_dirty = markStepSetDirty(gpa, glob_set, any_dirty);
//if (reaction_set.getPtr(file_name)) |step_set|
// any_dirty = markStepSetDirty(gpa, step_set, any_dirty);
// However we don't know the file name so just mark all the
// sets dirty for this directory.
for (reaction_set.values()) |*step_set| {
any_dirty = markStepSetDirty(gpa, step_set, any_dirty);
}
}
return any_dirty;
}
}, },
else => void, else => void,
}; };
@ -560,6 +748,20 @@ pub fn init() !Watch {
.generation = 0, .generation = 0,
}; };
}, },
.dragonfly, .freebsd, .netbsd, .openbsd, .ios, .macos, .tvos, .visionos, .watchos => {
const posix = std.posix;
const kq_fd = try posix.kqueue();
errdefer posix.close(kq_fd);
return .{
.dir_table = .{},
.os = .{
.kq_fd = kq_fd,
.reaction_sets = .{},
},
.generation = 0,
};
},
else => @panic("unimplemented"), else => @panic("unimplemented"),
} }
} }
@ -609,10 +811,7 @@ fn markStepSetDirty(gpa: Allocator, step_set: *StepSet, any_dirty: bool) bool {
} }
pub fn update(w: *Watch, gpa: Allocator, steps: []const *Step) !void { pub fn update(w: *Watch, gpa: Allocator, steps: []const *Step) !void {
switch (builtin.os.tag) { return Os.update(w, gpa, steps);
.linux, .windows => return Os.update(w, gpa, steps),
else => @compileError("unimplemented"),
}
} }
pub const Timeout = union(enum) { pub const Timeout = union(enum) {
@ -625,6 +824,20 @@ pub const Timeout = union(enum) {
.ms => |ms| ms, .ms => |ms| ms,
}; };
} }
pub fn toTimespec(t: Timeout, buf: *std.posix.timespec) ?*std.posix.timespec {
return switch (t) {
.none => null,
.ms => |ms_u16| {
const ms: isize = ms_u16;
buf.* = .{
.sec = @divTrunc(ms, std.time.ms_per_s),
.nsec = @rem(ms, std.time.ms_per_s) * std.time.ns_per_ms,
};
return buf;
},
};
}
}; };
pub const WaitResult = enum { pub const WaitResult = enum {
@ -638,46 +851,5 @@ pub const WaitResult = enum {
}; };
pub fn wait(w: *Watch, gpa: Allocator, timeout: Timeout) !WaitResult { pub fn wait(w: *Watch, gpa: Allocator, timeout: Timeout) !WaitResult {
switch (builtin.os.tag) { return Os.wait(w, gpa, timeout);
.linux => {
const events_len = try std.posix.poll(&w.os.poll_fds, timeout.to_i32_ms());
return if (events_len == 0)
.timeout
else if (try Os.markDirtySteps(w, gpa))
.dirty
else
.clean;
},
.windows => {
var bytes_transferred: std.os.windows.DWORD = undefined;
var key: usize = undefined;
var overlapped_ptr: ?*std.os.windows.OVERLAPPED = undefined;
return while (true) switch (std.os.windows.GetQueuedCompletionStatus(
w.os.io_cp.?,
&bytes_transferred,
&key,
&overlapped_ptr,
@bitCast(timeout.to_i32_ms()),
)) {
.Normal => {
if (bytes_transferred == 0)
break error.Unexpected;
// This 'orelse' detects a race condition that happens when we receive a
// completion notification for a directory that no longer exists in our list.
const dir = w.os.dir_list.get(key) orelse break .clean;
break if (try Os.markDirtySteps(w, gpa, dir))
.dirty
else
.clean;
},
.Timeout => break .timeout,
// This status is issued because CancelIo was called, skip and try again.
.Cancelled => continue,
else => break error.Unexpected,
};
},
else => @compileError("unimplemented"),
}
} }