Move buf_ring_* functions into the BufferRing type as methods

Add IoUring tee(2) syscall support

Adapt the ReadBuffer union idea for read_fixed() to support fixed read
and fixed readv

Use the same idea and add a WriteBuffer type for write() and
write_fixed()

Flags for splice and tee are ignored for now; let's see if they become
important in the future

Add a get_data helper to Cqe

Signed-off-by: Bernard Assan <mega.alpha100@gmail.com>

@ -407,51 +407,6 @@ pub fn nop(self: *IoUring, user_data: u64) !*Sqe {
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)`
/// depending on the buffer type.
/// * Reading into a `ReadBuffer.buffer` uses `read(2)`
/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
///
/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE.
/// See https://man7.org/linux/man-pages/man2/preadv2.2.html
///
/// Returns a pointer to the SQE.
pub fn read(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: ReadBuffer,
offset: u64,
) !*Sqe {
const sqe = try self.get_sqe();
switch (buffer) {
.buffer => |slice| sqe.prep_read(fd, slice, offset),
.iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
.buffer_selection => |selection| {
sqe.prep_rw(.read, fd, 0, selection.len, offset);
sqe.flags.buffer_select = true;
sqe.buf_index = selection.group_id;
},
}
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `write(2)`.
/// Returns a pointer to the SQE.
pub fn write(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: []const u8,
offset: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_write(fd, buffer, offset);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `splice(2)`
/// Either `fd_in` or `fd_out` must be a pipe.
/// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to
@ -488,6 +443,51 @@ pub fn splice(
return sqe;
}
// COMMIT: flags for splice and tee are ignored for now; let's see if
// they become important in the future
pub fn tee(
self: *IoUring,
user_data: u64,
fd_in: posix.fd_t,
fd_out: posix.fd_t,
len: usize,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_tee(fd_in, fd_out, len);
sqe.user_data = user_data;
return sqe;
}
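// Usage sketch (illustrative, not part of this patch): duplicate up to `len`
// bytes from one pipe to another without consuming the input. Assumes `ring`
// is an initialized IoUring and both fds come from pipe(2).
//
//     const pipe_in = try posix.pipe();
//     const pipe_out = try posix.pipe();
//     _ = try ring.tee(0xbeef, pipe_in[0], pipe_out[1], 4096);
//     _ = try ring.submit();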
/// Queues (but does not submit) an SQE to perform a `pread(2)` or `preadv(2)`
/// depending on the buffer type.
/// * Reading into a `ReadBuffer.buffer` uses `pread(2)`
/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
///
/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE.
/// See https://man7.org/linux/man-pages/man2/preadv2.2.html
///
/// Returns a pointer to the SQE.
pub fn read(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
buffer: ReadBuffer,
offset: u64,
) !*Sqe {
const sqe = try self.get_sqe();
switch (buffer) {
.buffer => |slice| sqe.prep_read(fd, slice, offset),
.iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
.buffer_selection => |selection| {
sqe.prep_rw(.read, fd, 0, selection.len, offset);
sqe.flags.buffer_select = true;
sqe.buf_index = selection.group_id;
},
}
sqe.user_data = user_data;
return sqe;
}
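// Usage sketch (illustrative): each ReadBuffer variant selects a different
// syscall. Assumes `ring` is an initialized IoUring, `fd` is readable, and a
// provided-buffer group with `group_id = 1` exists for the last case.
//
//     var buf: [64]u8 = undefined;
//     _ = try ring.read(0x1, fd, .{ .buffer = &buf }, 0); // pread(2)
//     var vecs = [_]posix.iovec{.{ .base = &buf, .len = buf.len }};
//     _ = try ring.read(0x2, fd, .{ .iovecs = &vecs }, 0); // preadv(2)
//     _ = try ring.read(0x3, fd, .{ .buffer_selection = .{ .group_id = 1, .len = 64 } }, 0);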
/// Queues (but does not submit) an SQE to perform a IORING_OP_READ_FIXED.
/// The `buffer` provided must be registered with the kernel by calling
/// `register_buffers()` first. The `buffer_index` must be the same as its
@ -499,30 +499,45 @@ pub fn read_fixed(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: *posix.iovec,
buffer: ReadBuffer,
offset: u64,
buffer_index: u16,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_read_fixed(fd, buffer, offset, buffer_index);
switch (buffer) {
.buffer => |slice| sqe.prep_read_fixed(fd, slice, offset, buffer_index),
.iovecs => |vecs| sqe.prep_readv_fixed(fd, vecs, offset, buffer_index),
.buffer_selection => |selection| {
sqe.prep_rw(.read_fixed, fd, 0, selection.len, offset);
sqe.flags.buffer_select = true;
sqe.buf_index = selection.group_id;
},
}
sqe.user_data = user_data;
return sqe;
}
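// Usage sketch (illustrative): fixed reads require buffers registered up
// front; `buffer_index` selects the registered slot. Names are assumptions.
//
//     var raw: [64]u8 = undefined;
//     const regs = [_]posix.iovec{.{ .base = &raw, .len = raw.len }};
//     try ring.register_buffers(&regs);
//     _ = try ring.read_fixed(0x4, fd, .{ .buffer = raw[0..] }, 0, 0);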
/// Queues (but does not submit) an SQE to perform a `pwritev()`.
/// Queues (but does not submit) an SQE to perform a `pwrite(2)` or `pwritev(2)`
/// depending on the write buffer type.
/// * Writing from a `WriteBuffer.buffer` uses `pwrite(2)`
/// * Writing from `WriteBuffer.iovecs` uses `pwritev(2)`
///
/// Returns a pointer to the SQE so that you can further modify the SQE for
/// advanced use cases.
/// For example, if you want to do a `pwritev2()` then set `rw_flags` on the
/// returned SQE. See https://linux.die.net/man/2/pwritev.
pub fn writev(
pub fn write(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
iovecs: []const posix.iovec_const,
buffer: WriteBuffer,
offset: u64,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_writev(fd, iovecs, offset);
switch (buffer) {
.buffer => |slice| sqe.prep_write(fd, slice, offset),
.iovecs => |vecs| sqe.prep_writev(fd, vecs, offset),
}
sqe.user_data = user_data;
return sqe;
}
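// Usage sketch (illustrative): WriteBuffer mirrors ReadBuffer, minus buffer
// selection. Assumes `ring` is an initialized IoUring and `fd` is writable.
//
//     _ = try ring.write(0x5, fd, .{ .buffer = "hello" }, 0); // pwrite(2)
//     const vecs = [_]posix.iovec_const{.{ .base = "hello", .len = 5 }};
//     _ = try ring.write(0x6, fd, .{ .iovecs = &vecs }, 0); // pwritev(2)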
@ -538,12 +553,80 @@ pub fn write_fixed(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
buffer: *posix.iovec,
buffer: WriteBuffer,
offset: u64,
buffer_index: u16,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_write_fixed(fd, buffer, offset, buffer_index);
switch (buffer) {
.buffer => |slice| {
sqe.prep_write_fixed(fd, slice, offset, buffer_index);
},
.iovecs => |vecs| {
sqe.prep_writev_fixed(fd, vecs, offset, buffer_index);
},
}
sqe.user_data = user_data;
return sqe;
}
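// Usage sketch (illustrative): as with `read_fixed`, the data must lie in a
// region previously registered via `register_buffers`; `buffer_index` names
// that slot. Continues the registration sketch from `read_fixed` above.
//
//     _ = try ring.write_fixed(0x7, fd, .{ .buffer = raw[0..] }, 0, 0);
//     _ = try ring.submit();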
/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn recvmsg(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
msg: *posix.msghdr,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_recvmsg(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a multishot `recvmsg(2)`.
/// Returns a pointer to the SQE.
pub fn recvmsg_multishot(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
msg: *posix.msghdr,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_recvmsg_multishot(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn sendmsg(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
msg: *const posix.msghdr_const,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sendmsg(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `poll(2)`.
/// Returns a pointer to the SQE.
pub fn poll_add(
self: *IoUring,
user_data: u64,
fd: posix.fd_t,
poll_mask: linux.Epoll,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_add(fd, poll_mask);
sqe.user_data = user_data;
return sqe;
}
@ -746,38 +829,6 @@ pub fn send_zc_fixed(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn recvmsg(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
msg: *linux.msghdr,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_recvmsg(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn sendmsg(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
msg: *const linux.msghdr_const,
flags: linux.Msg,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_sendmsg(fd, msg, flags);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
@ -931,20 +982,6 @@ pub fn link_timeout(
return sqe;
}
/// Queues (but does not submit) an SQE to perform a `poll(2)`.
/// Returns a pointer to the SQE.
pub fn poll_add(
self: *IoUring,
user_data: u64,
fd: linux.fd_t,
poll_mask: linux.Epoll,
) !*Sqe {
const sqe = try self.get_sqe();
sqe.prep_poll_add(fd, poll_mask);
sqe.user_data = user_data;
return sqe;
}
/// Queues (but does not submit) an SQE to remove an existing poll operation.
/// Returns a pointer to the SQE.
pub fn poll_remove(
@ -1186,25 +1223,16 @@ pub fn register_buffers_sparse(self: *IoUring, nr: u32) !void {
.nr = nr,
};
const res = linux.io_uring_register(
self.fd,
.register_buffers2,
&reg,
@sizeOf(RsrcRegister),
);
const res = linux.io_uring_register(self.fd, .register_buffers2, &reg, @sizeOf(RsrcRegister));
try handle_registration_result(res);
}
/// Registers an array of buffers for use with `read_fixed` and `write_fixed`.
/// Registers an array of buffers for use with `read_fixed`, `readv_fixed`,
/// `write_fixed` and `writev_fixed`.
pub fn register_buffers(self: *IoUring, buffers: []const posix.iovec) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_buffers,
buffers.ptr,
@intCast(buffers.len),
);
const res = linux.io_uring_register(self.fd, .register_buffers, buffers.ptr, @intCast(buffers.len));
try handle_registration_result(res);
}
@ -1237,12 +1265,7 @@ pub fn register_files_update(self: *IoUring, offset: u32, fds: []const posix.fd_
.data = @intFromPtr(fds.ptr),
});
const res = linux.io_uring_register(
self.fd,
.register_files_update,
&update,
@intCast(fds.len),
);
const res = linux.io_uring_register(self.fd, .register_files_update, &update, @intCast(fds.len));
try handle_registration_result(res);
}
@ -1255,12 +1278,7 @@ pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void {
.flags = .{ .register_sparse = true },
});
const res = linux.io_uring_register(
self.fd,
.register_files2,
&reg,
@sizeOf(RsrcRegister),
);
const res = linux.io_uring_register(self.fd, .register_files2, &reg, @sizeOf(RsrcRegister));
return handle_registration_result(res);
}
@ -1283,12 +1301,7 @@ pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void {
/// file descriptors.
pub fn register_files(self: *IoUring, fds: []const linux.fd_t) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_files,
fds.ptr,
@intCast(fds.len),
);
const res = linux.io_uring_register(self.fd, .register_files, fds.ptr, @intCast(fds.len));
try handle_registration_result(res);
}
@ -1309,12 +1322,7 @@ pub fn unregister_files(self: *IoUring) !void {
/// Only a single a eventfd can be registered at any given point in time.
pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_eventfd,
&fd,
1,
);
const res = linux.io_uring_register(self.fd, .register_eventfd, &fd, 1);
try handle_registration_result(res);
}
@ -1325,35 +1333,20 @@ pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void {
/// Only a single eventfd can be registered at any given point in time.
pub fn register_eventfd_async(self: *IoUring, fd: linux.fd_t) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_eventfd_async,
&fd,
1,
);
const res = linux.io_uring_register(self.fd, .register_eventfd_async, &fd, 1);
try handle_registration_result(res);
}
/// Unregister the registered eventfd file descriptor.
pub fn unregister_eventfd(self: *IoUring) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.unregister_eventfd,
null,
0,
);
const res = linux.io_uring_register(self.fd, .unregister_eventfd, null, 0);
try handle_registration_result(res);
}
pub fn register_probe(self: *IoUring, probe: []Probe) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_probe,
probe.ptr,
@intCast(probe.len),
);
const res = linux.io_uring_register(self.fd, .register_probe, probe.ptr, @intCast(probe.len));
try handle_registration_result(res);
}
@ -1361,12 +1354,7 @@ pub fn register_probe(self: *IoUring, probe: []Probe) !void {
/// matches `io_uring_register_personality()` in liburing
pub fn register_personality(self: *IoUring) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_personality,
null,
0,
);
const res = linux.io_uring_register(self.fd, .register_personality, null, 0);
try handle_registration_result(res);
}
@ -1378,23 +1366,13 @@ pub fn unregister_personality(self: *IoUring, credential_id: u32) !void {
pub fn register_restrictions(self: *IoUring, restriction: []Restriction) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_restrictions,
restriction.ptr,
@intCast(restriction.len),
);
const res = linux.io_uring_register(self.fd, .register_restrictions, restriction.ptr, @intCast(restriction.len));
try handle_registration_result(res);
}
pub fn enable_rings(self: *IoUring) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_enable_rings,
null,
0,
);
const res = linux.io_uring_register(self.fd, .register_enable_rings, null, 0);
try handle_registration_result(res);
}
@ -1403,24 +1381,14 @@ pub fn register_iowq_aff(self: *IoUring, cpusz: u32, mask: *linux.cpu_set_t) !vo
if (cpusz >= math.maxInt(u32)) return error.ArgumentsInvalid;
const res = linux.io_uring_register(
self.fd,
.register_iowq_aff,
mask,
cpusz,
);
const res = linux.io_uring_register(self.fd, .register_iowq_aff, mask, cpusz);
try handle_registration_result(res);
}
pub fn unregister_iowq_aff(self: *IoUring) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.unregister_iowq_aff,
null,
0,
);
const res = linux.io_uring_register(self.fd, .unregister_iowq_aff, null, 0);
try handle_registration_result(res);
}
@ -1432,12 +1400,7 @@ pub fn unregister_iowq_aff(self: *IoUring) !void {
pub fn register_iowq_max_workers(self: *IoUring, max_workers: [2]u32) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_iowq_max_workers,
&max_workers,
2,
);
const res = linux.io_uring_register(self.fd, .register_iowq_max_workers, &max_workers, 2);
try handle_registration_result(res);
}
@ -1445,12 +1408,7 @@ pub fn register_iowq_max_workers(self: *IoUring, max_workers: [2]u32) !void {
pub fn register_sync_cancel(self: *IoUring, cancel_reg: *SyncCancelRegister) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_sync_cancel,
cancel_reg,
1,
);
const res = linux.io_uring_register(self.fd, .register_sync_cancel, cancel_reg, 1);
try handle_registration_result(res);
}
@ -1458,12 +1416,7 @@ pub fn register_sync_cancel(self: *IoUring, cancel_reg: *SyncCancelRegister) !vo
pub fn register_sync_msg(self: *IoUring, sqe: *Sqe) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
-1,
.register_send_msg_ring,
sqe,
1,
);
const res = linux.io_uring_register(-1, .register_send_msg_ring, sqe, 1);
try handle_registration_result(res);
}
@ -1479,57 +1432,32 @@ pub fn register_file_alloc_range(self: *IoUring, offset: u32, len: u32) !void {
.resv = 0,
};
const res = linux.io_uring_register(
self.fd,
.register_file_alloc_range,
&range,
0,
);
const res = linux.io_uring_register(self.fd, .register_file_alloc_range, &range, 0);
return handle_registration_result(res);
}
pub fn register_napi(self: *IoUring, napi: *Napi) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_napi,
napi,
1,
);
const res = linux.io_uring_register(self.fd, .register_napi, napi, 1);
try handle_registration_result(res);
}
pub fn unregister_napi(self: *IoUring, napi: *Napi) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.unregister_napi,
napi,
1,
);
const res = linux.io_uring_register(self.fd, .unregister_napi, napi, 1);
try handle_registration_result(res);
}
pub fn register_clock(self: *IoUring, clock_reg: *ClockRegister) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_clock,
clock_reg,
0,
);
const res = linux.io_uring_register(self.fd, .register_clock, clock_reg, 0);
try handle_registration_result(res);
}
pub fn register_ifq(self: *IoUring, ifq_reg: *ZcrxIfqRegister) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_zcrx_ifq,
ifq_reg,
1,
);
const res = linux.io_uring_register(self.fd, .register_zcrx_ifq, ifq_reg, 1);
try handle_registration_result(res);
}
@ -1540,12 +1468,7 @@ pub fn register_resize_rings(self: *IoUring, _: *Params) !void {
pub fn register_region(self: *IoUring, mem_reg: *MemRegionRegister) !void {
assert(self.fd >= 0);
const res = linux.io_uring_register(
self.fd,
.register_mem_region,
mem_reg,
1,
);
const res = linux.io_uring_register(self.fd, .register_mem_region, mem_reg, 1);
try handle_registration_result(res);
}
@ -1747,12 +1670,14 @@ pub fn getsockopt(
/// a power of 2.
/// The provided buffer ring is registered against this `IoUring`'s fd.
/// `group_id` is the chosen buffer group ID, unique in IO_Uring.
pub fn setup_buf_ring(
fd: linux.fd_t,
/// matches `io_uring_setup_buf_ring()` in liburing
pub fn init_buffer_ring(
self: *IoUring,
entries: u16,
group_id: u16,
flags: BufferRegister.Flags,
) !*align(page_size_min) BufferRing {
assert(self.fd >= 0);
if (entries == 0 or entries > math.maxInt(u16)) return error.EntriesNotInRange;
if (!math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
@ -1768,51 +1693,36 @@ pub fn setup_buf_ring(
errdefer posix.munmap(mmap);
assert(mmap.len == mmap_size);
const br_addr: *align(page_size_min) BufferRing = @ptrCast(mmap.ptr);
const buffer_ring: *align(page_size_min) BufferRing = @ptrCast(mmap.ptr);
var reg = mem.zeroInit(BufferRegister, .{
.ring_addr = @intFromPtr(br_addr),
.ring_addr = @intFromPtr(buffer_ring),
.ring_entries = entries,
.bgid = group_id,
.flags = flags,
});
try register_buf_ring(fd, &reg);
return br_addr;
try self.register_buffer_ring(&reg);
buffer_ring.init();
return buffer_ring;
}
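// Usage sketch (illustrative): create a provided-buffer ring, publish its
// buffers, then tear it down. Entry count, group id, and flags are arbitrary.
//
//     const entries: u16 = 8;
//     const group_id: u16 = 1;
//     const br = try ring.init_buffer_ring(entries, group_id, .{});
//     defer br.deinit(&ring, entries, group_id);
//     var storage: [8][64]u8 = undefined;
//     const m = br.mask(entries);
//     for (&storage, 0..) |*buf, i| br.add(buf, @intCast(i), m, @intCast(i));
//     br.advance(entries); // make all buffers visible to the kernel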
pub fn register_buf_ring(
fd: posix.fd_t,
buf_reg: *BufferRegister,
) !void {
var res = linux.io_uring_register(
fd,
.register_pbuf_ring,
buf_reg,
1,
);
/// matches `io_uring_register_buf_ring`
pub fn register_buffer_ring(self: *IoUring, buf_reg: *BufferRegister) !void {
var res = linux.io_uring_register(self.fd, .register_pbuf_ring, buf_reg, 1);
if (linux.E.init(res) == .INVAL and buf_reg.flags.iou_pbuf_ring_inc) {
// Retry without incremental buffer consumption.
// Incremental consumption is available since kernel 6.12; older
// kernels return INVAL.
buf_reg.flags.iou_pbuf_ring_inc = false;
res = linux.io_uring_register(
fd,
.register_pbuf_ring,
buf_reg,
1,
);
res = linux.io_uring_register(self.fd, .register_pbuf_ring, buf_reg, 1);
}
try handle_register_buf_ring_result(res);
}
pub fn unregister_buf_ring(fd: posix.fd_t, buf_group_id: u16) !void {
/// matches `io_uring_unregister_buf_ring`
pub fn unregister_buffer_ring(self: *IoUring, buf_group_id: u16) !void {
var reg = mem.zeroInit(BufferRegister, .{
.bgid = buf_group_id,
});
const res = linux.io_uring_register(
fd,
.unregister_pbuf_ring,
&reg,
1,
);
const res = linux.io_uring_register(self.fd, .unregister_pbuf_ring, &reg, 1);
try handle_register_buf_ring_result(res);
}
@ -1824,57 +1734,6 @@ fn handle_register_buf_ring_result(res: usize) !void {
}
}
// Unregisters a previously registered shared buffer ring, returned from
// io_uring_setup_buf_ring.
pub fn free_buf_ring(fd: posix.fd_t, br: *align(page_size_min) BufferRing, entries: u32, group_id: u16) void {
unregister_buf_ring(fd, group_id) catch {};
var mmap: []align(page_size_min) u8 = undefined;
mmap.ptr = @ptrCast(br);
mmap.len = entries * @sizeOf(Buffer);
posix.munmap(mmap);
}
/// Initialises `br` so that it is ready to be used.
pub fn buf_ring_init(br: *BufferRing) void {
br.tail = 0;
}
/// Calculates the appropriate size mask for a buffer ring.
/// `entries` is the ring entries as specified in io_uring_register_buf_ring.
pub fn buf_ring_mask(entries: u16) u16 {
return entries - 1;
}
/// Assigns `buffer` with the `br` buffer ring.
/// `buffer_id` is identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with
/// advance then offset should be 0.
/// If buffers are provided in a loop before being committed, the offset must
/// be incremented by one for each buffer added.
pub fn buf_ring_add(
br: *BufferRing,
buffer: []u8,
buffer_id: u16,
mask: u16,
buffer_offset: u16,
) void {
const bufs: [*]Buffer = @ptrCast(br);
const buf: *Buffer = &bufs[(br.tail +% buffer_offset) & mask];
buf.addr = @intFromPtr(buffer.ptr);
buf.len = @intCast(buffer.len);
buf.bid = buffer_id;
}
/// Make `count` new buffers visible to the kernel. Called after
/// `io_uring_buf_ring_add` has been called `count` times to fill in new
/// buffers.
pub fn buf_ring_advance(br: *BufferRing, count: u16) void {
const tail: u16 = br.tail +% count;
@atomicStore(u16, &br.tail, tail, .release);
}
/// IO completion data structure (Completion Queue Entry)
pub const Cqe = extern struct {
/// sqe.user_data value passed back
@ -1922,6 +1781,12 @@ pub const Cqe = extern struct {
_17: u16 = 0,
};
/// Retrieve the 64-bit cqe `user_data` as a `*T` after completion of an Sqe.
/// This data is passed through `Sqe` -> `Cqe` unchanged.
pub fn get_data(cqe: Cqe, comptime T: type) *T {
return @ptrFromInt(cqe.user_data);
}
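// Usage sketch (illustrative): round-trip a context pointer through
// `user_data`. The pointee must outlive the operation; `copy_cqe` is assumed
// to be the usual completion helper on this ring type.
//
//     var ctx: u32 = 42;
//     _ = try ring.nop(@intFromPtr(&ctx));
//     _ = try ring.submit();
//     const cqe = try ring.copy_cqe();
//     const got = cqe.get_data(u32); // got.* == 42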
pub fn err(self: Cqe) linux.E {
if (self.res > -4096 and self.res < 0) {
return @enumFromInt(-self.res);
@ -2108,29 +1973,10 @@ pub const Sqe = extern struct {
};
}
pub fn prep_read(sqe: *Sqe, fd: linux.fd_t, buffer: []u8, offset: u64) void {
sqe.prep_rw(.read, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
}
pub fn prep_write(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, offset: u64) void {
sqe.prep_rw(.write, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
}
pub fn prep_splice(sqe: *Sqe, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) void {
sqe.prep_rw(.splice, fd_out, undefined, len, off_out);
sqe.addr = off_in;
sqe.splice_fd_in = fd_in;
}
pub fn prep_readv(
sqe: *Sqe,
fd: linux.fd_t,
iovecs: []const std.posix.iovec,
offset: u64,
) void {
sqe.prep_rw(.readv, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
}
pub fn prep_writev(
sqe: *Sqe,
fd: linux.fd_t,
@ -2140,13 +1986,60 @@ pub const Sqe = extern struct {
sqe.prep_rw(.writev, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
}
pub fn prep_read_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: *std.posix.iovec, offset: u64, buffer_index: u16) void {
sqe.prep_rw(.read_fixed, fd, @intFromPtr(buffer.base), buffer.len, offset);
pub fn prep_write_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, offset: u64, buffer_index: u16) void {
sqe.prep_rw(.write_fixed, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
sqe.buf_index = buffer_index;
}
pub fn prep_write_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: *std.posix.iovec, offset: u64, buffer_index: u16) void {
sqe.prep_rw(.write_fixed, fd, @intFromPtr(buffer.base), buffer.len, offset);
pub fn prep_writev_fixed(sqe: *Sqe, fd: linux.fd_t, iovecs: []const posix.iovec_const, offset: u64, buffer_index: u16) void {
sqe.prep_rw(.write_fixed, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
sqe.buf_index = buffer_index;
}
pub fn prep_splice(sqe: *Sqe, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) void {
sqe.prep_rw(.splice, fd_out, undefined, len, off_out);
sqe.addr = off_in;
sqe.splice_fd_in = fd_in;
}
pub fn prep_tee(sqe: *Sqe, fd_in: linux.fd_t, fd_out: linux.fd_t, len: usize) void {
sqe.prep_rw(.tee, fd_out, 0, len, 0);
// The kernel requires both offsets to be zero for tee and returns
// EINVAL otherwise, so do not leave `addr` (splice_off_in) undefined.
sqe.addr = 0;
sqe.splice_fd_in = fd_in;
}
pub fn prep_read(sqe: *Sqe, fd: linux.fd_t, buffer: []u8, offset: u64) void {
sqe.prep_rw(.read, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
}
pub fn prep_readv(
sqe: *Sqe,
fd: linux.fd_t,
iovecs: []const std.posix.iovec,
offset: u64,
) void {
sqe.prep_rw(.readv, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
}
pub fn prep_read_fixed(
sqe: *Sqe,
fd: linux.fd_t,
buffer: []u8,
offset: u64,
buffer_index: u16,
) void {
sqe.prep_rw(.read_fixed, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
sqe.buf_index = buffer_index;
}
pub fn prep_readv_fixed(
sqe: *Sqe,
fd: linux.fd_t,
iovecs: []const std.posix.iovec,
offset: u64,
buffer_index: u16,
) void {
sqe.prep_rw(.read_fixed, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
sqe.buf_index = buffer_index;
}
@ -2892,18 +2785,17 @@ pub const BufferGroup = struct {
const heads = try allocator.alloc(u32, buffers_count);
errdefer allocator.free(heads);
const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .iou_pbuf_ring_inc = true });
buf_ring_init(br);
const br = try ring.init_buffer_ring(buffers_count, group_id, .{ .iou_pbuf_ring_inc = true });
const mask = buf_ring_mask(buffers_count);
const mask = br.mask(buffers_count);
var i: u16 = 0;
while (i < buffers_count) : (i += 1) {
const pos = buffer_size * i;
const buf = buffers[pos .. pos + buffer_size];
heads[i] = 0;
buf_ring_add(br, buf, i, mask, i);
br.add(buf, i, mask, i);
}
buf_ring_advance(br, buffers_count);
br.advance(buffers_count);
return .{
.ring = ring,
@ -2917,7 +2809,7 @@ pub const BufferGroup = struct {
}
pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
self.br.deinit(self.ring, self.buffers_count, self.group_id);
allocator.free(self.buffers);
allocator.free(self.heads);
}
@ -2978,9 +2870,9 @@ pub const BufferGroup = struct {
self.heads[buffer_id] = 0;
// Release buffer to the kernel.
const mask = buf_ring_mask(self.buffers_count);
buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
buf_ring_advance(self.br, 1);
const mask = self.br.mask(self.buffers_count);
self.br.add(self.get_by_id(buffer_id), buffer_id, mask, 0);
self.br.advance(1);
}
};
@ -3000,6 +2892,14 @@ pub const ReadBuffer = union(enum) {
},
};
/// Used to select how the write should be handled.
pub const WriteBuffer = union(enum) {
/// io_uring will write data from this buffer into fd.
buffer: []const u8,
/// io_uring will write data from iovecs into fd using pwritev.
iovecs: []const posix.iovec_const,
};
/// Used to select how the recv call should be handled.
pub const RecvBuffer = union(enum) {
/// io_uring will recv directly into this buffer
@ -3160,7 +3060,7 @@ pub const Probe = extern struct {
ops: [256]ProbeOp,
/// Is the operation supported on the running kernel.
pub fn is_supported(self: @This(), op: Op) bool {
pub fn is_supported(self: *const Probe, op: Op) bool {
const i = @intFromEnum(op);
if (i > @intFromEnum(self.last_op) or i >= self.ops_len)
return false;
@ -3224,6 +3124,58 @@ pub const BufferRing = extern struct {
resv2: u32,
resv3: u16,
tail: u16,
/// Initialises `br` so that it is ready to be used.
/// matches `io_uring_buf_ring_init` in liburing
fn init(br: *align(page_size_min) BufferRing) void {
br.tail = 0;
}
/// Unregisters a previously registered shared buffer ring, as returned
/// from `init_buffer_ring`, and unmaps its memory.
pub fn deinit(br: *align(page_size_min) BufferRing, uring: *IoUring, entries: u32, group_id: u16) void {
uring.unregister_buffer_ring(group_id) catch {};
var mmap: []align(page_size_min) u8 = undefined;
mmap.ptr = @ptrCast(br);
mmap.len = entries * @sizeOf(Buffer);
posix.munmap(mmap);
}
/// Calculates the appropriate size mask for a buffer ring.
/// `entries` is the ring entries as specified in io_uring_register_buf_ring
pub fn mask(_: *align(page_size_min) BufferRing, entries: u16) u16 {
return entries - 1;
}
/// Assigns `buffer` to the `br` buffer ring.
/// `buffer_id` is an identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with
/// `advance`, then the offset should be 0.
/// If buffers are provided in a loop before being committed, the offset must
/// be incremented by one for each buffer added.
pub fn add(
br: *align(page_size_min) BufferRing,
buffer: []u8,
buffer_id: u16,
buffer_mask: u16,
buffer_offset: u16,
) void {
const bufs: [*]Buffer = @ptrCast(br);
const buf: *Buffer = &bufs[(br.tail +% buffer_offset) & buffer_mask];
buf.addr = @intFromPtr(buffer.ptr);
buf.len = @intCast(buffer.len);
buf.bid = buffer_id;
}
/// Make `count` new buffers visible to the kernel. Called after
/// `add` has been called `count` times to fill in new buffers.
pub fn advance(br: *align(page_size_min) BufferRing, count: u16) void {
const tail: u16 = br.tail +% count;
@atomicStore(u16, &br.tail, tail, .release);
}
};
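// Usage sketch (illustrative): recycle one consumed buffer back to the
// kernel, mirroring `BufferGroup.put` above. `br`, `buffers_count`,
// `buffer_slice`, and `buffer_id` are assumed from surrounding context.
//
//     const m = br.mask(buffers_count);
//     br.add(buffer_slice, buffer_id, m, 0);
//     br.advance(1); // release-store of the tail publishes the buffer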
/// argument for IORING_(UN)REGISTER_PBUF_RING
@ -4002,7 +3954,7 @@ test "writev/fsync/readv" {
posix.iovec{ .base = &buffer_read, .len = buffer_read.len },
};
const sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17);
const sqe_writev = try ring.write(0xdddddddd, fd, .{ .iovecs = iovecs_write[0..] }, 17);
try testing.expectEqual(Op.writev, sqe_writev.opcode);
try testing.expectEqual(17, sqe_writev.off);
sqe_writev.link_next();
@ -4064,7 +4016,7 @@ test "write/read" {
const buffer_write = [_]u8{97} ** 20;
var buffer_read = [_]u8{98} ** 20;
const sqe_write = try ring.write(0x11111111, fd, buffer_write[0..], 10);
const sqe_write = try ring.write(0x11111111, fd, .{ .buffer = buffer_write[0..] }, 10);
try testing.expectEqual(Op.write, sqe_write.opcode);
try testing.expectEqual(10, sqe_write.off);
sqe_write.flags.io_link = true;
@ -4198,12 +4150,12 @@ test "write_fixed/read_fixed" {
else => |e| return e,
};
const sqe_write = try ring.write_fixed(0x45454545, fd, &buffers[0], 3, 0);
const sqe_write = try ring.write_fixed(0x45454545, fd, .{ .buffer = raw_buffers[0][0..] }, 3, 0);
try testing.expectEqual(Op.write_fixed, sqe_write.opcode);
try testing.expectEqual(3, sqe_write.off);
sqe_write.link_next();
const sqe_read = try ring.read_fixed(0x12121212, fd, &buffers[1], 0, 1);
const sqe_read = try ring.read_fixed(0x12121212, fd, .{ .buffer = raw_buffers[1][0..] }, 0, 1);
try testing.expectEqual(Op.read_fixed, sqe_read.opcode);
try testing.expectEqual(0, sqe_read.off);