Implement some more IoUring operations

Add timeout_update, cancel_fd, epoll_wait, files_update, open, open_direct, and f/madvice

Don't implement f/madvice64 for now; I doubt it is widely used in practice

Implement BufferGroup read_multishot

Try to align the function definitions as closely as possible with liburing,
to make it easy to keep our implementation in sync

TODO: add Madvice/Fadvice enum options
TODO: understand buffer_selection and how it's different from BufferGroup
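
A rough usage sketch of the new timeout_update helper (assuming an already
initialized ring, default flag values, and that submit() keeps its current
name):

    var ts = std.mem.zeroes(linux.kernel_timespec); // fill in the initial duration
    _ = try ring.timeout(0x1, &ts, 0, .{}); // relative timeout, no event count
    var new_ts = std.mem.zeroes(linux.kernel_timespec); // fill in the new duration
    _ = try ring.timeout_update(0x2, 0x1, &new_ts, .{}); // re-arm the timeout tagged 0x1
    _ = try ring.submit();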

Signed-off-by: Bernard Assan <mega.alpha100@gmail.com>
Bernard Assan 2025-10-14 23:35:36 +00:00
parent f68bcb6b5f
commit ee71dcca10
2 changed files with 523 additions and 336 deletions


@@ -8370,6 +8370,13 @@ pub const MADV = struct {
pub const SOFT_OFFLINE = 101;
};
pub const Madvice = enum(u32) {
_, // TODO: add options
};
pub const Fadvice = enum(u32) {
_, // TODO: add options
};
pub const POSIX_FADV = switch (native_arch) {
.s390x => if (@typeInfo(usize).int.bits == 64) struct {
pub const NORMAL = 0;


@@ -374,39 +374,6 @@ pub fn cq_advance(self: *IoUring, count: u32) void {
}
}
/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
/// Returns a pointer to the SQE so that you can further modify the SQE for
/// advanced use cases.
/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the
/// SQE's `rw_flags`.
/// N.B. While SQEs are initiated in the order in which they appear in the
/// submission queue, operations execute in parallel and completions are
/// unordered. Therefore, an application that submits a write followed by an
/// fsync in the submission queue cannot expect the fsync to apply to the write,
/// since the fsync may complete before the write is issued to the disk.
/// You should preferably use `link_with_next_sqe()` on a write's SQE to link
/// it with an fsync, or else insert a full write barrier using
/// `drain_previous_sqes()` when queueing an fsync.
pub fn fsync(self: *IoUring, user_data: u64, fd: posix.fd_t, flags: uflags.Fsync) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_fsync(fd, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a no-op.
/// Returns a pointer to the SQE so that you can further modify the SQE for
/// advanced use cases.
/// A no-op is more useful than may appear at first glance.
/// For example, you could call `drain_previous_sqes()` on the returned SQE, to
/// use the no-op to know when the ring is idle before acting on a kill signal.
pub fn nop(self: *IoUring, user_data: u64) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_nop();
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `splice(2)`
/// Either `fd_in` or `fd_out` must be a pipe.
/// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to
@@ -631,6 +598,142 @@ pub fn poll_add(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a multishot `poll(2)`.
/// Returns a pointer to the SQE.
pub fn poll_multishot(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
poll_mask: linux.Epoll,
) !*Sqe {
const sqe = try self.poll_add(user_data, fd, poll_mask);
sqe.len = @bitCast(uflags.Poll{ .add_multi = true });
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing poll operation.
/// Returns a pointer to the SQE.
pub fn poll_remove(
self: *IoUring,
user_data: u64,
target_user_data: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_remove(target_user_data);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to update the user data of an existing
/// poll operation. Returns a pointer to the SQE.
pub fn poll_update(
self: *IoUring,
user_data: u64,
old_user_data: u64,
new_user_data: u64,
poll_mask: linux.Epoll,
flags: uflags.Poll,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_update(old_user_data, new_user_data, poll_mask, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
/// Returns a pointer to the SQE so that you can further modify the SQE for
/// advanced use cases.
/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the
/// SQE's `rw_flags`.
/// N.B. While SQEs are initiated in the order in which they appear in the
/// submission queue, operations execute in parallel and completions are
/// unordered. Therefore, an application that submits a write followed by an
/// fsync in the submission queue cannot expect the fsync to apply to the write,
/// since the fsync may complete before the write is issued to the disk.
/// You should preferably use `link_with_next_sqe()` on a write's SQE to link
/// it with an fsync, or else insert a full write barrier using
/// `drain_previous_sqes()` when queueing an fsync.
pub fn fsync(self: *IoUring, user_data: u64, fd: posix.fd_t, flags: uflags.Fsync) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_fsync(fd, flags);
sqe.user_data = user_data;
return sqe;
}
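// A minimal usage sketch for the ordering caveat above, assuming `write()` and
// `submit()` keep their existing signatures and that the flag structs
// default-initialize:
//
//     const write_sqe = try ring.write(0x1, fd, data, 0);
//     write_sqe.link_with_next_sqe(); // the fsync below only starts after the write completes
//     _ = try ring.fsync(0x2, fd, .{});
//     _ = try ring.submit();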
/// Queues (but does not submit) an SQE to perform a no-op.
/// Returns a pointer to the SQE so that you can further modify the SQE for
/// advanced use cases.
/// A no-op is more useful than may appear at first glance.
/// For example, you could call `drain_previous_sqes()` on the returned SQE, to
/// use the no-op to know when the ring is idle before acting on a kill signal.
pub fn nop(self: *IoUring, user_data: u64) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_nop();
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to register a timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout will complete when either the timeout expires, or after the
/// specified number of events complete (if `count` is greater than `0`).
///
/// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an
/// absolute timeout.
///
/// The completion event result will be `-ETIME` if the timeout completed
/// through expiration, `0` if the timeout completed after the specified number
/// of events, or `-ECANCELED` if the timeout was removed before it expired.
///
/// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
pub fn timeout(
self: *IoUring,
user_data: u64,
ts: *const linux.kernel_timespec,
count: u32,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_timeout(ts, count, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout is identified by its `user_data`.
///
/// The completion event result will be `0` if the timeout was found and
/// cancelled successfully, otherwise:
/// `-EBUSY` if the timeout was found but expiration was already in progress, or
/// `-ENOENT` if the timeout was not found.
pub fn timeout_remove(
self: *IoUring,
user_data: u64,
timeout_user_data: u64,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_timeout_remove(timeout_user_data, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to update an existing timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout to update is identified by its `user_data`; the new expiration is
/// taken from `ts`.
pub fn timeout_update(
self: *IoUring,
user_data: u64,
timeout_user_data: u64,
ts: *const linux.kernel_timespec,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_timeout_update(timeout_user_data, ts, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 5.5
@@ -648,28 +751,6 @@ pub fn accept(
return sqe;
}
/// Queues an multishot accept on a socket.
///
/// Multishot variant allows an application to issue a single accept request,
/// which will repeatedly trigger a CQE when a connection request comes in.
/// While IORING_CQE_F_MORE flag is set in CQE flags accept will generate
/// further CQEs.
///
/// Available since 5.19
pub fn accept_multishot(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
addr: ?*posix.sockaddr,
addrlen: ?*posix.socklen_t,
flags: linux.Sock,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_multishot_accept(fd, addr, addrlen, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues an accept using direct (registered) file descriptors.
///
/// To use an accept direct variant, the application must first have registered
@@ -695,6 +776,28 @@ pub fn accept_direct(
return sqe;
}
/// Queues a multishot accept on a socket.
///
/// The multishot variant allows an application to issue a single accept request,
/// which will repeatedly trigger a CQE when a connection request comes in.
/// While the IORING_CQE_F_MORE flag is set in the CQE flags, accept will generate
/// further CQEs.
///
/// Available since 5.19
pub fn accept_multishot(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
addr: ?*posix.sockaddr,
addrlen: ?*posix.socklen_t,
flags: linux.Sock,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_multishot_accept(fd, addr, addrlen, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues an multishot accept using direct (registered) file descriptors.
/// Available since 5.19
pub fn accept_multishot_direct(
@@ -711,6 +814,66 @@ pub fn accept_multishot_direct(
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing operation.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
///
/// The completion event result will be `0` if the operation was found and
/// cancelled successfully, otherwise either:
/// `-EALREADY` if the operation was found but was already in progress
/// `-ENOENT` if the operation was not found.
pub fn cancel(
self: *IoUring,
user_data: u64,
cancel_user_data: u64,
flags: uflags.AsyncCancel,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_cancel(cancel_user_data, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to cancel pending operation(s) targeting
/// the given file descriptor instead of matching on `user_data`.
/// Returns a pointer to the SQE.
pub fn cancel_fd(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
flags: uflags.AsyncCancel,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_cancel_fd(fd, flags);
sqe.user_data = user_data;
return sqe;
}
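// A minimal sketch contrasting the two cancel helpers (0x1 is the user_data of
// a previously queued request, some_fd a descriptor with pending requests;
// default flag values assumed):
//
//     _ = try ring.cancel(0x10, 0x1, .{}); // cancel the single request tagged 0x1
//     _ = try ring.cancel_fd(0x11, some_fd, .{}); // cancel request(s) targeting some_fd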
/// Queues (but does not submit) an SQE to add a link timeout operation.
/// Returns a pointer to the SQE.
///
/// You need to set IOSQE_IO_LINK to flags of the target operation and then
/// call this method right after the target operation.
/// See https://lwn.net/Articles/803932/ for detail.
///
/// If the dependent request finishes before the linked timeout, the timeout
/// is canceled. If the timeout finishes before the dependent request, the
/// dependent request will be canceled.
///
/// The completion event result of the link_timeout will be either of:
/// `-ETIME` if the timeout finishes before the dependent request (in this case,
/// the completion event result of the dependent request will be `-ECANCELED`)
/// `-EALREADY` if the dependent request finishes before the linked timeout.
pub fn link_timeout(
self: *IoUring,
user_data: u64,
ts: *const linux.kernel_timespec,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_link_timeout(ts, flags);
sqe.user_data = user_data;
return sqe;
}
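// A minimal sketch of the linked-timeout pattern described above (`poll_mask`
// is whatever `linux.Epoll` events the caller cares about; default timeout
// flags assumed):
//
//     const op = try ring.poll_add(0x1, fd, poll_mask);
//     op.link_with_next_sqe(); // link the poll to the timeout queued next
//     var ts = std.mem.zeroes(linux.kernel_timespec); // fill in the time limit
//     _ = try ring.link_timeout(0x2, &ts, .{});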
/// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket.
/// Returns a pointer to the SQE.
pub fn connect(
@@ -726,6 +889,59 @@ pub fn connect(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn bind(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
addr: *const posix.sockaddr,
addrlen: posix.socklen_t,
// liburing doesn't have this flag, hence 0 should be passed
// TODO: consider removing this and all flags like this
flags: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_bind(fd, addr, addrlen, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn listen(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
backlog: usize,
// liburing doesn't have this flag, hence 0 should be passed
// TODO: consider removing this and all flags like this
flags: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_listen(fd, backlog, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `epoll_wait(2)`.
/// Returns a pointer to the SQE.
pub fn epoll_wait(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
events: ?*linux.epoll_event,
max_events: u32,
flags: linux.Epoll,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_epoll_wait(fd, events, max_events, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `epoll_ctl(2)`.
/// Returns a pointer to the SQE.
pub fn epoll_ctl(
@@ -742,6 +958,182 @@ pub fn epoll_ctl(
return sqe;
}
/// Queues (but does not submit) an SQE to update previously registered file
/// descriptors, replacing the entries starting at `offset` in the registered
/// file table with `fds`. Returns a pointer to the SQE.
pub fn files_update(
self: *IoUring,
user_data: u64,
fds: []const linux.fd_t,
offset: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_files_update(fds, offset);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
/// Returns a pointer to the SQE.
pub fn fallocate(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
mode: i32,
offset: u64,
len: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_fallocate(fd, mode, offset, len);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `openat(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn openat(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat(fd, path, flags, mode);
sqe.user_data = user_data;
return sqe;
}
/// Queues an openat using direct (registered) file descriptors.
///
/// To use the openat direct variant, the application must first have registered
/// a file table (with register_files()). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
///
/// Available since 5.15
pub fn openat_direct(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
file_index: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat_direct(fd, path, flags, mode, file_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `open(2)`.
/// Returns a pointer to the SQE.
pub fn open(
self: *IoUring,
user_data: u64,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat(linux.At.fdcwd, path, flags, mode);
sqe.user_data = user_data;
return sqe;
}
/// Queues an open using direct (registered) file descriptors.
///
/// To use the open direct variant, the application must first have registered
/// a file table (with register_files()). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
pub fn open_direct(
self: *IoUring,
user_data: u64,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
file_index: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat_direct(linux.At.fdcwd, path, flags, mode, file_index);
sqe.user_data = user_data;
return sqe;
}
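// A minimal sketch of the direct-descriptor flow described above, assuming
// `register_files()` keeps its existing signature and that the flag structs
// default-initialize:
//
//     const slots = [_]linux.fd_t{-1} ** 4; // sparse table with four slots
//     try ring.register_files(&slots);
//     _ = try ring.open_direct(0x1, "/tmp/example", .{}, 0, 2); // install into slot 2
//     // the opened file is then addressed by index 2 with IOSQE_FIXED_FILE set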
/// Queues (but does not submit) an SQE to perform a `close(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn close(self: *IoUring, user_data: u64, fd: posix.fd_t) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_close(fd);
sqe.user_data = user_data;
return sqe;
}
/// Queues close of registered file descriptor.
/// Available since 5.15
pub fn close_direct(self: *IoUring, user_data: u64, file_index: u32) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_close_direct(file_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `statx(2)`.
/// Returns a pointer to the SQE.
pub fn statx(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
path: [:0]const u8,
flags: linux.At,
mask: linux.Statx.Mask,
buf: *linux.Statx,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_statx(fd, path, flags, mask, buf);
sqe.user_data = user_data;
return sqe;
}
// COMMIT: don't implement f/madvice64 for now; I doubt it is widely used in practice
/// Queues (but does not submit) an SQE to perform a `posix_fadvise(2)`.
/// Returns a pointer to the SQE.
pub fn fadvice(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
offset: u64,
len: u32,
advice: linux.Fadvice,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_fadvice(fd, offset, len, advice);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `madvise(2)`.
/// Returns a pointer to the SQE.
pub fn madvice(
self: *IoUring,
user_data: u64,
memory: []u8,
advice: linux.Madvice,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_madvice(memory, advice);
sqe.user_data = user_data;
return sqe;
}
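// A minimal sketch of the advise helpers; raw enum values are used because the
// Madvice/Fadvice enums have no named options yet (0 corresponds to the NORMAL
// advice for both posix_fadvise and madvise; `mapped_region` is an assumed
// mapped `[]u8` slice):
//
//     _ = try ring.fadvice(0x1, fd, 0, 4096, @enumFromInt(0)); // POSIX_FADV_NORMAL
//     _ = try ring.madvice(0x2, mapped_region, @enumFromInt(0)); // MADV_NORMAL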
/// Queues (but does not submit) an SQE to perform a `recv(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
@@ -845,226 +1237,6 @@ pub fn sendmsg_zc(
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `openat(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn openat(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat(fd, path, flags, mode);
sqe.user_data = user_data;
return sqe;
}
/// Queues an openat using direct (registered) file descriptors.
///
/// To use an accept direct variant, the application must first have registered
/// a file table (with register_files). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
///
/// Available since 5.15
pub fn openat_direct(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: posix.mode_t,
file_index: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_openat_direct(fd, path, flags, mode, file_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `close(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn close(self: *IoUring, user_data: u64, fd: posix.fd_t) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_close(fd);
sqe.user_data = user_data;
return sqe;
}
/// Queues close of registered file descriptor.
/// Available since 5.15
pub fn close_direct(self: *IoUring, user_data: u64, file_index: u32) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_close_direct(file_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to register a timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout will complete when either the timeout expires, or after the
/// specified number of events complete (if `count` is greater than `0`).
///
/// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an
/// absolute timeout.
///
/// The completion event result will be `-ETIME` if the timeout completed
/// through expiration, `0` if the timeout completed after the specified number
/// of events, or `-ECANCELED` if the timeout was removed before it expired.
///
/// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
pub fn timeout(
self: *IoUring,
user_data: u64,
ts: *const linux.kernel_timespec,
count: u32,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_timeout(ts, count, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout is identified by its `user_data`.
///
/// The completion event result will be `0` if the timeout was found and
/// cancelled successfully else:
/// `-EBUSY` if the timeout was found but expiration was already in progress, or
/// `-ENOENT` if the timeout was not found.
pub fn timeout_remove(
self: *IoUring,
user_data: u64,
timeout_user_data: u64,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_timeout_remove(timeout_user_data, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to add a link timeout operation.
/// Returns a pointer to the SQE.
///
/// You need to set IOSQE_IO_LINK to flags of the target operation and then
/// call this method right after the target operation.
/// See https://lwn.net/Articles/803932/ for detail.
///
/// If the dependent request finishes before the linked timeout, the timeout
/// is canceled. If the timeout finishes before the dependent request, the
/// dependent request will be canceled.
///
/// The completion event result of the link_timeout will be either of:
/// `-ETIME` if the timeout finishes before the dependent request (in this case,
/// the completion event result of the dependent request will be `-ECANCELED`)
/// `-EALREADY` if the dependent request finishes before the linked timeout.
pub fn link_timeout(
self: *IoUring,
user_data: u64,
ts: *const linux.kernel_timespec,
flags: uflags.Timeout,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_link_timeout(ts, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing poll operation.
/// Returns a pointer to the SQE.
pub fn poll_remove(
self: *IoUring,
user_data: u64,
target_user_data: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_remove(target_user_data);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to update the user data of an existing
/// poll operation. Returns a pointer to the SQE.
pub fn poll_update(
self: *IoUring,
user_data: u64,
old_user_data: u64,
new_user_data: u64,
poll_mask: linux.Epoll,
flags: uflags.Poll,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_update(old_user_data, new_user_data, poll_mask, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
/// Returns a pointer to the SQE.
pub fn fallocate(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
mode: i32,
offset: u64,
len: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_fallocate(fd, mode, offset, len);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `statx(2)`.
/// Returns a pointer to the SQE.
pub fn statx(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
path: [:0]const u8,
flags: linux.At,
mask: linux.Statx.Mask,
buf: *linux.Statx,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_statx(fd, path, flags, mask, buf);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing operation.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
///
/// The completion event result will be `0` if the operation was found and
/// cancelled successfully else either of:
/// `-EALREADY` if the operation was found but was already in progress
/// `-ENOENT` if the operation was not found.
pub fn cancel(
self: *IoUring,
user_data: u64,
cancel_user_data: u64,
flags: uflags.AsyncCancel,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_cancel(cancel_user_data, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
/// Returns a pointer to the SQE.
///
@@ -1562,43 +1734,6 @@ pub fn socket_direct_alloc(
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `bind(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn bind(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
addr: *const posix.sockaddr,
addrlen: posix.socklen_t,
// liburing doesn't have this flag, hence 0 should be passed
// TODO: consider removing this and all flags like this
flags: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_bind(fd, addr, addrlen, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an `listen(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn listen(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
backlog: usize,
// liburing doesn't have this flag, hence 0 should be passed
// TODO: consider removing this and all flags like this
flags: u32,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_listen(fd, backlog, flags);
sqe.user_data = user_data;
return sqe;
}
/// Prepares an cmd request for a socket.
/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
/// Available since 6.7.
@@ -1832,10 +1967,11 @@ pub const Sqe = extern struct {
len: u32,
/// flags for any Sqe operation
/// rw_flags | fsync_flags | poll_event | poll32_event | sync_range_flags |
/// msg_flags timeout_flags | accept_flags | cancel_flags | open_flags |
/// statx_flags fadvise_advice | splice_flags | rename_flags | unlink_flags
/// | hardlink_flags xattr_flags | msg_ring_flags | uring_cmd_flags |
/// waitid_flags | futex_flags install_fd_flags | nop_flags | pipe_flags
/// msg_flags | timeout_flags | accept_flags | cancel_flags | open_flags |
/// statx_flags | fadvise_advice | splice_flags | rename_flags |
/// unlink_flags | hardlink_flags | xattr_flags | msg_ring_flags |
/// uring_cmd_flags | waitid_flags | futex_flags | install_fd_flags |
/// nop_flags | pipe_flags
rw_flags: u32,
/// data to be passed back at completion time
user_data: u64,
@@ -2117,6 +2253,17 @@ pub const Sqe = extern struct {
sqe.prep_rw(.connect, fd, @intFromPtr(addr), 0, addrlen);
}
pub fn prep_epoll_wait(
sqe: *Sqe,
fd: linux.fd_t,
event: ?*linux.epoll_event,
max_events: u32,
flags: linux.Epoll,
) void {
sqe.prep_rw(.epoll_wait, fd, @intFromPtr(event), max_events, 0);
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_epoll_ctl(
sqe: *Sqe,
epfd: linux.fd_t,
@@ -2262,22 +2409,18 @@ pub const Sqe = extern struct {
}
pub fn prep_timeout_remove(sqe: *Sqe, timeout_user_data: u64, flags: uflags.Timeout) void {
sqe.* = .{
.opcode = .timeout_remove,
.flags = .{},
.ioprio = .init_empty(),
.fd = -1,
.off = 0,
.addr = timeout_user_data,
.len = 0,
.rw_flags = @bitCast(flags),
.user_data = 0,
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
sqe.prep_rw(.timeout_remove, -1, timeout_user_data, 0, 0);
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_timeout_update(sqe: *Sqe, timeout_user_data: u64, ts: *const linux.kernel_timespec, flags: uflags.Timeout) void {
sqe.prep_rw(.timeout_remove, -1, timeout_user_data, 0, @intFromPtr(ts));
const enable_timeout_update = if (flags.timeout_update) flags else blk: {
var tflags = flags;
tflags.timeout_update = true;
break :blk tflags;
};
sqe.rw_flags = @bitCast(enable_timeout_update);
}
pub fn prep_link_timeout(
@@ -2365,6 +2508,26 @@ pub const Sqe = extern struct {
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_fadvice(
sqe: *Sqe,
fd: linux.fd_t,
offset: u64,
len: u32,
advice: linux.Fadvice,
) void {
sqe.prep_rw(.fadvise, fd, undefined, len, offset);
sqe.rw_flags = @intFromEnum(advice);
}
pub fn prep_madvice(
sqe: *Sqe,
memory: []u8,
advice: linux.Madvice,
) void {
sqe.prep_rw(.madvise, -1, @intFromPtr(memory.ptr), memory.len, 0);
sqe.rw_flags = @intFromEnum(advice);
}
pub fn prep_cancel(
sqe: *Sqe,
cancel_user_data: u64,
@@ -2379,11 +2542,11 @@ pub const Sqe = extern struct {
fd: linux.fd_t,
flags: uflags.AsyncCancel,
) void {
sqe.prep_rw(.async_cancel, fd, 0, 0, 0);
const enable_cancel_fd = blk: {
var update_flags = flags;
update_flags.cancel_fd = true;
break :blk update_flags;
sqe.prep_rw(.async_cancel, fd, undefined, 0, 0);
const enable_cancel_fd = if (flags.cancel_fd) flags else blk: {
var cancel_flags = flags;
cancel_flags.cancel_fd = true;
break :blk cancel_flags;
};
sqe.rw_flags = @bitCast(enable_cancel_fd);
}
@@ -2814,6 +2977,23 @@ pub const BufferGroup = struct {
allocator.free(self.heads);
}
/// Prepare multishot read operation which will select buffer from this
/// group.
pub fn read_multishot(
self: *BufferGroup,
user_data: u64,
fd: posix.fd_t,
nbytes: u32,
offset: u64,
) !*Sqe {
var sqe = try self.ring.get_sqe();
sqe.prep_rw(.read_multishot, fd, undefined, nbytes, offset);
sqe.flags.buffer_select = true;
sqe.buf_index = self.group_id;
sqe.user_data = user_data;
return sqe;
}
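// A minimal usage sketch (buf_grp is an already initialized BufferGroup; passing
// 0 for nbytes is assumed to defer to the size of the selected buffers, as in
// liburing, and submit() is assumed to keep its current name):
//
//     _ = try buf_grp.read_multishot(0x1, fd, 0, 0);
//     _ = try buf_grp.ring.submit();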
/// Prepare recv operation which will select buffer from this group.
pub fn recv(
self: *BufferGroup,