diff --git a/lib/std/Io.zig b/lib/std/Io.zig index ddfb8c2e01..4ae1e25c04 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -665,14 +665,14 @@ pub const VTable = struct { fileSeekBy: *const fn (?*anyopaque, file: File, offset: i64) File.SeekError!void, fileSeekTo: *const fn (?*anyopaque, file: File, offset: u64) File.SeekError!void, - now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) NowError!Timestamp, - sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void, + now: *const fn (?*anyopaque, Timestamp.Clock) Timestamp.Error!i96, + sleep: *const fn (?*anyopaque, Timeout) SleepError!void, listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server, accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream, ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket, - netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void, - netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, buffer: []u8, timeout: Timeout) net.Socket.ReceiveTimeoutError!net.ReceivedMessage, + netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.SendResult, + netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize }, netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize, netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void, @@ -700,46 +700,135 @@ pub const UnexpectedError = error{ pub const Dir = @import("Io/Dir.zig"); pub const File = @import("Io/File.zig"); -pub const Timestamp = enum(i96) { - _, +pub const Timestamp = struct { + nanoseconds: i96, + clock: Clock, + + pub const Clock = enum { + /// A settable system-wide clock that measures real (i.e. wall-clock) + /// time. This clock is affected by discontinuous jumps in the system + /// time (e.g., if the system administrator manually changes the + /// clock), and by frequency adjust‐ ments performed by NTP and similar + /// applications. + /// This clock normally counts the number of seconds since + /// 1970-01-01 00:00:00 Coordinated Universal Time (UTC) except that it + /// ignores leap seconds; near a leap second it is typically + /// adjusted by NTP to stay roughly in sync with UTC. + realtime, + /// A nonsettable system-wide clock that represents time since some + /// unspecified point in the past. + /// + /// On Linux, corresponds to how long the system has been running since + /// it booted. + /// + /// Not affected by discontinuous jumps in the system time (e.g., if + /// the system administrator manually changes the clock), but is + /// affected by frequency adjustments. **This clock does not count time + /// that the system is suspended.** + /// + /// Guarantees that the time returned by consecutive calls will not go + /// backwards, but successive calls may return identical + /// (not-increased) time values. + monotonic, + /// Identical to `monotonic` except it also includes any time that the + /// system is suspended. + boottime, + }; pub fn durationTo(from: Timestamp, to: Timestamp) Duration { - return .{ .nanoseconds = @intFromEnum(to) - @intFromEnum(from) }; + assert(from.clock == to.clock); + return .{ .nanoseconds = to.nanoseconds - from.nanoseconds }; } pub fn addDuration(from: Timestamp, duration: Duration) Timestamp { - return @enumFromInt(@intFromEnum(from) + duration.nanoseconds); + return .{ + .nanoseconds = from.nanoseconds + duration.nanoseconds, + .clock = from.clock, + }; } - pub fn fromNow(io: Io, clockid: std.posix.clockid_t, duration: Duration) NowError!Timestamp { - const now_ts = try now(io, clockid); + pub const Error = error{UnsupportedClock} || UnexpectedError; + + /// This function is not cancelable because first of all it does not block, + /// but more importantly, the cancelation logic itself may want to check + /// the time. + pub fn now(io: Io, clock: Clock) Error!Timestamp { + return .{ + .nanoseconds = try io.vtable.now(io.userdata, clock), + .clock = clock, + }; + } + + pub fn fromNow(io: Io, clock: Clock, duration: Duration) Error!Timestamp { + const now_ts = try now(io, clock); return addDuration(now_ts, duration); } + pub fn untilNow(timestamp: Timestamp, io: Io) Error!Duration { + const now_ts = try Timestamp.now(io, timestamp.clock); + return timestamp.durationTo(now_ts); + } + + pub fn durationFromNow(timestamp: Timestamp, io: Io) Error!Duration { + const now_ts = try now(io, timestamp.clock); + return now_ts.durationTo(timestamp); + } + + pub fn toClock(t: Timestamp, io: Io, clock: Clock) Error!Timestamp { + if (t.clock == clock) return t; + const now_old = try now(io, t.clock); + const now_new = try now(io, clock); + const duration = now_old.durationTo(t); + return now_new.addDuration(duration); + } + pub fn compare(lhs: Timestamp, op: std.math.CompareOperator, rhs: Timestamp) bool { - return std.math.compare(@intFromEnum(lhs), op, @intFromEnum(rhs)); + assert(lhs.clock == rhs.clock); + return std.math.compare(lhs.nanoseconds, op, rhs.nanoseconds); } }; + pub const Duration = struct { nanoseconds: i96, - pub fn ms(x: u64) Duration { + pub fn fromMilliseconds(x: i64) Duration { return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms }; } - pub fn seconds(x: u64) Duration { + pub fn fromSeconds(x: i64) Duration { return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s }; } + + pub fn toMilliseconds(d: Duration) i64 { + return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_ms)); + } + + pub fn toSeconds(d: Duration) i64 { + return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s)); + } }; + +/// Declares under what conditions an operation should return `error.Timeout`. pub const Timeout = union(enum) { none, - duration: Duration, + duration: ClockAndDuration, deadline: Timestamp, - pub const Error = error{Timeout}; + pub const Error = error{ Timeout, UnsupportedClock }; + + pub const ClockAndDuration = struct { + clock: Timestamp.Clock, + duration: Duration, + }; + + pub fn toDeadline(t: Timeout, io: Io) Timestamp.Error!?Timestamp { + return switch (t) { + .none => null, + .duration => |d| try .fromNow(io, d.clock, d.duration), + .deadline => |d| d, + }; + } }; -pub const NowError = std.posix.ClockGetTimeError || Cancelable; -pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled }; pub const AnyFuture = opaque {}; @@ -1231,12 +1320,10 @@ pub fn cancelRequested(io: Io) bool { return io.vtable.cancelRequested(io.userdata); } -pub fn now(io: Io, clockid: std.posix.clockid_t) NowError!Timestamp { - return io.vtable.now(io.userdata, clockid); -} +pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable; -pub fn sleep(io: Io, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void { - return io.vtable.sleep(io.userdata, clockid, timeout); +pub fn sleep(io: Io, timeout: Timeout) SleepError!void { + return io.vtable.sleep(io.userdata, timeout); } pub fn sleepDuration(io: Io, duration: Duration) SleepError!void { diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 90f5dbdb22..d1d7799907 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -1406,7 +1406,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index 63381fe99c..abd74bdd98 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -157,7 +157,7 @@ pub const ReadStreamingError = error{ ConnectionResetByPeer, ConnectionTimedOut, NotOpenForReading, - SocketNotConnected, + SocketUnconnected, /// This error occurs when no global event loop is configured, /// and reading from the file descriptor would block. WouldBlock, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 3b72f1ede1..f61a75f9ba 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -811,7 +811,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NOTCAPABLE => return error.AccessDenied, @@ -834,7 +834,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, else => |err| return posix.unexpectedErrno(err), @@ -933,7 +933,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -960,7 +960,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -999,19 +999,29 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi }; } -fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.NowError!Io.Timestamp { +fn now(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 { const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); - const timespec = try posix.clock_gettime(clockid); - return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec); + _ = pool; + const clock_id: posix.clockid_t = clockToPosix(clock); + var tp: posix.timespec = undefined; + switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) { + .SUCCESS => return @intCast(@as(i128, tp.sec) * std.time.ns_per_s + tp.nsec), + .INVAL => return error.UnsupportedClock, + else => |err| return posix.unexpectedErrno(err), + } } -fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, timeout: Io.Timeout) Io.SleepError!void { +fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const pool: *Pool = @ptrCast(@alignCast(userdata)); + const clock_id: posix.clockid_t = clockToPosix(switch (timeout) { + .none => .monotonic, + .duration => |d| d.clock, + .deadline => |d| d.clock, + }); const deadline_nanoseconds: i96 = switch (timeout) { .none => std.math.maxInt(i96), - .duration => |duration| duration.nanoseconds, - .deadline => |deadline| @intFromEnum(deadline), + .duration => |d| d.duration.nanoseconds, + .deadline => |deadline| deadline.nanoseconds, }; var timespec: posix.timespec = .{ .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)), @@ -1019,13 +1029,12 @@ fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, timeout: Io.Timeout) I }; while (true) { try pool.checkCancel(); - switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (timeout) { + switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) { .none, .duration => false, .deadline => true, } }, ×pec, ×pec))) { .SUCCESS => return, - .FAULT => |err| return errnoBug(err), - .INTR => {}, + .INTR => continue, .INVAL => return error.UnsupportedClock, else => |err| return posix.unexpectedErrno(err), } @@ -1313,15 +1322,18 @@ fn netSend( handle: Io.net.Socket.Handle, messages: []Io.net.OutgoingMessage, flags: Io.net.SendFlags, -) Io.net.Socket.SendError!void { +) Io.net.SendResult { const pool: *Pool = @ptrCast(@alignCast(userdata)); if (have_sendmmsg) { var i: usize = 0; while (messages.len - i != 0) { - i += try netSendMany(pool, handle, messages[i..], flags); + i += netSendMany(pool, handle, messages[i..], flags) catch |err| return .{ .fail = .{ + .err = err, + .sent = i, + } }; } - return; + return .success; } try pool.checkCancel(); @@ -1391,11 +1403,11 @@ fn netSendMany( .NOMEM => return error.SystemResources, .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket. .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type. - .PIPE => return error.SocketNotConnected, + .PIPE => return error.SocketUnconnected, .AFNOSUPPORT => return error.AddressFamilyUnsupported, .HOSTUNREACH => return error.NetworkUnreachable, .NETUNREACH => return error.NetworkUnreachable, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NETDOWN => return error.NetworkDown, else => |err| return posix.unexpectedErrno(err), } @@ -1405,16 +1417,128 @@ fn netSendMany( fn netReceive( userdata: ?*anyopaque, handle: Io.net.Socket.Handle, - buffer: []u8, + message_buffer: []Io.net.IncomingMessage, + data_buffer: []u8, + flags: Io.net.ReceiveFlags, timeout: Io.Timeout, -) Io.net.Socket.ReceiveTimeoutError!Io.net.ReceivedMessage { +) struct { ?Io.net.Socket.ReceiveTimeoutError, usize } { const pool: *Pool = @ptrCast(@alignCast(userdata)); - try pool.checkCancel(); - _ = handle; - _ = buffer; - _ = timeout; - @panic("TODO"); + // recvmmsg is useless, here's why: + // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371) + // * it wants iovecs for each message but we have a better API: one data + // buffer to handle all the messages. The better API cannot be lowered to + // the split vectors though because reducing the buffer size might make + // some messages unreceivable. + + // So the strategy instead is to use poll with timeout and then non-blocking + // recvmsg calls. + const posix_flags: u32 = + @as(u32, if (flags.oob) posix.MSG.OOB else 0) | + @as(u32, if (flags.peek) posix.MSG.PEEK else 0) | + @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) | + posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL; + + var poll_fds: [1]posix.pollfd = .{ + .{ + .fd = handle, + .events = posix.POLL.IN, + .revents = undefined, + }, + }; + var message_i: usize = 0; + var data_i: usize = 0; + + // TODO: recvmsg first, then poll if EAGAIN. saves syscall in case the messages are already queued. + + const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i }; + + poll: while (true) { + pool.checkCancel() catch |err| return .{ err, message_i }; + + if (message_i > 0 or message_buffer.len - message_i == 0) return .{ null, message_i }; + + const max_poll_ms = std.math.maxInt(u31); + const timeout_ms: u31 = if (deadline) |d| t: { + const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i }; + if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i }; + break :t @intCast(@min(max_poll_ms, duration.toMilliseconds())); + } else max_poll_ms; + + const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms); + switch (posix.errno(poll_rc)) { + .SUCCESS => { + if (poll_rc == 0) { + // Possibly spurious timeout. + if (deadline == null) continue; + return .{ error.Timeout, message_i }; + } + + // Proceed to recvmsg. + while (true) { + pool.checkCancel() catch |err| return .{ err, message_i }; + + const message = &message_buffer[message_i]; + const remaining_data_buffer = data_buffer[data_i..]; + var storage: PosixAddress = undefined; + var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len }; + var msg: posix.msghdr = .{ + .name = &storage.any, + .namelen = @sizeOf(PosixAddress), + .iov = (&iov)[0..1], + .iovlen = 1, + .control = message.control.ptr, + .controllen = message.control.len, + .flags = undefined, + }; + + const rc = posix.system.recvmsg(handle, &msg, posix_flags); + switch (posix.errno(rc)) { + .SUCCESS => { + const data = remaining_data_buffer[0..@intCast(rc)]; + data_i += data.len; + message.* = .{ + .from = addressFromPosix(&storage), + .data = data, + .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control, + .flags = .{ + .eor = (msg.flags & posix.MSG.EOR) != 0, + .trunc = (msg.flags & posix.MSG.TRUNC) != 0, + .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0, + .oob = (msg.flags & posix.MSG.OOB) != 0, + .errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0, + }, + }; + message_i += 1; + continue; + }, + .AGAIN => continue :poll, + .BADF => |err| return .{ errnoBug(err), message_i }, + .NFILE => return .{ error.SystemFdQuotaExceeded, message_i }, + .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i }, + .INTR => continue, + .FAULT => |err| return .{ errnoBug(err), message_i }, + .INVAL => |err| return .{ errnoBug(err), message_i }, + .NOBUFS => return .{ error.SystemResources, message_i }, + .NOMEM => return .{ error.SystemResources, message_i }, + .NOTCONN => return .{ error.SocketUnconnected, message_i }, + .NOTSOCK => |err| return .{ errnoBug(err), message_i }, + .MSGSIZE => return .{ error.MessageOversize, message_i }, + .PIPE => return .{ error.SocketUnconnected, message_i }, + .OPNOTSUPP => |err| return .{ errnoBug(err), message_i }, + .CONNRESET => return .{ error.ConnectionResetByPeer, message_i }, + .NETDOWN => return .{ error.NetworkDown, message_i }, + else => |err| return .{ posix.unexpectedErrno(err), message_i }, + } + } + }, + .INTR => continue, + .FAULT => |err| return .{ errnoBug(err), message_i }, + .INVAL => |err| return .{ errnoBug(err), message_i }, + .NOMEM => return .{ error.SystemResources, message_i }, + else => |err| return .{ posix.unexpectedErrno(err), message_i }, + } + } } fn netWritePosix( @@ -1653,3 +1777,11 @@ fn posixProtocol(protocol: ?Io.net.Protocol) u32 { fn recoverableOsBugDetected() void { if (builtin.mode == .Debug) unreachable; } + +fn clockToPosix(clock: Io.Timestamp.Clock) posix.clockid_t { + return switch (clock) { + .realtime => posix.CLOCK.REALTIME, + .monotonic => posix.CLOCK.MONOTONIC, + .boottime => posix.CLOCK.BOOTTIME, + }; +} diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index f83dc7e97c..5672abd071 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -695,9 +695,41 @@ pub const Ip6Address = struct { }; }; -pub const ReceivedMessage = struct { +pub const ReceiveFlags = packed struct(u8) { + oob: bool = false, + peek: bool = false, + trunc: bool = false, + _: u5 = 0, +}; + +pub const IncomingMessage = struct { + /// Populated by receive functions. from: IpAddress, - len: usize, + /// Populated by receive functions, points into the caller-supplied buffer. + data: []u8, + /// Supplied by caller before calling receive functions; mutated by receive + /// functions. + control: []u8 = &.{}, + /// Populated by receive functions. + flags: Flags, + + pub const Flags = packed struct(u8) { + /// indicates end-of-record; the data returned completed a record + /// (generally used with sockets of type SOCK_SEQPACKET). + eor: bool, + /// indicates that the trailing portion of a datagram was discarded + /// because the datagram was larger than the buffer supplied. + trunc: bool, + /// indicates that some control data was discarded due to lack of + /// space in the buffer for ancil‐ lary data. + ctrunc: bool, + /// indicates expedited or out-of-band data was received. + oob: bool, + /// indicates that no data was received but an extended error from the + /// socket error queue. + errqueue: bool, + _: u3 = 0, + }; }; pub const OutgoingMessage = struct { @@ -718,6 +750,14 @@ pub const SendFlags = packed struct(u8) { _: u3 = 0, }; +pub const SendResult = union(enum) { + success, + fail: struct { + err: Socket.SendError, + sent: usize, + }, +}; + pub const Interface = struct { /// Value 0 indicates `none`. index: u32, @@ -839,7 +879,7 @@ pub const Socket = struct { ConnectionResetByPeer, /// Local end has been shut down on a connection-oriented socket, or /// the socket was never connected. - SocketNotConnected, + SocketUnconnected, } || Io.UnexpectedError || Io.Cancelable; /// Transfers `data` to `dest`, connectionless, in one packet. @@ -853,14 +893,34 @@ pub const Socket = struct { return io.vtable.netSend(io.userdata, s.handle, messages, flags); } - pub const ReceiveError = error{} || Io.UnexpectedError || Io.Cancelable; + pub const ReceiveError = error{ + /// Insufficient memory or other resource internal to the operating system. + SystemResources, + /// Per-process limit on the number of open file descriptors has been reached. + ProcessFdQuotaExceeded, + /// System-wide limit on the total number of open files has been reached. + SystemFdQuotaExceeded, + /// Local end has been shut down on a connection-oriented socket, or + /// the socket was never connected. + SocketUnconnected, + /// The socket type requires that message be sent atomically, and the + /// size of the message to be sent made this impossible. The message + /// was not transmitted, or was partially transmitted. + MessageOversize, + /// Network connection was unexpectedly closed by sender. + ConnectionResetByPeer, + /// The local network interface used to reach the destination is offline. + NetworkDown, + } || Io.UnexpectedError || Io.Cancelable; /// Waits for data. Connectionless. /// /// See also: /// * `receiveTimeout` - pub fn receive(s: *const Socket, io: Io, source: *const IpAddress, buffer: []u8) ReceiveError!ReceivedMessage { - return io.vtable.netReceive(io.userdata, s.handle, source, buffer, .none); + pub fn receive(s: *const Socket, io: Io, buffer: []u8) ReceiveError!IncomingMessage { + var message: IncomingMessage = undefined; + assert(1 == try io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, .none)); + return message; } pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error; @@ -871,13 +931,36 @@ pub const Socket = struct { /// /// See also: /// * `receive` + /// * `receiveManyTimeout` pub fn receiveTimeout( s: *const Socket, io: Io, buffer: []u8, timeout: Io.Timeout, - ) ReceiveTimeoutError!ReceivedMessage { - return io.vtable.netReceive(io.userdata, s.handle, buffer, timeout); + ) ReceiveTimeoutError!IncomingMessage { + var message: IncomingMessage = undefined; + assert(1 == try io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, timeout)); + return message; + } + + /// Waits until at least one message is delivered, possibly returning more + /// than one message. Connectionless. + /// + /// Returns number of messages received, or `error.Timeout` if no message + /// arrives early enough. + /// + /// See also: + /// * `receive` + /// * `receiveTimeout` + pub fn receiveManyTimeout( + s: *const Socket, + io: Io, + message_buffer: []IncomingMessage, + data_buffer: []u8, + flags: ReceiveFlags, + timeout: Io.Timeout, + ) struct { ?ReceiveTimeoutError, usize } { + return io.vtable.netReceive(io.userdata, s.handle, message_buffer, data_buffer, flags, timeout); } }; diff --git a/lib/std/Io/net/HostName.zig b/lib/std/Io/net/HostName.zig index 1a595a0b67..ce17b86633 100644 --- a/lib/std/Io/net/HostName.zig +++ b/lib/std/Io/net/HostName.zig @@ -52,7 +52,7 @@ pub const LookupError = error{ InvalidDnsARecord, InvalidDnsAAAARecord, NameServerFailure, -} || Io.NowError || IpAddress.BindError || Io.File.OpenError || Io.File.Reader.Error || Io.Cancelable; +} || Io.Timestamp.Error || IpAddress.BindError || Io.File.OpenError || Io.File.Reader.Error || Io.Cancelable; pub const LookupResult = struct { /// How many `LookupOptions.addresses_buffer` elements are populated. @@ -222,11 +222,11 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio .{ .af = .ip4, .rr = std.posix.RR.AAAA }, }; var query_buffers: [2][280]u8 = undefined; - var answer_buffers: [2][512]u8 = undefined; + var answer_buffer: [2 * 512]u8 = undefined; var queries_buffer: [2][]const u8 = undefined; var answers_buffer: [2][]const u8 = undefined; var nq: usize = 0; - var next_answer_buffer: usize = 0; + var answer_buffer_i: usize = 0; for (family_records) |fr| { if (options.family != fr.af) { @@ -262,79 +262,89 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers(); const queries = queries_buffer[0..nq]; const answers = answers_buffer[0..queries.len]; + var answers_remaining = answers.len; for (answers) |*answer| answer.len = 0; - var now_ts = try io.now(.MONOTONIC); - const final_ts = now_ts.addDuration(.seconds(rc.timeout_seconds)); + // boottime is chosen because time the computer is suspended should count + // against time spent waiting for external messages to arrive. + var now_ts = try Io.Timestamp.now(io, .boottime); + const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds)); const attempt_duration: Io.Duration = .{ .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts, }; - send: while (now_ts.compare(.lt, final_ts)) : (now_ts = try io.now(.MONOTONIC)) { - var message_buffer: [queries_buffer.len * ResolvConf.max_nameservers]Io.net.OutgoingMessage = undefined; - var message_i: usize = 0; - for (queries, answers) |query, *answer| { - if (answer.len != 0) continue; - for (mapped_nameservers) |*ns| { - message_buffer[message_i] = .{ - .address = ns, - .data_ptr = query.ptr, - .data_len = query.len, - }; - message_i += 1; - } - } - io.vtable.netSend(io.userdata, socket.handle, message_buffer[0..message_i], .{}) catch {}; - - const timeout: Io.Timeout = .{ .deadline = now_ts.addDuration(attempt_duration) }; - - while (true) { - const buf = &answer_buffers[next_answer_buffer]; - const reply = socket.receiveTimeout(io, buf, timeout) catch |err| switch (err) { - error.Canceled => return error.Canceled, - error.Timeout => continue :send, - else => continue, - }; - - // Ignore non-identifiable packets. - if (reply.len < 4) continue; - - // Ignore replies from addresses we didn't send to. - const ns = for (mapped_nameservers) |*ns| { - if (reply.from.eql(ns)) break ns; - } else { - continue; - }; - - const reply_msg = buf[0..reply.len]; - - // Find which query this answer goes with, if any. - const query, const answer = for (queries, answers) |query, *answer| { - if (reply_msg[0] == query[0] and reply_msg[1] == query[1]) break .{ query, answer }; - } else { - continue; - }; - if (answer.len != 0) continue; - - // Only accept positive or negative responses; retry immediately on - // server failure, and ignore all other codes such as refusal. - switch (reply_msg[3] & 15) { - 0, 3 => { - answer.* = reply_msg; - next_answer_buffer += 1; - if (next_answer_buffer == answers.len) break :send; - }, - 2 => { - var message: Io.net.OutgoingMessage = .{ + send: while (now_ts.compare(.lt, final_ts)) : (now_ts = try Io.Timestamp.now(io, .boottime)) { + const max_messages = queries_buffer.len * ResolvConf.max_nameservers; + { + var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined; + var message_i: usize = 0; + for (queries, answers) |query, *answer| { + if (answer.len != 0) continue; + for (mapped_nameservers) |*ns| { + message_buffer[message_i] = .{ .address = ns, .data_ptr = query.ptr, .data_len = query.len, }; - io.vtable.netSend(io.userdata, socket.handle, (&message)[0..1], .{}) catch {}; - continue; - }, - else => continue, + message_i += 1; + } } + _ = io.vtable.netSend(io.userdata, socket.handle, message_buffer[0..message_i], .{}); + } + + const timeout: Io.Timeout = .{ .deadline = now_ts.addDuration(attempt_duration) }; + + while (true) { + var message_buffer: [max_messages]Io.net.IncomingMessage = undefined; + const buf = answer_buffer[answer_buffer_i..]; + const recv_err, const recv_n = socket.receiveManyTimeout(io, &message_buffer, buf, .{}, timeout); + for (message_buffer[0..recv_n]) |*received_message| { + const reply = received_message.data; + // Ignore non-identifiable packets. + if (reply.len < 4) continue; + + // Ignore replies from addresses we didn't send to. + const ns = for (mapped_nameservers) |*ns| { + if (received_message.from.eql(ns)) break ns; + } else { + continue; + }; + + // Find which query this answer goes with, if any. + const query, const answer = for (queries, answers) |query, *answer| { + if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer }; + } else { + continue; + }; + if (answer.len != 0) continue; + + // Only accept positive or negative responses; retry immediately on + // server failure, and ignore all other codes such as refusal. + switch (reply[3] & 15) { + 0, 3 => { + answer.* = reply; + answer_buffer_i += reply.len; + answers_remaining -= 1; + if (answer_buffer.len - answer_buffer_i == 0) break :send; + if (answers_remaining == 0) break :send; + }, + 2 => { + var retry_message: Io.net.OutgoingMessage = .{ + .address = ns, + .data_ptr = query.ptr, + .data_len = query.len, + }; + _ = io.vtable.netSend(io.userdata, socket.handle, (&retry_message)[0..1], .{}); + continue; + }, + else => continue, + } + } + if (recv_err) |err| switch (err) { + error.Canceled => return error.Canceled, + error.Timeout => continue :send, + else => continue, + }; } } else { return error.NameServerFailure; diff --git a/lib/std/net.zig b/lib/std/net.zig index 083ee1da93..7863967e63 100644 --- a/lib/std/net.zig +++ b/lib/std/net.zig @@ -1917,7 +1917,7 @@ pub const Stream = struct { MessageTooBig, NetworkSubsystemFailed, ConnectionResetByPeer, - SocketNotConnected, + SocketUnconnected, }; pub const WriteError = posix.SendMsgError || error{ @@ -1926,7 +1926,7 @@ pub const Stream = struct { MessageTooBig, NetworkSubsystemFailed, SystemResources, - SocketNotConnected, + SocketUnconnected, Unexpected, }; @@ -2004,7 +2004,7 @@ pub const Stream = struct { .WSAEMSGSIZE => return error.MessageTooBig, .WSAENETDOWN => return error.NetworkSubsystemFailed, .WSAENETRESET => return error.ConnectionResetByPeer, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAEWOULDBLOCK => return error.WouldBlock, .WSANOTINITIALISED => unreachable, // WSAStartup must be called before this function .WSA_IO_PENDING => unreachable, @@ -2171,7 +2171,7 @@ pub const Stream = struct { .WSAENETDOWN => return error.NetworkSubsystemFailed, .WSAENETRESET => return error.ConnectionResetByPeer, .WSAENOBUFS => return error.SystemResources, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAENOTSOCK => unreachable, // not a socket .WSAEOPNOTSUPP => unreachable, // only for message-oriented sockets .WSAESHUTDOWN => unreachable, // cannot send on a socket after write shutdown diff --git a/lib/std/posix.zig b/lib/std/posix.zig index f97b0574af..93676f2a74 100644 --- a/lib/std/posix.zig +++ b/lib/std/posix.zig @@ -840,7 +840,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NOTCAPABLE => return error.AccessDenied, @@ -869,7 +869,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, else => |err| return unexpectedErrno(err), @@ -909,7 +909,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NOTCAPABLE => return error.AccessDenied, @@ -931,7 +931,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, else => |err| return unexpectedErrno(err), @@ -978,7 +978,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -1011,7 +1011,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -1129,7 +1129,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -1155,7 +1155,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize { .ISDIR => return error.IsDir, .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, .NXIO => return error.Unseekable, @@ -3711,7 +3711,7 @@ pub const ShutdownError = error{ NetworkSubsystemFailed, /// The socket is not connected (connection-oriented sockets only). - SocketNotConnected, + SocketUnconnected, SystemResources, } || UnexpectedError; @@ -3731,7 +3731,7 @@ pub fn shutdown(sock: socket_t, how: ShutdownHow) ShutdownError!void { .WSAEINPROGRESS => return error.BlockingOperationInProgress, .WSAEINVAL => unreachable, .WSAENETDOWN => return error.NetworkSubsystemFailed, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAENOTSOCK => unreachable, .WSANOTINITIALISED => unreachable, else => |err| return windows.unexpectedWSAError(err), @@ -3746,7 +3746,7 @@ pub fn shutdown(sock: socket_t, how: ShutdownHow) ShutdownError!void { .SUCCESS => return, .BADF => unreachable, .INVAL => unreachable, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NOTSOCK => unreachable, .NOBUFS => return error.SystemResources, else => |err| return unexpectedErrno(err), @@ -6181,7 +6181,7 @@ pub const SendMsgError = SendError || error{ NotDir, /// The socket is not connected (connection-oriented sockets only). - SocketNotConnected, + SocketUnconnected, AddressNotAvailable, }; @@ -6212,7 +6212,7 @@ pub fn sendmsg( .WSAENETDOWN => return error.NetworkSubsystemFailed, .WSAENETRESET => return error.ConnectionResetByPeer, .WSAENETUNREACH => return error.NetworkUnreachable, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH. .WSAEWOULDBLOCK => return error.WouldBlock, .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function. @@ -6248,7 +6248,7 @@ pub fn sendmsg( .NOTDIR => return error.NotDir, .HOSTUNREACH => return error.NetworkUnreachable, .NETUNREACH => return error.NetworkUnreachable, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NETDOWN => return error.NetworkSubsystemFailed, else => |err| return unexpectedErrno(err), } @@ -6315,7 +6315,7 @@ pub fn sendto( .WSAENETDOWN => return error.NetworkSubsystemFailed, .WSAENETRESET => return error.ConnectionResetByPeer, .WSAENETUNREACH => return error.NetworkUnreachable, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH. .WSAEWOULDBLOCK => return error.WouldBlock, .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function. @@ -6353,7 +6353,7 @@ pub fn sendto( .NOTDIR => return error.NotDir, .HOSTUNREACH => return error.NetworkUnreachable, .NETUNREACH => return error.NetworkUnreachable, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NETDOWN => return error.NetworkSubsystemFailed, else => |err| return unexpectedErrno(err), } @@ -6393,7 +6393,7 @@ pub fn send( error.NotDir => unreachable, error.NetworkUnreachable => unreachable, error.AddressNotAvailable => unreachable, - error.SocketNotConnected => unreachable, + error.SocketUnconnected => unreachable, error.UnreachableAddress => unreachable, else => |e| return e, }; @@ -6579,7 +6579,7 @@ pub const RecvFromError = error{ NetworkSubsystemFailed, /// The socket is not connected (connection-oriented sockets only). - SocketNotConnected, + SocketUnconnected, /// The other end closed the socket unexpectedly or a read is executed on a shut down socket BrokenPipe, @@ -6608,7 +6608,7 @@ pub fn recvfrom( .WSAEINVAL => return error.SocketNotBound, .WSAEMSGSIZE => return error.MessageTooBig, .WSAENETDOWN => return error.NetworkSubsystemFailed, - .WSAENOTCONN => return error.SocketNotConnected, + .WSAENOTCONN => return error.SocketUnconnected, .WSAEWOULDBLOCK => return error.WouldBlock, .WSAETIMEDOUT => return error.ConnectionTimedOut, // TODO: handle more errors @@ -6623,7 +6623,7 @@ pub fn recvfrom( .BADF => unreachable, // always a race condition .FAULT => unreachable, .INVAL => unreachable, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NOTSOCK => unreachable, .INTR => continue, .AGAIN => return error.WouldBlock, @@ -6675,7 +6675,7 @@ pub fn recvmsg( .ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified .NOBUFS => return error.SystemResources, .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketNotConnected, + .NOTCONN => return error.SocketUnconnected, .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. .MSGSIZE => return error.MessageTooBig, .PIPE => return error.BrokenPipe, diff --git a/lib/std/posix/test.zig b/lib/std/posix/test.zig index 50ffd7998f..87b101e1e9 100644 --- a/lib/std/posix/test.zig +++ b/lib/std/posix/test.zig @@ -634,7 +634,7 @@ test "shutdown socket" { } const sock = try posix.socket(posix.AF.INET, posix.SOCK.STREAM, 0); posix.shutdown(sock, .both) catch |err| switch (err) { - error.SocketNotConnected => {}, + error.SocketUnconnected => {}, else => |e| return e, }; std.net.Stream.close(.{ .handle = sock }); diff --git a/lib/std/zig/system.zig b/lib/std/zig/system.zig index e90c4ae023..47ff3d05fa 100644 --- a/lib/std/zig/system.zig +++ b/lib/std/zig/system.zig @@ -1283,7 +1283,7 @@ fn preadAtLeast(file: fs.File, buf: []u8, offset: u64, min_read_len: usize) !usi error.Unseekable => return error.UnableToReadElfFile, error.ConnectionResetByPeer => return error.UnableToReadElfFile, error.ConnectionTimedOut => return error.UnableToReadElfFile, - error.SocketNotConnected => return error.UnableToReadElfFile, + error.SocketUnconnected => return error.UnableToReadElfFile, error.Unexpected => return error.Unexpected, error.InputOutput => return error.FileSystem, error.AccessDenied => return error.Unexpected,