std.Io.net: implement receiving connectionless messages

This commit is contained in:
Andrew Kelley 2025-10-01 23:52:07 -07:00
parent 961961cf85
commit a6347a68a9
10 changed files with 462 additions and 150 deletions

View file

@ -665,14 +665,14 @@ pub const VTable = struct {
fileSeekBy: *const fn (?*anyopaque, file: File, offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, file: File, offset: u64) File.SeekError!void,
now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) NowError!Timestamp,
sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void,
now: *const fn (?*anyopaque, Timestamp.Clock) Timestamp.Error!i96,
sleep: *const fn (?*anyopaque, Timeout) SleepError!void,
listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream,
ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void,
netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, buffer: []u8, timeout: Timeout) net.Socket.ReceiveTimeoutError!net.ReceivedMessage,
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.SendResult,
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize,
netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
@ -700,46 +700,135 @@ pub const UnexpectedError = error{
pub const Dir = @import("Io/Dir.zig");
pub const File = @import("Io/File.zig");
pub const Timestamp = enum(i96) {
_,
pub const Timestamp = struct {
nanoseconds: i96,
clock: Clock,
pub const Clock = enum {
/// A settable system-wide clock that measures real (i.e. wall-clock)
/// time. This clock is affected by discontinuous jumps in the system
/// time (e.g., if the system administrator manually changes the
/// clock), and by frequency adjust ments performed by NTP and similar
/// applications.
/// This clock normally counts the number of seconds since
/// 1970-01-01 00:00:00 Coordinated Universal Time (UTC) except that it
/// ignores leap seconds; near a leap second it is typically
/// adjusted by NTP to stay roughly in sync with UTC.
realtime,
/// A nonsettable system-wide clock that represents time since some
/// unspecified point in the past.
///
/// On Linux, corresponds to how long the system has been running since
/// it booted.
///
/// Not affected by discontinuous jumps in the system time (e.g., if
/// the system administrator manually changes the clock), but is
/// affected by frequency adjustments. **This clock does not count time
/// that the system is suspended.**
///
/// Guarantees that the time returned by consecutive calls will not go
/// backwards, but successive calls may return identical
/// (not-increased) time values.
monotonic,
/// Identical to `monotonic` except it also includes any time that the
/// system is suspended.
boottime,
};
pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
return .{ .nanoseconds = @intFromEnum(to) - @intFromEnum(from) };
assert(from.clock == to.clock);
return .{ .nanoseconds = to.nanoseconds - from.nanoseconds };
}
pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
return @enumFromInt(@intFromEnum(from) + duration.nanoseconds);
return .{
.nanoseconds = from.nanoseconds + duration.nanoseconds,
.clock = from.clock,
};
}
pub fn fromNow(io: Io, clockid: std.posix.clockid_t, duration: Duration) NowError!Timestamp {
const now_ts = try now(io, clockid);
pub const Error = error{UnsupportedClock} || UnexpectedError;
/// This function is not cancelable because first of all it does not block,
/// but more importantly, the cancelation logic itself may want to check
/// the time.
pub fn now(io: Io, clock: Clock) Error!Timestamp {
return .{
.nanoseconds = try io.vtable.now(io.userdata, clock),
.clock = clock,
};
}
pub fn fromNow(io: Io, clock: Clock, duration: Duration) Error!Timestamp {
const now_ts = try now(io, clock);
return addDuration(now_ts, duration);
}
pub fn untilNow(timestamp: Timestamp, io: Io) Error!Duration {
const now_ts = try Timestamp.now(io, timestamp.clock);
return timestamp.durationTo(now_ts);
}
pub fn durationFromNow(timestamp: Timestamp, io: Io) Error!Duration {
const now_ts = try now(io, timestamp.clock);
return now_ts.durationTo(timestamp);
}
pub fn toClock(t: Timestamp, io: Io, clock: Clock) Error!Timestamp {
if (t.clock == clock) return t;
const now_old = try now(io, t.clock);
const now_new = try now(io, clock);
const duration = now_old.durationTo(t);
return now_new.addDuration(duration);
}
pub fn compare(lhs: Timestamp, op: std.math.CompareOperator, rhs: Timestamp) bool {
return std.math.compare(@intFromEnum(lhs), op, @intFromEnum(rhs));
assert(lhs.clock == rhs.clock);
return std.math.compare(lhs.nanoseconds, op, rhs.nanoseconds);
}
};
pub const Duration = struct {
nanoseconds: i96,
pub fn ms(x: u64) Duration {
pub fn fromMilliseconds(x: i64) Duration {
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms };
}
pub fn seconds(x: u64) Duration {
pub fn fromSeconds(x: i64) Duration {
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s };
}
pub fn toMilliseconds(d: Duration) i64 {
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_ms));
}
pub fn toSeconds(d: Duration) i64 {
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s));
}
};
/// Declares under what conditions an operation should return `error.Timeout`.
pub const Timeout = union(enum) {
none,
duration: Duration,
duration: ClockAndDuration,
deadline: Timestamp,
pub const Error = error{Timeout};
pub const Error = error{ Timeout, UnsupportedClock };
pub const ClockAndDuration = struct {
clock: Timestamp.Clock,
duration: Duration,
};
pub fn toDeadline(t: Timeout, io: Io) Timestamp.Error!?Timestamp {
return switch (t) {
.none => null,
.duration => |d| try .fromNow(io, d.clock, d.duration),
.deadline => |d| d,
};
}
};
pub const NowError = std.posix.ClockGetTimeError || Cancelable;
pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
pub const AnyFuture = opaque {};
@ -1231,12 +1320,10 @@ pub fn cancelRequested(io: Io) bool {
return io.vtable.cancelRequested(io.userdata);
}
pub fn now(io: Io, clockid: std.posix.clockid_t) NowError!Timestamp {
return io.vtable.now(io.userdata, clockid);
}
pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
pub fn sleep(io: Io, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void {
return io.vtable.sleep(io.userdata, clockid, timeout);
pub fn sleep(io: Io, timeout: Timeout) SleepError!void {
return io.vtable.sleep(io.userdata, timeout);
}
pub fn sleepDuration(io: Io, duration: Duration) SleepError!void {

View file

@ -1406,7 +1406,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,

View file

@ -157,7 +157,7 @@ pub const ReadStreamingError = error{
ConnectionResetByPeer,
ConnectionTimedOut,
NotOpenForReading,
SocketNotConnected,
SocketUnconnected,
/// This error occurs when no global event loop is configured,
/// and reading from the file descriptor would block.
WouldBlock,

View file

@ -811,7 +811,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NOTCAPABLE => return error.AccessDenied,
@ -834,7 +834,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
else => |err| return posix.unexpectedErrno(err),
@ -933,7 +933,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -960,7 +960,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -999,19 +999,29 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi
};
}
fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.NowError!Io.Timestamp {
fn now(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 {
const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel();
const timespec = try posix.clock_gettime(clockid);
return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
_ = pool;
const clock_id: posix.clockid_t = clockToPosix(clock);
var tp: posix.timespec = undefined;
switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) {
.SUCCESS => return @intCast(@as(i128, tp.sec) * std.time.ns_per_s + tp.nsec),
.INVAL => return error.UnsupportedClock,
else => |err| return posix.unexpectedErrno(err),
}
}
fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, timeout: Io.Timeout) Io.SleepError!void {
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
const pool: *Pool = @ptrCast(@alignCast(userdata));
const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
.none => .monotonic,
.duration => |d| d.clock,
.deadline => |d| d.clock,
});
const deadline_nanoseconds: i96 = switch (timeout) {
.none => std.math.maxInt(i96),
.duration => |duration| duration.nanoseconds,
.deadline => |deadline| @intFromEnum(deadline),
.duration => |d| d.duration.nanoseconds,
.deadline => |deadline| deadline.nanoseconds,
};
var timespec: posix.timespec = .{
.sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
@ -1019,13 +1029,12 @@ fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, timeout: Io.Timeout) I
};
while (true) {
try pool.checkCancel();
switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (timeout) {
switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) {
.none, .duration => false,
.deadline => true,
} }, &timespec, &timespec))) {
.SUCCESS => return,
.FAULT => |err| return errnoBug(err),
.INTR => {},
.INTR => continue,
.INVAL => return error.UnsupportedClock,
else => |err| return posix.unexpectedErrno(err),
}
@ -1313,15 +1322,18 @@ fn netSend(
handle: Io.net.Socket.Handle,
messages: []Io.net.OutgoingMessage,
flags: Io.net.SendFlags,
) Io.net.Socket.SendError!void {
) Io.net.SendResult {
const pool: *Pool = @ptrCast(@alignCast(userdata));
if (have_sendmmsg) {
var i: usize = 0;
while (messages.len - i != 0) {
i += try netSendMany(pool, handle, messages[i..], flags);
i += netSendMany(pool, handle, messages[i..], flags) catch |err| return .{ .fail = .{
.err = err,
.sent = i,
} };
}
return;
return .success;
}
try pool.checkCancel();
@ -1391,11 +1403,11 @@ fn netSendMany(
.NOMEM => return error.SystemResources,
.NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
.OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
.PIPE => return error.SocketNotConnected,
.PIPE => return error.SocketUnconnected,
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
.HOSTUNREACH => return error.NetworkUnreachable,
.NETUNREACH => return error.NetworkUnreachable,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NETDOWN => return error.NetworkDown,
else => |err| return posix.unexpectedErrno(err),
}
@ -1405,16 +1417,128 @@ fn netSendMany(
fn netReceive(
userdata: ?*anyopaque,
handle: Io.net.Socket.Handle,
buffer: []u8,
message_buffer: []Io.net.IncomingMessage,
data_buffer: []u8,
flags: Io.net.ReceiveFlags,
timeout: Io.Timeout,
) Io.net.Socket.ReceiveTimeoutError!Io.net.ReceivedMessage {
) struct { ?Io.net.Socket.ReceiveTimeoutError, usize } {
const pool: *Pool = @ptrCast(@alignCast(userdata));
try pool.checkCancel();
_ = handle;
_ = buffer;
_ = timeout;
@panic("TODO");
// recvmmsg is useless, here's why:
// * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
// * it wants iovecs for each message but we have a better API: one data
// buffer to handle all the messages. The better API cannot be lowered to
// the split vectors though because reducing the buffer size might make
// some messages unreceivable.
// So the strategy instead is to use poll with timeout and then non-blocking
// recvmsg calls.
const posix_flags: u32 =
@as(u32, if (flags.oob) posix.MSG.OOB else 0) |
@as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
@as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;
var poll_fds: [1]posix.pollfd = .{
.{
.fd = handle,
.events = posix.POLL.IN,
.revents = undefined,
},
};
var message_i: usize = 0;
var data_i: usize = 0;
// TODO: recvmsg first, then poll if EAGAIN. saves syscall in case the messages are already queued.
const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i };
poll: while (true) {
pool.checkCancel() catch |err| return .{ err, message_i };
if (message_i > 0 or message_buffer.len - message_i == 0) return .{ null, message_i };
const max_poll_ms = std.math.maxInt(u31);
const timeout_ms: u31 = if (deadline) |d| t: {
const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i };
if (duration.nanoseconds <= 0) return .{ error.Timeout, message_i };
break :t @intCast(@min(max_poll_ms, duration.toMilliseconds()));
} else max_poll_ms;
const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
switch (posix.errno(poll_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Possibly spurious timeout.
if (deadline == null) continue;
return .{ error.Timeout, message_i };
}
// Proceed to recvmsg.
while (true) {
pool.checkCancel() catch |err| return .{ err, message_i };
const message = &message_buffer[message_i];
const remaining_data_buffer = data_buffer[data_i..];
var storage: PosixAddress = undefined;
var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
var msg: posix.msghdr = .{
.name = &storage.any,
.namelen = @sizeOf(PosixAddress),
.iov = (&iov)[0..1],
.iovlen = 1,
.control = message.control.ptr,
.controllen = message.control.len,
.flags = undefined,
};
const rc = posix.system.recvmsg(handle, &msg, posix_flags);
switch (posix.errno(rc)) {
.SUCCESS => {
const data = remaining_data_buffer[0..@intCast(rc)];
data_i += data.len;
message.* = .{
.from = addressFromPosix(&storage),
.data = data,
.control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
.flags = .{
.eor = (msg.flags & posix.MSG.EOR) != 0,
.trunc = (msg.flags & posix.MSG.TRUNC) != 0,
.ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
.oob = (msg.flags & posix.MSG.OOB) != 0,
.errqueue = (msg.flags & posix.MSG.ERRQUEUE) != 0,
},
};
message_i += 1;
continue;
},
.AGAIN => continue :poll,
.BADF => |err| return .{ errnoBug(err), message_i },
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
.MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
.INTR => continue,
.FAULT => |err| return .{ errnoBug(err), message_i },
.INVAL => |err| return .{ errnoBug(err), message_i },
.NOBUFS => return .{ error.SystemResources, message_i },
.NOMEM => return .{ error.SystemResources, message_i },
.NOTCONN => return .{ error.SocketUnconnected, message_i },
.NOTSOCK => |err| return .{ errnoBug(err), message_i },
.MSGSIZE => return .{ error.MessageOversize, message_i },
.PIPE => return .{ error.SocketUnconnected, message_i },
.OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
.CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
.NETDOWN => return .{ error.NetworkDown, message_i },
else => |err| return .{ posix.unexpectedErrno(err), message_i },
}
}
},
.INTR => continue,
.FAULT => |err| return .{ errnoBug(err), message_i },
.INVAL => |err| return .{ errnoBug(err), message_i },
.NOMEM => return .{ error.SystemResources, message_i },
else => |err| return .{ posix.unexpectedErrno(err), message_i },
}
}
}
fn netWritePosix(
@ -1653,3 +1777,11 @@ fn posixProtocol(protocol: ?Io.net.Protocol) u32 {
fn recoverableOsBugDetected() void {
if (builtin.mode == .Debug) unreachable;
}
fn clockToPosix(clock: Io.Timestamp.Clock) posix.clockid_t {
return switch (clock) {
.realtime => posix.CLOCK.REALTIME,
.monotonic => posix.CLOCK.MONOTONIC,
.boottime => posix.CLOCK.BOOTTIME,
};
}

View file

@ -695,9 +695,41 @@ pub const Ip6Address = struct {
};
};
pub const ReceivedMessage = struct {
pub const ReceiveFlags = packed struct(u8) {
oob: bool = false,
peek: bool = false,
trunc: bool = false,
_: u5 = 0,
};
pub const IncomingMessage = struct {
/// Populated by receive functions.
from: IpAddress,
len: usize,
/// Populated by receive functions, points into the caller-supplied buffer.
data: []u8,
/// Supplied by caller before calling receive functions; mutated by receive
/// functions.
control: []u8 = &.{},
/// Populated by receive functions.
flags: Flags,
pub const Flags = packed struct(u8) {
/// indicates end-of-record; the data returned completed a record
/// (generally used with sockets of type SOCK_SEQPACKET).
eor: bool,
/// indicates that the trailing portion of a datagram was discarded
/// because the datagram was larger than the buffer supplied.
trunc: bool,
/// indicates that some control data was discarded due to lack of
/// space in the buffer for ancil lary data.
ctrunc: bool,
/// indicates expedited or out-of-band data was received.
oob: bool,
/// indicates that no data was received but an extended error from the
/// socket error queue.
errqueue: bool,
_: u3 = 0,
};
};
pub const OutgoingMessage = struct {
@ -718,6 +750,14 @@ pub const SendFlags = packed struct(u8) {
_: u3 = 0,
};
pub const SendResult = union(enum) {
success,
fail: struct {
err: Socket.SendError,
sent: usize,
},
};
pub const Interface = struct {
/// Value 0 indicates `none`.
index: u32,
@ -839,7 +879,7 @@ pub const Socket = struct {
ConnectionResetByPeer,
/// Local end has been shut down on a connection-oriented socket, or
/// the socket was never connected.
SocketNotConnected,
SocketUnconnected,
} || Io.UnexpectedError || Io.Cancelable;
/// Transfers `data` to `dest`, connectionless, in one packet.
@ -853,14 +893,34 @@ pub const Socket = struct {
return io.vtable.netSend(io.userdata, s.handle, messages, flags);
}
pub const ReceiveError = error{} || Io.UnexpectedError || Io.Cancelable;
pub const ReceiveError = error{
/// Insufficient memory or other resource internal to the operating system.
SystemResources,
/// Per-process limit on the number of open file descriptors has been reached.
ProcessFdQuotaExceeded,
/// System-wide limit on the total number of open files has been reached.
SystemFdQuotaExceeded,
/// Local end has been shut down on a connection-oriented socket, or
/// the socket was never connected.
SocketUnconnected,
/// The socket type requires that message be sent atomically, and the
/// size of the message to be sent made this impossible. The message
/// was not transmitted, or was partially transmitted.
MessageOversize,
/// Network connection was unexpectedly closed by sender.
ConnectionResetByPeer,
/// The local network interface used to reach the destination is offline.
NetworkDown,
} || Io.UnexpectedError || Io.Cancelable;
/// Waits for data. Connectionless.
///
/// See also:
/// * `receiveTimeout`
pub fn receive(s: *const Socket, io: Io, source: *const IpAddress, buffer: []u8) ReceiveError!ReceivedMessage {
return io.vtable.netReceive(io.userdata, s.handle, source, buffer, .none);
pub fn receive(s: *const Socket, io: Io, buffer: []u8) ReceiveError!IncomingMessage {
var message: IncomingMessage = undefined;
assert(1 == try io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, .none));
return message;
}
pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error;
@ -871,13 +931,36 @@ pub const Socket = struct {
///
/// See also:
/// * `receive`
/// * `receiveManyTimeout`
pub fn receiveTimeout(
s: *const Socket,
io: Io,
buffer: []u8,
timeout: Io.Timeout,
) ReceiveTimeoutError!ReceivedMessage {
return io.vtable.netReceive(io.userdata, s.handle, buffer, timeout);
) ReceiveTimeoutError!IncomingMessage {
var message: IncomingMessage = undefined;
assert(1 == try io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, timeout));
return message;
}
/// Waits until at least one message is delivered, possibly returning more
/// than one message. Connectionless.
///
/// Returns number of messages received, or `error.Timeout` if no message
/// arrives early enough.
///
/// See also:
/// * `receive`
/// * `receiveTimeout`
pub fn receiveManyTimeout(
s: *const Socket,
io: Io,
message_buffer: []IncomingMessage,
data_buffer: []u8,
flags: ReceiveFlags,
timeout: Io.Timeout,
) struct { ?ReceiveTimeoutError, usize } {
return io.vtable.netReceive(io.userdata, s.handle, message_buffer, data_buffer, flags, timeout);
}
};

View file

@ -52,7 +52,7 @@ pub const LookupError = error{
InvalidDnsARecord,
InvalidDnsAAAARecord,
NameServerFailure,
} || Io.NowError || IpAddress.BindError || Io.File.OpenError || Io.File.Reader.Error || Io.Cancelable;
} || Io.Timestamp.Error || IpAddress.BindError || Io.File.OpenError || Io.File.Reader.Error || Io.Cancelable;
pub const LookupResult = struct {
/// How many `LookupOptions.addresses_buffer` elements are populated.
@ -222,11 +222,11 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
.{ .af = .ip4, .rr = std.posix.RR.AAAA },
};
var query_buffers: [2][280]u8 = undefined;
var answer_buffers: [2][512]u8 = undefined;
var answer_buffer: [2 * 512]u8 = undefined;
var queries_buffer: [2][]const u8 = undefined;
var answers_buffer: [2][]const u8 = undefined;
var nq: usize = 0;
var next_answer_buffer: usize = 0;
var answer_buffer_i: usize = 0;
for (family_records) |fr| {
if (options.family != fr.af) {
@ -262,79 +262,89 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
const queries = queries_buffer[0..nq];
const answers = answers_buffer[0..queries.len];
var answers_remaining = answers.len;
for (answers) |*answer| answer.len = 0;
var now_ts = try io.now(.MONOTONIC);
const final_ts = now_ts.addDuration(.seconds(rc.timeout_seconds));
// boottime is chosen because time the computer is suspended should count
// against time spent waiting for external messages to arrive.
var now_ts = try Io.Timestamp.now(io, .boottime);
const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
const attempt_duration: Io.Duration = .{
.nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
};
send: while (now_ts.compare(.lt, final_ts)) : (now_ts = try io.now(.MONOTONIC)) {
var message_buffer: [queries_buffer.len * ResolvConf.max_nameservers]Io.net.OutgoingMessage = undefined;
var message_i: usize = 0;
for (queries, answers) |query, *answer| {
if (answer.len != 0) continue;
for (mapped_nameservers) |*ns| {
message_buffer[message_i] = .{
.address = ns,
.data_ptr = query.ptr,
.data_len = query.len,
};
message_i += 1;
}
}
io.vtable.netSend(io.userdata, socket.handle, message_buffer[0..message_i], .{}) catch {};
const timeout: Io.Timeout = .{ .deadline = now_ts.addDuration(attempt_duration) };
while (true) {
const buf = &answer_buffers[next_answer_buffer];
const reply = socket.receiveTimeout(io, buf, timeout) catch |err| switch (err) {
error.Canceled => return error.Canceled,
error.Timeout => continue :send,
else => continue,
};
// Ignore non-identifiable packets.
if (reply.len < 4) continue;
// Ignore replies from addresses we didn't send to.
const ns = for (mapped_nameservers) |*ns| {
if (reply.from.eql(ns)) break ns;
} else {
continue;
};
const reply_msg = buf[0..reply.len];
// Find which query this answer goes with, if any.
const query, const answer = for (queries, answers) |query, *answer| {
if (reply_msg[0] == query[0] and reply_msg[1] == query[1]) break .{ query, answer };
} else {
continue;
};
if (answer.len != 0) continue;
// Only accept positive or negative responses; retry immediately on
// server failure, and ignore all other codes such as refusal.
switch (reply_msg[3] & 15) {
0, 3 => {
answer.* = reply_msg;
next_answer_buffer += 1;
if (next_answer_buffer == answers.len) break :send;
},
2 => {
var message: Io.net.OutgoingMessage = .{
send: while (now_ts.compare(.lt, final_ts)) : (now_ts = try Io.Timestamp.now(io, .boottime)) {
const max_messages = queries_buffer.len * ResolvConf.max_nameservers;
{
var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
var message_i: usize = 0;
for (queries, answers) |query, *answer| {
if (answer.len != 0) continue;
for (mapped_nameservers) |*ns| {
message_buffer[message_i] = .{
.address = ns,
.data_ptr = query.ptr,
.data_len = query.len,
};
io.vtable.netSend(io.userdata, socket.handle, (&message)[0..1], .{}) catch {};
continue;
},
else => continue,
message_i += 1;
}
}
_ = io.vtable.netSend(io.userdata, socket.handle, message_buffer[0..message_i], .{});
}
const timeout: Io.Timeout = .{ .deadline = now_ts.addDuration(attempt_duration) };
while (true) {
var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
const buf = answer_buffer[answer_buffer_i..];
const recv_err, const recv_n = socket.receiveManyTimeout(io, &message_buffer, buf, .{}, timeout);
for (message_buffer[0..recv_n]) |*received_message| {
const reply = received_message.data;
// Ignore non-identifiable packets.
if (reply.len < 4) continue;
// Ignore replies from addresses we didn't send to.
const ns = for (mapped_nameservers) |*ns| {
if (received_message.from.eql(ns)) break ns;
} else {
continue;
};
// Find which query this answer goes with, if any.
const query, const answer = for (queries, answers) |query, *answer| {
if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer };
} else {
continue;
};
if (answer.len != 0) continue;
// Only accept positive or negative responses; retry immediately on
// server failure, and ignore all other codes such as refusal.
switch (reply[3] & 15) {
0, 3 => {
answer.* = reply;
answer_buffer_i += reply.len;
answers_remaining -= 1;
if (answer_buffer.len - answer_buffer_i == 0) break :send;
if (answers_remaining == 0) break :send;
},
2 => {
var retry_message: Io.net.OutgoingMessage = .{
.address = ns,
.data_ptr = query.ptr,
.data_len = query.len,
};
_ = io.vtable.netSend(io.userdata, socket.handle, (&retry_message)[0..1], .{});
continue;
},
else => continue,
}
}
if (recv_err) |err| switch (err) {
error.Canceled => return error.Canceled,
error.Timeout => continue :send,
else => continue,
};
}
} else {
return error.NameServerFailure;

View file

@ -1917,7 +1917,7 @@ pub const Stream = struct {
MessageTooBig,
NetworkSubsystemFailed,
ConnectionResetByPeer,
SocketNotConnected,
SocketUnconnected,
};
pub const WriteError = posix.SendMsgError || error{
@ -1926,7 +1926,7 @@ pub const Stream = struct {
MessageTooBig,
NetworkSubsystemFailed,
SystemResources,
SocketNotConnected,
SocketUnconnected,
Unexpected,
};
@ -2004,7 +2004,7 @@ pub const Stream = struct {
.WSAEMSGSIZE => return error.MessageTooBig,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup must be called before this function
.WSA_IO_PENDING => unreachable,
@ -2171,7 +2171,7 @@ pub const Stream = struct {
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENOBUFS => return error.SystemResources,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAENOTSOCK => unreachable, // not a socket
.WSAEOPNOTSUPP => unreachable, // only for message-oriented sockets
.WSAESHUTDOWN => unreachable, // cannot send on a socket after write shutdown

View file

@ -840,7 +840,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NOTCAPABLE => return error.AccessDenied,
@ -869,7 +869,7 @@ pub fn read(fd: fd_t, buf: []u8) ReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
else => |err| return unexpectedErrno(err),
@ -909,7 +909,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NOTCAPABLE => return error.AccessDenied,
@ -931,7 +931,7 @@ pub fn readv(fd: fd_t, iov: []const iovec) ReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
else => |err| return unexpectedErrno(err),
@ -978,7 +978,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -1011,7 +1011,7 @@ pub fn pread(fd: fd_t, buf: []u8, offset: u64) PReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -1129,7 +1129,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -1155,7 +1155,7 @@ pub fn preadv(fd: fd_t, iov: []const iovec, offset: u64) PReadError!usize {
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
.NXIO => return error.Unseekable,
@ -3711,7 +3711,7 @@ pub const ShutdownError = error{
NetworkSubsystemFailed,
/// The socket is not connected (connection-oriented sockets only).
SocketNotConnected,
SocketUnconnected,
SystemResources,
} || UnexpectedError;
@ -3731,7 +3731,7 @@ pub fn shutdown(sock: socket_t, how: ShutdownHow) ShutdownError!void {
.WSAEINPROGRESS => return error.BlockingOperationInProgress,
.WSAEINVAL => unreachable,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAENOTSOCK => unreachable,
.WSANOTINITIALISED => unreachable,
else => |err| return windows.unexpectedWSAError(err),
@ -3746,7 +3746,7 @@ pub fn shutdown(sock: socket_t, how: ShutdownHow) ShutdownError!void {
.SUCCESS => return,
.BADF => unreachable,
.INVAL => unreachable,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NOTSOCK => unreachable,
.NOBUFS => return error.SystemResources,
else => |err| return unexpectedErrno(err),
@ -6181,7 +6181,7 @@ pub const SendMsgError = SendError || error{
NotDir,
/// The socket is not connected (connection-oriented sockets only).
SocketNotConnected,
SocketUnconnected,
AddressNotAvailable,
};
@ -6212,7 +6212,7 @@ pub fn sendmsg(
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENETUNREACH => return error.NetworkUnreachable,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
@ -6248,7 +6248,7 @@ pub fn sendmsg(
.NOTDIR => return error.NotDir,
.HOSTUNREACH => return error.NetworkUnreachable,
.NETUNREACH => return error.NetworkUnreachable,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NETDOWN => return error.NetworkSubsystemFailed,
else => |err| return unexpectedErrno(err),
}
@ -6315,7 +6315,7 @@ pub fn sendto(
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENETUNREACH => return error.NetworkUnreachable,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
@ -6353,7 +6353,7 @@ pub fn sendto(
.NOTDIR => return error.NotDir,
.HOSTUNREACH => return error.NetworkUnreachable,
.NETUNREACH => return error.NetworkUnreachable,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NETDOWN => return error.NetworkSubsystemFailed,
else => |err| return unexpectedErrno(err),
}
@ -6393,7 +6393,7 @@ pub fn send(
error.NotDir => unreachable,
error.NetworkUnreachable => unreachable,
error.AddressNotAvailable => unreachable,
error.SocketNotConnected => unreachable,
error.SocketUnconnected => unreachable,
error.UnreachableAddress => unreachable,
else => |e| return e,
};
@ -6579,7 +6579,7 @@ pub const RecvFromError = error{
NetworkSubsystemFailed,
/// The socket is not connected (connection-oriented sockets only).
SocketNotConnected,
SocketUnconnected,
/// The other end closed the socket unexpectedly or a read is executed on a shut down socket
BrokenPipe,
@ -6608,7 +6608,7 @@ pub fn recvfrom(
.WSAEINVAL => return error.SocketNotBound,
.WSAEMSGSIZE => return error.MessageTooBig,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTCONN => return error.SocketUnconnected,
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSAETIMEDOUT => return error.ConnectionTimedOut,
// TODO: handle more errors
@ -6623,7 +6623,7 @@ pub fn recvfrom(
.BADF => unreachable, // always a race condition
.FAULT => unreachable,
.INVAL => unreachable,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NOTSOCK => unreachable,
.INTR => continue,
.AGAIN => return error.WouldBlock,
@ -6675,7 +6675,7 @@ pub fn recvmsg(
.ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketNotConnected,
.NOTCONN => return error.SocketUnconnected,
.NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
.MSGSIZE => return error.MessageTooBig,
.PIPE => return error.BrokenPipe,

View file

@ -634,7 +634,7 @@ test "shutdown socket" {
}
const sock = try posix.socket(posix.AF.INET, posix.SOCK.STREAM, 0);
posix.shutdown(sock, .both) catch |err| switch (err) {
error.SocketNotConnected => {},
error.SocketUnconnected => {},
else => |e| return e,
};
std.net.Stream.close(.{ .handle = sock });

View file

@ -1283,7 +1283,7 @@ fn preadAtLeast(file: fs.File, buf: []u8, offset: u64, min_read_len: usize) !usi
error.Unseekable => return error.UnableToReadElfFile,
error.ConnectionResetByPeer => return error.UnableToReadElfFile,
error.ConnectionTimedOut => return error.UnableToReadElfFile,
error.SocketNotConnected => return error.UnableToReadElfFile,
error.SocketUnconnected => return error.UnableToReadElfFile,
error.Unexpected => return error.Unexpected,
error.InputOutput => return error.FileSystem,
error.AccessDenied => return error.Unexpected,