diff --git a/lib/std/os/linux/IoUring.zig b/lib/std/os/linux/IoUring.zig
index 49901cd3ac..4ff57be3a6 100644
--- a/lib/std/os/linux/IoUring.zig
+++ b/lib/std/os/linux/IoUring.zig
@@ -407,51 +407,6 @@ pub fn nop(self: *IoUring, user_data: u64) !*Sqe {
     return sqe;
 }
 
-/// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)`
-/// depending on the buffer type.
-/// * Reading into a `ReadBuffer.buffer` uses `read(2)`
-/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
-///
-/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE.
-/// See https://man7.org/linux/man-pages/man2/preadv2.2.html
-///
-/// Returns a pointer to the SQE.
-pub fn read(
-    self: *IoUring,
-    user_data: u64,
-    fd: linux.fd_t,
-    buffer: ReadBuffer,
-    offset: u64,
-) !*Sqe {
-    const sqe = try self.get_sqe();
-    switch (buffer) {
-        .buffer => |slice| sqe.prep_read(fd, slice, offset),
-        .iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
-        .buffer_selection => |selection| {
-            sqe.prep_rw(.read, fd, 0, selection.len, offset);
-            sqe.flags.buffer_select = true;
-            sqe.buf_index = selection.group_id;
-        },
-    }
-    sqe.user_data = user_data;
-    return sqe;
-}
-
-/// Queues (but does not submit) an SQE to perform a `write(2)`.
-/// Returns a pointer to the SQE.
-pub fn write(
-    self: *IoUring,
-    user_data: u64,
-    fd: linux.fd_t,
-    buffer: []const u8,
-    offset: u64,
-) !*Sqe {
-    const sqe = try self.get_sqe();
-    sqe.prep_write(fd, buffer, offset);
-    sqe.user_data = user_data;
-    return sqe;
-}
-
 /// Queues (but does not submit) an SQE to perform a `splice(2)`
 /// Either `fd_in` or `fd_out` must be a pipe.
 /// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to
@@ -488,6 +443,51 @@ pub fn splice(
     return sqe;
 }
 
+/// Queues (but does not submit) an SQE to perform a `tee(2)`.
+/// Returns a pointer to the SQE. The `splice`/`tee` flags are not yet exposed.
+pub fn tee(
+    self: *IoUring,
+    user_data: u64,
+    fd_in: posix.fd_t,
+    fd_out: posix.fd_t,
+    len: usize,
+) !*Sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_tee(fd_in, fd_out, len);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `pread(2)` or `preadv(2)`
+/// depending on the buffer type.
+/// * Reading into a `ReadBuffer.buffer` uses `pread(2)`
+/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
+///
+/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE.
+/// See https://man7.org/linux/man-pages/man2/preadv2.2.html
+///
+/// Returns a pointer to the SQE.
+pub fn read(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    buffer: ReadBuffer,
+    offset: u64,
+) !*Sqe {
+    const sqe = try self.get_sqe();
+    switch (buffer) {
+        .buffer => |slice| sqe.prep_read(fd, slice, offset),
+        .iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
+        .buffer_selection => |selection| {
+            sqe.prep_rw(.read, fd, 0, selection.len, offset);
+            sqe.flags.buffer_select = true;
+            sqe.buf_index = selection.group_id;
+        },
+    }
+    sqe.user_data = user_data;
+    return sqe;
+}
+
 /// Queues (but does not submit) an SQE to perform a IORING_OP_READ_FIXED.
 /// The `buffer` provided must be registered with the kernel by calling
 /// `register_buffers()` first. The `buffer_index` must be the same as its
@@ -499,30 +499,45 @@ pub fn read_fixed(
     self: *IoUring,
     user_data: u64,
     fd: linux.fd_t,
-    buffer: *posix.iovec,
+    buffer: ReadBuffer,
     offset: u64,
     buffer_index: u16,
 ) !*Sqe {
     const sqe = try self.get_sqe();
-    sqe.prep_read_fixed(fd, buffer, offset, buffer_index);
+    switch (buffer) {
+        .buffer => |slice| sqe.prep_read_fixed(fd, slice, offset, buffer_index),
+        .iovecs => |vecs| sqe.prep_readv_fixed(fd, vecs, offset, buffer_index),
+        .buffer_selection => |selection| {
+            sqe.prep_rw(.read_fixed, fd, 0, selection.len, offset);
+            sqe.flags.buffer_select = true;
+            sqe.buf_index = selection.group_id;
+        },
+    }
     sqe.user_data = user_data;
     return sqe;
 }
 
-/// Queues (but does not submit) an SQE to perform a `pwritev()`.
+/// Queues (but does not submit) an SQE to perform a `pwrite(2)` or `pwritev(2)`
+/// depending on the write buffer type.
+/// * Writing from a `WriteBuffer.buffer` uses `pwrite(2)`
+/// * Writing from a `WriteBuffer.iovecs` uses `pwritev(2)`
+///
 /// Returns a pointer to the SQE so that you can further modify the SQE for
 /// advanced use cases.
 /// For example, if you want to do a `pwritev2()` then set `rw_flags` on the
 /// returned SQE. See https://linux.die.net/man/2/pwritev.
-pub fn writev(
+pub fn write(
     self: *IoUring,
     user_data: u64,
     fd: linux.fd_t,
-    iovecs: []const posix.iovec_const,
+    buffer: WriteBuffer,
     offset: u64,
 ) !*Sqe {
     const sqe = try self.get_sqe();
-    sqe.prep_writev(fd, iovecs, offset);
+    switch (buffer) {
+        .buffer => |slice| sqe.prep_write(fd, slice, offset),
+        .iovecs => |vecs| sqe.prep_writev(fd, vecs, offset),
+    }
     sqe.user_data = user_data;
     return sqe;
 }
@@ -538,12 +553,80 @@ pub fn write_fixed(
     self: *IoUring,
     user_data: u64,
     fd: linux.fd_t,
-    buffer: *posix.iovec,
+    buffer: WriteBuffer,
     offset: u64,
     buffer_index: u16,
 ) !*Sqe {
     const sqe = try self.get_sqe();
-    sqe.prep_write_fixed(fd, buffer, offset, buffer_index);
+    switch (buffer) {
+        .buffer => |slice| {
+            sqe.prep_write_fixed(fd, slice, offset, buffer_index);
+        },
+        .iovecs => |vecs| {
+            sqe.prep_writev_fixed(fd, vecs, offset, buffer_index);
+        },
+    }
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.3
+pub fn recvmsg(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    msg: *posix.msghdr,
+    flags: linux.Msg,
+) !*Sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_recvmsg(fd, msg, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a multishot `recvmsg(2)`.
+/// Returns a pointer to the SQE.
+pub fn recvmsg_multishot(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    msg: *posix.msghdr,
+    flags: linux.Msg,
+) !*Sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_recvmsg_multishot(fd, msg, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.3
+pub fn sendmsg(
+    self: *IoUring,
+    user_data: u64,
+    fd: posix.fd_t,
+    msg: *const posix.msghdr_const,
+    flags: linux.Msg,
+) !*Sqe {
+    const sqe = try self.get_sqe();
+    sqe.prep_sendmsg(fd, msg, flags);
+    sqe.user_data = user_data;
+    return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `poll(2)`.
+/// Returns a pointer to the SQE.
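// --- Illustrative sketch (editor's aside, not part of the patch) ---
// How the reworked `read`/`write` wrappers are called with the `ReadBuffer` /
// `WriteBuffer` unions introduced above. Assumes `ring` is an already
// initialized IoUring and `fd` is an open file descriptor; the function name,
// user_data values, and buffer sizes are made up, and `submit()` is an
// existing helper assumed unchanged by this patch.
const std = @import("std");
const IoUring = std.os.linux.IoUring;

fn demo_read_write(ring: *IoUring, fd: std.posix.fd_t) !void {
    // Single-buffer variants: pwrite(2)/pread(2) style.
    _ = try ring.write(0x1, fd, .{ .buffer = "hello io_uring" }, 0);
    var buf: [32]u8 = undefined;
    _ = try ring.read(0x2, fd, .{ .buffer = buf[0..] }, 0);
    // Vectored variant: preadv(2) style.
    var iovecs = [_]std.posix.iovec{.{ .base = &buf, .len = buf.len }};
    _ = try ring.read(0x3, fd, .{ .iovecs = iovecs[0..] }, 0);
    _ = try ring.submit();
}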
+pub fn poll_add( + self: *IoUring, + user_data: u64, + fd: posix.fd_t, + poll_mask: linux.Epoll, +) !*Sqe { + const sqe = try self.get_sqe(); + sqe.prep_poll_add(fd, poll_mask); sqe.user_data = user_data; return sqe; } @@ -746,38 +829,6 @@ pub fn send_zc_fixed( return sqe; } -/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`. -/// Returns a pointer to the SQE. -/// Available since 5.3 -pub fn recvmsg( - self: *IoUring, - user_data: u64, - fd: linux.fd_t, - msg: *linux.msghdr, - flags: linux.Msg, -) !*Sqe { - const sqe = try self.get_sqe(); - sqe.prep_recvmsg(fd, msg, flags); - sqe.user_data = user_data; - return sqe; -} - -/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`. -/// Returns a pointer to the SQE. -/// Available since 5.3 -pub fn sendmsg( - self: *IoUring, - user_data: u64, - fd: linux.fd_t, - msg: *const linux.msghdr_const, - flags: linux.Msg, -) !*Sqe { - const sqe = try self.get_sqe(); - sqe.prep_sendmsg(fd, msg, flags); - sqe.user_data = user_data; - return sqe; -} - /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`. /// Returns a pointer to the SQE. /// Available since 6.1 @@ -931,20 +982,6 @@ pub fn link_timeout( return sqe; } -/// Queues (but does not submit) an SQE to perform a `poll(2)`. -/// Returns a pointer to the SQE. -pub fn poll_add( - self: *IoUring, - user_data: u64, - fd: linux.fd_t, - poll_mask: linux.Epoll, -) !*Sqe { - const sqe = try self.get_sqe(); - sqe.prep_poll_add(fd, poll_mask); - sqe.user_data = user_data; - return sqe; -} - /// Queues (but does not submit) an SQE to remove an existing poll operation. /// Returns a pointer to the SQE. pub fn poll_remove( @@ -1186,25 +1223,16 @@ pub fn register_buffers_sparse(self: *IoUring, nr: u32) !void { .nr = nr, }; - const res = linux.io_uring_register( - self.fd, - .register_buffers2, - ®, - @sizeOf(RsrcRegister), - ); + const res = linux.io_uring_register(self.fd, .register_buffers2, ®, @sizeOf(RsrcRegister)); try handle_registration_result(res); } -/// Registers an array of buffers for use with `read_fixed` and `write_fixed`. +/// Registers an array of buffers for use with `read_fixed`, `readv_fixed`, +/// `write_fixed` and `writev_fixed`. pub fn register_buffers(self: *IoUring, buffers: []const posix.iovec) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_buffers, - buffers.ptr, - @intCast(buffers.len), - ); + const res = linux.io_uring_register(self.fd, .register_buffers, buffers.ptr, @intCast(buffers.len)); try handle_registration_result(res); } @@ -1237,12 +1265,7 @@ pub fn register_files_update(self: *IoUring, offset: u32, fds: []const posix.fd_ .data = @intFromPtr(fds.ptr), }); - const res = linux.io_uring_register( - self.fd, - .register_files_update, - &update, - @intCast(fds.len), - ); + const res = linux.io_uring_register(self.fd, .register_files_update, &update, @intCast(fds.len)); try handle_registration_result(res); } @@ -1255,12 +1278,7 @@ pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void { .flags = .{ .register_sparse = true }, }); - const res = linux.io_uring_register( - self.fd, - .register_files2, - ®, - @sizeOf(RsrcRegister), - ); + const res = linux.io_uring_register(self.fd, .register_files2, ®, @sizeOf(RsrcRegister)); return handle_registration_result(res); } @@ -1283,12 +1301,7 @@ pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void { /// file descriptors. 
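// --- Illustrative sketch (editor's aside, not part of the patch) ---
// Registering fixed buffers and then issuing `write_fixed`/`read_fixed`
// against them by index, mirroring the updated test further down. The function
// name, buffer contents, and indices are made up; assumes `ring` and `fd` are
// already set up and that the kernel accepts the registration.
const std = @import("std");
const IoUring = std.os.linux.IoUring;

fn demo_fixed_buffers(ring: *IoUring, fd: std.posix.fd_t) !void {
    var raw: [2][16]u8 = undefined;
    @memset(&raw[0], 'z');
    var iovecs = [_]std.posix.iovec{
        .{ .base = &raw[0], .len = raw[0].len },
        .{ .base = &raw[1], .len = raw[1].len },
    };
    try ring.register_buffers(iovecs[0..]);
    // Write out of registered buffer 0, then read back into registered buffer 1.
    _ = try ring.write_fixed(0x1, fd, .{ .buffer = raw[0][0..] }, 0, 0);
    _ = try ring.read_fixed(0x2, fd, .{ .buffer = raw[1][0..] }, 0, 1);
    _ = try ring.submit();
}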
pub fn register_files(self: *IoUring, fds: []const linux.fd_t) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_files, - fds.ptr, - @intCast(fds.len), - ); + const res = linux.io_uring_register(self.fd, .register_files, fds.ptr, @intCast(fds.len)); try handle_registration_result(res); } @@ -1309,12 +1322,7 @@ pub fn unregister_files(self: *IoUring) !void { /// Only a single a eventfd can be registered at any given point in time. pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_eventfd, - &fd, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_eventfd, &fd, 1); try handle_registration_result(res); } @@ -1325,35 +1333,20 @@ pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void { /// Only a single eventfd can be registered at any given point in time. pub fn register_eventfd_async(self: *IoUring, fd: linux.fd_t) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_eventfd_async, - &fd, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_eventfd_async, &fd, 1); try handle_registration_result(res); } /// Unregister the registered eventfd file descriptor. pub fn unregister_eventfd(self: *IoUring) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .unregister_eventfd, - null, - 0, - ); + const res = linux.io_uring_register(self.fd, .unregister_eventfd, null, 0); try handle_registration_result(res); } pub fn register_probe(self: *IoUring, probe: []Probe) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_probe, - probe.ptr, - @intCast(probe.len), - ); + const res = linux.io_uring_register(self.fd, .register_probe, probe.ptr, @intCast(probe.len)); try handle_registration_result(res); } @@ -1361,12 +1354,7 @@ pub fn register_probe(self: *IoUring, probe: []Probe) !void { /// matches `io_uring_register_personality()` in liburing pub fn register_personality(self: *IoUring) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_personality, - null, - 0, - ); + const res = linux.io_uring_register(self.fd, .register_personality, null, 0); try handle_registration_result(res); } @@ -1378,23 +1366,13 @@ pub fn unregister_personality(self: *IoUring, credential_id: u32) !void { pub fn register_restrictions(self: *IoUring, restriction: []Restriction) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_restrictions, - restriction.ptr, - @intCast(restriction.len), - ); + const res = linux.io_uring_register(self.fd, .register_restrictions, restriction.ptr, @intCast(restriction.len)); try handle_registration_result(res); } pub fn enable_rings(self: *IoUring) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_enable_rings, - null, - 0, - ); + const res = linux.io_uring_register(self.fd, .register_enable_rings, null, 0); try handle_registration_result(res); } @@ -1403,24 +1381,14 @@ pub fn register_iowq_aff(self: *IoUring, cpusz: u32, mask: *linux.cpu_set_t) !vo if (cpusz >= math.maxInt(u32)) return error.ArgumentsInvalid; - const res = linux.io_uring_register( - self.fd, - .register_iowq_aff, - mask, - cpusz, - ); + const res = linux.io_uring_register(self.fd, .register_iowq_aff, mask, cpusz); try handle_registration_result(res); } pub fn unregister_iowq_aff(self: *IoUring) !void { assert(self.fd >= 0); - const res 
= linux.io_uring_register( - self.fd, - .unregister_iowq_aff, - null, - 0, - ); + const res = linux.io_uring_register(self.fd, .unregister_iowq_aff, null, 0); try handle_registration_result(res); } @@ -1432,12 +1400,7 @@ pub fn unregister_iowq_aff(self: *IoUring) !void { pub fn register_iowq_max_workers(self: *IoUring, max_workers: [2]u32) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_iowq_max_workers, - &max_workers, - 2, - ); + const res = linux.io_uring_register(self.fd, .register_iowq_max_workers, &max_workers, 2); try handle_registration_result(res); } @@ -1445,12 +1408,7 @@ pub fn register_iowq_max_workers(self: *IoUring, max_workers: [2]u32) !void { pub fn register_sync_cancel(self: *IoUring, cancel_reg: *SyncCancelRegister) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_sync_cancel, - cancel_reg, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_sync_cancel, cancel_reg, 1); try handle_registration_result(res); } @@ -1458,12 +1416,7 @@ pub fn register_sync_cancel(self: *IoUring, cancel_reg: *SyncCancelRegister) !vo pub fn register_sync_msg(self: *IoUring, sqe: *Sqe) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - -1, - .register_send_msg_ring, - sqe, - 1, - ); + const res = linux.io_uring_register(-1, .register_send_msg_ring, sqe, 1); try handle_registration_result(res); } @@ -1479,57 +1432,32 @@ pub fn register_file_alloc_range(self: *IoUring, offset: u32, len: u32) !void { .resv = 0, }; - const res = linux.io_uring_register( - self.fd, - .register_file_alloc_range, - &range, - 0, - ); + const res = linux.io_uring_register(self.fd, .register_file_alloc_range, &range, 0); return handle_registration_result(res); } pub fn register_napi(self: *IoUring, napi: *Napi) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_napi, - napi, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_napi, napi, 1); try handle_registration_result(res); } pub fn unregister_napi(self: *IoUring, napi: *Napi) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .unregister_napi, - napi, - 1, - ); + const res = linux.io_uring_register(self.fd, .unregister_napi, napi, 1); try handle_registration_result(res); } pub fn register_clock(self: *IoUring, clock_reg: *ClockRegister) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_clock, - clock_reg, - 0, - ); + const res = linux.io_uring_register(self.fd, .register_clock, clock_reg, 0); try handle_registration_result(res); } pub fn register_ifq(self: *IoUring, ifq_reg: *ZcrxIfqRegister) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_zcrx_ifq, - ifq_reg, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_zcrx_ifq, ifq_reg, 1); try handle_registration_result(res); } @@ -1540,12 +1468,7 @@ pub fn register_resize_rings(self: *IoUring, _: *Params) !void { pub fn register_region(self: *IoUring, mem_reg: *MemRegionRegister) !void { assert(self.fd >= 0); - const res = linux.io_uring_register( - self.fd, - .register_mem_region, - mem_reg, - 1, - ); + const res = linux.io_uring_register(self.fd, .register_mem_region, mem_reg, 1); try handle_registration_result(res); } @@ -1747,12 +1670,14 @@ pub fn getsockopt( /// a power of 2. /// `fd` is IO_Uring.fd for which the provided buffer ring is being registered. 
/// `group_id` is the chosen buffer group ID, unique in IO_Uring. -pub fn setup_buf_ring( - fd: linux.fd_t, +/// matches `io_uring_setup_buf_ring()` in liburing +pub fn init_buffer_ring( + self: *IoUring, entries: u16, group_id: u16, flags: BufferRegister.Flags, ) !*align(page_size_min) BufferRing { + assert(self.fd >= 0); if (entries == 0 or entries > math.maxInt(u16)) return error.EntriesNotInRange; if (!math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo; @@ -1768,51 +1693,36 @@ pub fn setup_buf_ring( errdefer posix.munmap(mmap); assert(mmap.len == mmap_size); - const br_addr: *align(page_size_min) BufferRing = @ptrCast(mmap.ptr); + const buffer_ring: *align(page_size_min) BufferRing = @ptrCast(mmap.ptr); var reg = mem.zeroInit(BufferRegister, .{ - .ring_addr = @intFromPtr(br_addr), + .ring_addr = @intFromPtr(buffer_ring), .ring_entries = entries, .bgid = group_id, .flags = flags, }); - try register_buf_ring(fd, ®); - return br_addr; + try self.register_buffer_ring(®); + buffer_ring.init(); + return buffer_ring; } -pub fn register_buf_ring( - fd: posix.fd_t, - buf_reg: *BufferRegister, -) !void { - var res = linux.io_uring_register( - fd, - .register_pbuf_ring, - buf_reg, - 1, - ); +/// matches `io_uring_register_buf_ring` +pub fn register_buffer_ring(self: *IoUring, buf_reg: *BufferRegister) !void { + var res = linux.io_uring_register(self.fd, .register_pbuf_ring, buf_reg, 1); if (linux.E.init(res) == .INVAL and buf_reg.flags.iou_pbuf_ring_inc) { // Retry without incremental buffer consumption. // It is available since kernel 6.12. returns INVAL on older. buf_reg.flags.iou_pbuf_ring_inc = false; - res = linux.io_uring_register( - fd, - .register_pbuf_ring, - buf_reg, - 1, - ); + res = linux.io_uring_register(self.fd, .register_pbuf_ring, buf_reg, 1); } try handle_register_buf_ring_result(res); } -pub fn unregister_buf_ring(fd: posix.fd_t, buf_group_id: u16) !void { +/// matches `io_uring_unregister_buf_ring` +pub fn unregister_buffer_ring(self: *IoUring, buf_group_id: u16) !void { var reg = mem.zeroInit(BufferRegister, .{ .bgid = buf_group_id, }); - const res = linux.io_uring_register( - fd, - .unregister_pbuf_ring, - ®, - 1, - ); + const res = linux.io_uring_register(self.fd, .unregister_pbuf_ring, ®, 1); try handle_register_buf_ring_result(res); } @@ -1824,57 +1734,6 @@ fn handle_register_buf_ring_result(res: usize) !void { } } -// Unregisters a previously registered shared buffer ring, returned from -// io_uring_setup_buf_ring. -pub fn free_buf_ring(fd: posix.fd_t, br: *align(page_size_min) BufferRing, entries: u32, group_id: u16) void { - unregister_buf_ring(fd, group_id) catch {}; - var mmap: []align(page_size_min) u8 = undefined; - mmap.ptr = @ptrCast(br); - mmap.len = entries * @sizeOf(Buffer); - posix.munmap(mmap); -} - -/// Initialises `br` so that it is ready to be used. -pub fn buf_ring_init(br: *BufferRing) void { - br.tail = 0; -} - -/// Calculates the appropriate size mask for a buffer ring. -/// `entries` is the ring entries as specified in io_uring_register_buf_ring. -pub fn buf_ring_mask(entries: u16) u16 { - return entries - 1; -} - -/// Assigns `buffer` with the `br` buffer ring. -/// `buffer_id` is identifier which will be returned in the CQE. -/// `buffer_offset` is the offset to insert at from the current tail. -/// If just one buffer is provided before the ring tail is committed with -/// advance then offset should be 0. -/// If buffers are provided in a loop before being committed, the offset must -/// be incremented by one for each buffer added. 
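// --- Illustrative sketch (editor's aside, not part of the patch) ---
// Setting up a provided-buffer ring with the instance method `init_buffer_ring`
// (which replaces the old free function `setup_buf_ring`), then publishing
// buffers with the `BufferRing` methods added further down. The function name,
// group id, entry count, and backing slice are arbitrary; error handling is
// minimal.
const std = @import("std");
const IoUring = std.os.linux.IoUring;

fn demo_buffer_ring(ring: *IoUring, backing: []u8) !void {
    const entries: u16 = 8; // must be a power of two
    const group_id: u16 = 1;
    const br = try ring.init_buffer_ring(entries, group_id, .{ .iou_pbuf_ring_inc = true });
    defer br.deinit(ring, entries, group_id);

    // Hand each chunk of `backing` to the kernel as a selectable buffer.
    const chunk = backing.len / entries;
    const m = br.mask(entries);
    var i: u16 = 0;
    while (i < entries) : (i += 1) {
        br.add(backing[i * chunk ..][0..chunk], i, m, i);
    }
    br.advance(entries);
}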
-pub fn buf_ring_add(
-    br: *BufferRing,
-    buffer: []u8,
-    buffer_id: u16,
-    mask: u16,
-    buffer_offset: u16,
-) void {
-    const bufs: [*]Buffer = @ptrCast(br);
-    const buf: *Buffer = &bufs[(br.tail +% buffer_offset) & mask];
-
-    buf.addr = @intFromPtr(buffer.ptr);
-    buf.len = @intCast(buffer.len);
-    buf.bid = buffer_id;
-}
-
-/// Make `count` new buffers visible to the kernel. Called after
-/// `io_uring_buf_ring_add` has been called `count` times to fill in new
-/// buffers.
-pub fn buf_ring_advance(br: *BufferRing, count: u16) void {
-    const tail: u16 = br.tail +% count;
-    @atomicStore(u16, &br.tail, tail, .release);
-}
-
 /// IO completion data structure (Completion Queue Entry)
 pub const Cqe = extern struct {
     /// sqe.user_data value passed back
     user_data: u64,
@@ -1922,6 +1781,12 @@ pub const Cqe = extern struct {
         _17: u16 = 0,
     };
 
+    /// Retrieve the 64-bit CQE `user_data` as a `*T` after completion of an SQE.
+    /// This data is passed through `Sqe` -> `Cqe` unchanged.
+    pub fn get_data(cqe: Cqe, comptime T: type) *T {
+        return @ptrFromInt(cqe.user_data);
+    }
+
     pub fn err(self: Cqe) linux.E {
         if (self.res > -4096 and self.res < 0) {
             return @enumFromInt(-self.res);
         }
@@ -2108,29 +1973,10 @@ pub const Sqe = extern struct {
         };
     }
 
-    pub fn prep_read(sqe: *Sqe, fd: linux.fd_t, buffer: []u8, offset: u64) void {
-        sqe.prep_rw(.read, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
-    }
-
     pub fn prep_write(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, offset: u64) void {
         sqe.prep_rw(.write, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
     }
 
-    pub fn prep_splice(sqe: *Sqe, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) void {
-        sqe.prep_rw(.splice, fd_out, undefined, len, off_out);
-        sqe.addr = off_in;
-        sqe.splice_fd_in = fd_in;
-    }
-
-    pub fn prep_readv(
-        sqe: *Sqe,
-        fd: linux.fd_t,
-        iovecs: []const std.posix.iovec,
-        offset: u64,
-    ) void {
-        sqe.prep_rw(.readv, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
-    }
-
     pub fn prep_writev(
         sqe: *Sqe,
         fd: linux.fd_t,
@@ -2140,13 +1986,60 @@ pub const Sqe = extern struct {
         sqe.prep_rw(.writev, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
     }
 
-    pub fn prep_read_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: *std.posix.iovec, offset: u64, buffer_index: u16) void {
-        sqe.prep_rw(.read_fixed, fd, @intFromPtr(buffer.base), buffer.len, offset);
+    pub fn prep_write_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: []const u8, offset: u64, buffer_index: u16) void {
+        sqe.prep_rw(.write_fixed, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
         sqe.buf_index = buffer_index;
     }
 
-    pub fn prep_write_fixed(sqe: *Sqe, fd: linux.fd_t, buffer: *std.posix.iovec, offset: u64, buffer_index: u16) void {
-        sqe.prep_rw(.write_fixed, fd, @intFromPtr(buffer.base), buffer.len, offset);
+    pub fn prep_writev_fixed(sqe: *Sqe, fd: linux.fd_t, iovecs: []const posix.iovec_const, offset: u64, buffer_index: u16) void {
+        sqe.prep_rw(.write_fixed, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
+        sqe.buf_index = buffer_index;
+    }
+
+    pub fn prep_splice(sqe: *Sqe, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) void {
+        sqe.prep_rw(.splice, fd_out, undefined, len, off_out);
+        sqe.addr = off_in;
+        sqe.splice_fd_in = fd_in;
+    }
+
+    pub fn prep_tee(sqe: *Sqe, fd_in: linux.fd_t, fd_out: linux.fd_t, len: usize) void {
+        sqe.prep_rw(.tee, fd_out, undefined, len, 0);
+        sqe.addr = undefined;
+        sqe.splice_fd_in = fd_in;
+    }
+
+    pub fn prep_read(sqe: *Sqe, fd: linux.fd_t, buffer: []u8, offset: u64) void {
+        sqe.prep_rw(.read, fd, @intFromPtr(buffer.ptr),
buffer.len, offset); + } + + pub fn prep_readv( + sqe: *Sqe, + fd: linux.fd_t, + iovecs: []const std.posix.iovec, + offset: u64, + ) void { + sqe.prep_rw(.readv, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset); + } + + pub fn prep_read_fixed( + sqe: *Sqe, + fd: linux.fd_t, + buffer: []u8, + offset: u64, + buffer_index: u16, + ) void { + sqe.prep_rw(.read_fixed, fd, @intFromPtr(buffer.ptr), buffer.len, offset); + sqe.buf_index = buffer_index; + } + + pub fn prep_readv_fixed( + sqe: *Sqe, + fd: linux.fd_t, + iovecs: []const std.posix.iovec, + offset: u64, + buffer_index: u16, + ) void { + sqe.prep_rw(.read_fixed, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset); sqe.buf_index = buffer_index; } @@ -2892,18 +2785,17 @@ pub const BufferGroup = struct { const heads = try allocator.alloc(u32, buffers_count); errdefer allocator.free(heads); - const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .iou_pbuf_ring_inc = true }); - buf_ring_init(br); + const br = try ring.init_buffer_ring(buffers_count, group_id, .{ .iou_pbuf_ring_inc = true }); - const mask = buf_ring_mask(buffers_count); + const mask = br.mask(buffers_count); var i: u16 = 0; while (i < buffers_count) : (i += 1) { const pos = buffer_size * i; const buf = buffers[pos .. pos + buffer_size]; heads[i] = 0; - buf_ring_add(br, buf, i, mask, i); + br.add(buf, i, mask, i); } - buf_ring_advance(br, buffers_count); + br.advance(buffers_count); return .{ .ring = ring, @@ -2917,7 +2809,7 @@ pub const BufferGroup = struct { } pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void { - free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id); + self.br.deinit(self.ring, self.buffers_count, self.group_id); allocator.free(self.buffers); allocator.free(self.heads); } @@ -2978,9 +2870,9 @@ pub const BufferGroup = struct { self.heads[buffer_id] = 0; // Release buffer to the kernel. - const mask = buf_ring_mask(self.buffers_count); - buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0); - buf_ring_advance(self.br, 1); + const mask = self.br.mask(self.buffers_count); + self.br.add(self.get_by_id(buffer_id), buffer_id, mask, 0); + self.br.advance(1); } }; @@ -3000,6 +2892,14 @@ pub const ReadBuffer = union(enum) { }, }; +/// Used to select how the write should be handled. +pub const WriteBuffer = union(enum) { + /// io_uring will write data from this buffer into fd. + buffer: []const u8, + /// io_uring will write data from iovecs into fd using pwritev. + iovecs: []const posix.iovec_const, +}; + /// Used to select how the recv call should be handled. pub const RecvBuffer = union(enum) { /// io_uring will recv directly into this buffer @@ -3160,7 +3060,7 @@ pub const Probe = extern struct { ops: [256]ProbeOp, /// Is the operation supported on the running kernel. - pub fn is_supported(self: @This(), op: Op) bool { + pub fn is_supported(self: *const Probe, op: Op) bool { const i = @intFromEnum(op); if (i > @intFromEnum(self.last_op) or i >= self.ops_len) return false; @@ -3224,6 +3124,58 @@ pub const BufferRing = extern struct { resv2: u32, resv3: u16, tail: u16, + + /// Initialises `br` so that it is ready to be used. + /// matches `io_uring_buf_ring_init` in liburing + fn init(br: *align(page_size_min) BufferRing) void { + br.tail = 0; + } + + // Unregisters a previously registered shared buffer ring, returned from + // io_uring_setup_buf_ring. 
+ pub fn deinit(br: *align(page_size_min) BufferRing, uring: *IoUring, entries: u32, group_id: u16) void { + uring.unregister_buffer_ring(group_id) catch {}; + var mmap: []align(page_size_min) u8 = undefined; + mmap.ptr = @ptrCast(br); + mmap.len = entries * @sizeOf(Buffer); + posix.munmap(mmap); + } + + /// Calculates the appropriate size mask for a buffer ring. + /// `entries` is the ring entries as specified in io_uring_register_buf_ring + pub fn mask(_: *align(page_size_min) BufferRing, entries: u16) u16 { + return entries - 1; + } + + /// Assigns `buffer` with the `br` buffer ring. + /// `buffer_id` is identifier which will be returned in the CQE. + /// `buffer_offset` is the offset to insert at from the current tail. + /// If just one buffer is provided before the ring tail is committed with + /// advance then offset should be 0. + /// If buffers are provided in a loop before being committed, the offset must + /// be incremented by one for each buffer added. + pub fn add( + br: *align(page_size_min) BufferRing, + buffer: []u8, + buffer_id: u16, + buffer_mask: u16, + buffer_offset: u16, + ) void { + const bufs: [*]Buffer = @ptrCast(br); + const buf: *Buffer = &bufs[(br.tail +% buffer_offset) & buffer_mask]; + + buf.addr = @intFromPtr(buffer.ptr); + buf.len = @intCast(buffer.len); + buf.bid = buffer_id; + } + + /// Make `count` new buffers visible to the kernel. Called after + /// `io_uring_buf_ring_add` has been called `count` times to fill in new + /// buffers. + pub fn advance(br: *align(page_size_min) BufferRing, count: u16) void { + const tail: u16 = br.tail +% count; + @atomicStore(u16, &br.tail, tail, .release); + } }; /// argument for IORING_(UN)REGISTER_PBUF_RING @@ -4002,7 +3954,7 @@ test "writev/fsync/readv" { posix.iovec{ .base = &buffer_read, .len = buffer_read.len }, }; - const sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17); + const sqe_writev = try ring.write(0xdddddddd, fd, .{ .iovecs = iovecs_write[0..] }, 17); try testing.expectEqual(Op.writev, sqe_writev.opcode); try testing.expectEqual(17, sqe_writev.off); sqe_writev.link_next(); @@ -4064,7 +4016,7 @@ test "write/read" { const buffer_write = [_]u8{97} ** 20; var buffer_read = [_]u8{98} ** 20; - const sqe_write = try ring.write(0x11111111, fd, buffer_write[0..], 10); + const sqe_write = try ring.write(0x11111111, fd, .{ .buffer = buffer_write[0..] }, 10); try testing.expectEqual(Op.write, sqe_write.opcode); try testing.expectEqual(10, sqe_write.off); sqe_write.flags.io_link = true; @@ -4198,12 +4150,12 @@ test "write_fixed/read_fixed" { else => |e| return e, }; - const sqe_write = try ring.write_fixed(0x45454545, fd, &buffers[0], 3, 0); + const sqe_write = try ring.write_fixed(0x45454545, fd, .{ .buffer = raw_buffers[0][0..] }, 3, 0); try testing.expectEqual(Op.write_fixed, sqe_write.opcode); try testing.expectEqual(3, sqe_write.off); sqe_write.link_next(); - const sqe_read = try ring.read_fixed(0x12121212, fd, &buffers[1], 0, 1); + const sqe_read = try ring.read_fixed(0x12121212, fd, .{ .buffer = raw_buffers[1][0..] }, 0, 1); try testing.expectEqual(Op.read_fixed, sqe_read.opcode); try testing.expectEqual(0, sqe_read.off);
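// --- Illustrative sketch (editor's aside, not part of the patch) ---
// Round-tripping a context pointer through `user_data` and recovering it with
// the new `Cqe.get_data` helper. `nop` appears in the hunks above; `submit`
// and `copy_cqe` are existing helpers assumed unchanged by this patch, and the
// struct, function name, and value are made up.
const std = @import("std");
const IoUring = std.os.linux.IoUring;

const Ctx = struct { id: u32 };

fn demo_user_data(ring: *IoUring) !void {
    var ctx: Ctx = .{ .id = 42 };
    _ = try ring.nop(@intFromPtr(&ctx));
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    const back = cqe.get_data(Ctx); // same pointer that was queued
    std.debug.assert(back.id == 42);
}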