add IoUring send_bundle, send_to, recv_multishot, sync_file_range

ignore prep_openat2* for now

add non-at variant of some unlink, rename, mkdir, symlink, link

TODO: add Sync File Flags

Signed-off-by: Bernard Assan <mega.alpha100@gmail.com>
This commit is contained in:
Bernard Assan 2025-10-15 22:44:52 +00:00
parent ee71dcca10
commit ae9a05fc82
No known key found for this signature in database
GPG key ID: C2A2C53574321095
2 changed files with 365 additions and 171 deletions

View file

@ -3988,6 +3988,12 @@ pub const Shut = enum(u32) {
pub const RDWR: u32 = @intFromEnum(Shut.rdwr);
};
/// SYNC_FILE_RANGE_* flags
pub const SyncFileRange = packed struct(u32) {
_: u32 = 0, // TODO: fill out
};
/// Deprecated alias to Sock
pub const SOCK = Sock;
/// SOCK_* Socket type and flags
pub const Sock = packed struct(u32) {

View file

@ -737,6 +737,8 @@ pub fn timeout_update(
/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 5.5
// TODO: can't we make the sockaddr and socklen_t combo in our api better?
// Investigate this
pub fn accept(
self: *IoUring,
user_data: u64,
@ -1002,6 +1004,7 @@ pub fn openat(
sqe.user_data = user_data;
return sqe;
}
// COMMIT: ignore openat2* for now
/// Queues an openat using direct (registered) file descriptors.
///
@ -1134,6 +1137,133 @@ pub fn madvice(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn send(
self: *IoUring,
user_data: u64,
sockfd: posix.fd_t,
buffer: []const u8,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send(sockfd, buffer, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a bundled `send(2)`.
/// Returns a pointer to the SQE.
pub fn send_bundle(
self: *IoUring,
user_data: u64,
sockfd: posix.fd_t,
len: u64,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_bundle(sockfd, len, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a bundled `sendto(2)`.
/// Returns a pointer to the SQE.
pub fn send_to(
self: *IoUring,
user_data: u64,
sockfd: posix.fd_t,
buffer: []const u8,
flags: linux.Msg,
addr: *const linux.sockaddr,
addrlen: linux.socklen_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_to(sockfd, buffer, flags, addr, addrlen);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
///
/// This operation will most likely produce two CQEs. The flags field of the
/// first cqe may likely contain IORING_CQE_F_MORE, which means that there will
/// be a second cqe with the user_data field set to the same value. The user
/// must not modify the data buffer until the notification is posted. The first
/// cqe follows the usual rules and so its res field will contain the number of
/// bytes sent or a negative error code. The notification's res field will be
/// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two
/// step model is needed because the kernel may hold on to buffers for a long
/// time, e.g. waiting for a TCP ACK. Notifications responsible for controlling
/// the lifetime of the buffers. Even errored requests may generate a
/// notification.
///
/// Available since 6.0
pub fn send_zc(
self: *IoUring,
user_data: u64,
sockfd: posix.fd_t,
buffer: []const u8,
send_flags: linux.Msg,
zc_flags: Sqe.SendRecv,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_zc(sockfd, buffer, send_flags, zc_flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc_fixed(
self: *IoUring,
user_data: u64,
sockfd: posix.fd_t,
buffer: []const u8,
send_flags: linux.Msg,
zc_flags: Sqe.SendRecv,
buf_index: u16,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_zc_fixed(sockfd, buffer, send_flags, zc_flags, buf_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
pub fn sendmsg_zc(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
msg: *const posix.msghdr_const,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sendmsg_zc(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an fixed async zerocopy
/// `sendmsg(2)`. Returns a pointer to the SQE.
pub fn sendmsg_zc_fixed(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
msg: *const posix.msghdr_const,
flags: linux.Msg,
buf_index: u16,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sendmsg_zc_fixed(fd, msg, flags, buf_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `recv(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
@ -1158,176 +1288,24 @@ pub fn recv(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn send(
pub fn recv_multishot(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: []const u8,
sockfd: linux.fd_t,
buffer: RecvBuffer,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send(fd, buffer, flags);
sqe.user_data = user_data;
return sqe;
switch (buffer) {
.buffer => |slice| sqe.prep_recv_multishot(sockfd, slice, flags),
.buffer_selection => |selection| {
sqe.prep_rw(.recv, sockfd, 0, selection.len, 0);
sqe.ioprio = .{ .send_recv = .{ .recv_multishot = true } };
sqe.rw_flags = @bitCast(flags);
sqe.flags.buffer_select = true;
sqe.buf_index = selection.group_id;
},
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
///
/// This operation will most likely produce two CQEs. The flags field of the
/// first cqe may likely contain IORING_CQE_F_MORE, which means that there will
/// be a second cqe with the user_data field set to the same value. The user
/// must not modify the data buffer until the notification is posted. The first
/// cqe follows the usual rules and so its res field will contain the number of
/// bytes sent or a negative error code. The notification's res field will be
/// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two
/// step model is needed because the kernel may hold on to buffers for a long
/// time, e.g. waiting for a TCP ACK. Notifications responsible for controlling
/// the lifetime of the buffers. Even errored requests may generate a
/// notification.
///
/// Available since 6.0
pub fn send_zc(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: []const u8,
send_flags: linux.Msg,
zc_flags: Sqe.SendRecv,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_zc(fd, buffer, send_flags, zc_flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc_fixed(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: []const u8,
send_flags: linux.Msg,
zc_flags: Sqe.SendRecv,
buf_index: u16,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_send_zc_fixed(fd, buffer, send_flags, zc_flags, buf_index);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
pub fn sendmsg_zc(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
msg: *const linux.msghdr_const,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sendmsg_zc(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
pub fn shutdown(
self: *IoUring,
user_data: u64,
sockfd: posix.socket_t,
how: linux.Shut,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_shutdown(sockfd, how);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
/// Returns a pointer to the SQE.
pub fn renameat(
self: *IoUring,
user_data: u64,
old_dir_fd: linux.fd_t,
old_path: [*:0]const u8,
new_dir_fd: linux.fd_t,
new_path: [*:0]const u8,
flags: linux.Rename,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_renameat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `unlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn unlinkat(
self: *IoUring,
user_data: u64,
dir_fd: linux.fd_t,
path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_unlinkat(dir_fd, path, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
/// Returns a pointer to the SQE.
pub fn mkdirat(
self: *IoUring,
user_data: u64,
dir_fd: linux.fd_t,
path: [*:0]const u8,
mode: posix.mode_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_mkdirat(dir_fd, path, mode);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn symlinkat(
self: *IoUring,
user_data: u64,
target: [*:0]const u8,
new_dir_fd: linux.fd_t,
link_path: [*:0]const u8,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_symlinkat(target, new_dir_fd, link_path);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `linkat(2)`.
/// Returns a pointer to the SQE.
pub fn linkat(
self: *IoUring,
user_data: u64,
old_dir_fd: linux.fd_t,
old_path: [*:0]const u8,
new_dir_fd: linux.fd_t,
new_path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_linkat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
sqe.user_data = user_data;
return sqe;
}
@ -1370,6 +1348,174 @@ pub fn remove_buffers(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
pub fn shutdown(
self: *IoUring,
user_data: u64,
sockfd: posix.socket_t,
how: linux.Shut,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_shutdown(sockfd, how);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `unlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn unlinkat(
self: *IoUring,
user_data: u64,
dir_fd: posix.fd_t,
path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_unlinkat(dir_fd, path, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `unlink(2)`.
/// Returns a pointer to the SQE.
pub fn unlink(
self: *IoUring,
user_data: u64,
path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
return try self.unlinkat(user_data, linux.At.fdcwd, path, flags);
}
/// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
/// Returns a pointer to the SQE.
pub fn renameat(
self: *IoUring,
user_data: u64,
old_dir_fd: posix.fd_t,
old_path: [*:0]const u8,
new_dir_fd: posix.fd_t,
new_path: [*:0]const u8,
flags: linux.Rename,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_renameat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `rename(2)`.
/// Returns a pointer to the SQE.
pub fn rename(
self: *IoUring,
user_data: u64,
old_path: [*:0]const u8,
new_path: [*:0]const u8,
flags: linux.Rename,
) !*Sqe {
return try self.renameat(user_data, linux.At.fdcwd, old_path, linux.At.fdcwd, new_path, flags);
}
/// Queues (but does not submit) an SQE to perform a `sync_file_range(2)`.
/// Returns a pointer to the SQE.
pub fn sync_file_range(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
len: u32,
offset: u64,
flags: linux.SyncFileRange, // TODO: add flags
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sync_file_range(fd, len, offset, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
/// Returns a pointer to the SQE.
pub fn mkdirat(
self: *IoUring,
user_data: u64,
dir_fd: posix.fd_t,
path: [*:0]const u8,
mode: posix.mode_t,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_mkdirat(dir_fd, path, mode);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `mkdir(2)`.
/// Returns a pointer to the SQE.
pub fn mkdir(
self: *IoUring,
user_data: u64,
path: [*:0]const u8,
mode: posix.mode_t,
) !*Sqe {
return try self.mkdirat(user_data, linux.At.fdcwd, path, mode);
}
/// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn symlinkat(
self: *IoUring,
user_data: u64,
target: [*:0]const u8,
new_dir_fd: posix.fd_t,
link_path: [*:0]const u8,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_symlinkat(target, new_dir_fd, link_path);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `symlink(2)`.
/// Returns a pointer to the SQE.
pub fn symlink(
self: *IoUring,
user_data: u64,
target: [*:0]const u8,
link_path: [*:0]const u8,
) !*Sqe {
return try self.symlinkat(user_data, target, linux.At.fdcwd, link_path);
}
/// Queues (but does not submit) an SQE to perform a `linkat(2)`.
/// Returns a pointer to the SQE.
pub fn linkat(
self: *IoUring,
user_data: u64,
old_dir_fd: posix.fd_t,
old_path: [*:0]const u8,
new_dir_fd: posix.fd_t,
new_path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_linkat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `link(2)`.
/// Returns a pointer to the SQE.
pub fn link(
self: *IoUring,
user_data: u64,
old_path: [*:0]const u8,
new_path: [*:0]const u8,
flags: linux.At,
) !*Sqe {
return try self.linkat(user_data, linux.At.fdcwd, old_path, linux.At.fdcwd, new_path, flags);
}
/// Queues (but does not submit) an SQE to perform a `waitid(2)`.
/// Returns a pointer to the SQE.
pub fn waitid(
@ -2310,11 +2456,31 @@ pub const Sqe = extern struct {
}
// COMMIT: fix send[|recv] flag param type
pub fn prep_send(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, flags: linux.Msg) void {
sqe.prep_rw(.send, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
pub fn prep_send(sqe: *Sqe, sockfd: linux.fd_t, buffer: []const u8, flags: linux.Msg) void {
sqe.prep_rw(.send, sockfd, @intFromPtr(buffer.ptr), buffer.len, 0);
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_send_bundle(sqe: *Sqe, sockfd: linux.fd_t, len: u64, flags: linux.Msg) void {
sqe.prep_rw(.send, sockfd, undefined, len, 0);
sqe.rw_flags = @bitCast(flags);
sqe.ioprio = .{ .send_recv = .{ .recvsend_bundle = true } };
}
pub fn prep_send_to(
sqe: *Sqe,
sockfd: linux.fd_t,
buffer: []const u8,
flags: linux.Msg,
addr: *const linux.sockaddr,
addrlen: linux.socklen_t,
) void {
// addr2 maps to sqe.off and addr_len maps to sqe.splice_fd_in
sqe.prep_send(.send, sockfd, buffer, flags);
sqe.off = @intFromPtr(addr);
sqe.splice_fd_in = @intCast(addrlen);
}
pub fn prep_send_zc(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, flags: linux.Msg, zc_flags: Sqe.SendRecv) void {
sqe.prep_rw(.send_zc, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
sqe.rw_flags = @bitCast(flags);
@ -2322,12 +2488,12 @@ pub const Sqe = extern struct {
}
pub fn prep_send_zc_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, flags: linux.Msg, zc_flags: Sqe.SendRecv, buf_index: u16) void {
const zc_flags_fixed = blk: {
const zc_flags_fixed = if (zc_flags.recvsend_fixed_buf) zc_flags else blk: {
var updated_flags = zc_flags;
updated_flags.recvsend_fixed_buf = true;
break :blk updated_flags;
};
prep_send_zc(sqe, fd, buffer, flags, zc_flags_fixed);
sqe.prep_send_zc(fd, buffer, flags, zc_flags_fixed);
sqe.buf_index = buf_index;
}
@ -2347,10 +2513,21 @@ pub const Sqe = extern struct {
msg: *const linux.msghdr_const,
flags: linux.Msg,
) void {
prep_sendmsg(sqe, fd, msg, flags);
sqe.prep_sendmsg(fd, msg, flags);
sqe.opcode = .sendmsg_zc;
}
pub fn prep_sendmsg_zc_fixed(
sqe: *Sqe,
fd: linux.fd_t,
msg: *const linux.msghdr_const,
flags: linux.Msg,
buf_index: u16,
) void {
sqe.prep_sendmsg_zc(fd, msg, flags);
sqe.ioprio = .{ .send_recv = .{ .recvsend_fixed_buf = true } };
sqe.buf_index = buf_index;
}
pub fn prep_openat(
sqe: *Sqe,
fd: linux.fd_t,
@ -2588,6 +2765,17 @@ pub const Sqe = extern struct {
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_sync_file_range(
sqe: *Sqe,
fd: posix.fd_t,
len: u32,
offset: u64,
flags: linux.SyncFileRange, // TODO: add flags
) void {
sqe.prep_rw(.sync_file_range, fd, undefined, len, offset);
sqe.rw_flags = @bitCast(flags);
}
pub fn prep_mkdirat(
sqe: *Sqe,
dir_fd: linux.fd_t,
@ -3463,7 +3651,7 @@ pub const FileIndexRange = extern struct {
};
/// matches `io_uring_recvmsg_out` in liburing
pub const RecvmsgOut = extern struct {
pub const RecvMsgOut = extern struct {
namelen: u32,
controllen: u32,
payloadlen: u32,