Class: TIMEx::Strategies::IO
Overview
Deadline-aware helpers for non-blocking socket I/O and DNS resolution.
The nested singleton methods implement reusable primitives; #run simply yields the Deadline to the caller for custom protocols.
Constant Summary collapse
- MIN_SOCKET_TIMEOUT =
Minimum seconds passed to SO_RCVTIMEO / SO_SNDTIMEO. A packed timeval of zero often means “disable timeout” on POSIX, which would leave blocking reads unbounded when the remaining budget rounds to 0.
0.001
Class Method Summary collapse
-
.apply_socket_timeouts(sock, deadline:) ⇒ void
Best-effort SO_RCV,SNDTIMEO setter.
-
.connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket
Resolves
host(respectingdeadline) and connects via the first working address family. -
.read(io, len, deadline:) ⇒ String
Reads up to
lenbytes using non-blocking reads bounded bydeadline. -
.write(io, buffer, deadline:) ⇒ Integer
Writes the full
buffer, retrying onIO::WaitWritableuntil done or expired.
Methods inherited from Base
Methods included from NamedComponent
Class Method Details
.apply_socket_timeouts(sock, deadline:) ⇒ void
This method returns an undefined value.
Best-effort SO_RCV,SNDTIMEO setter. The native pack format for struct timeval differs by platform (64-bit POSIX uses two long fields; Windows uses DWORD milliseconds). When SO_RCVTIMEO_FLOAT is exposed (Darwin, some BSDs) we prefer it because the option’s value is a raw Float, avoiding the packed-timeval mismatch. Failures are swallowed so callers can rely on wait_for as the primary deadline guard.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/timex/strategies/io.rb', line 115 def apply_socket_timeouts(sock, deadline:) deadline = Deadline.coerce(deadline) return if deadline.infinite? remaining = deadline.remaining return if remaining <= 0 remaining = [remaining, MIN_SOCKET_TIMEOUT].max if ::Socket.const_defined?(:SO_RCVTIMEO_FLOAT) && ::Socket.const_defined?(:SO_SNDTIMEO_FLOAT) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO_FLOAT, remaining) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO_FLOAT, remaining) else tv = pack_timeval(remaining) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO, tv) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO, tv) end rescue StandardError nil end |
.connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket
Resolves host (respecting deadline) and connects via the first working address family. Avoids getaddrinfo blocking past the deadline by delegating to Resolv with the remaining time.
The returned socket also has SO_RCV,SNDTIMEO applied to the remaining deadline so that subsequent blocking reads don’t outlive the budget if the caller forgets to use read / write. Use apply_timeouts: false to opt out.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/timex/strategies/io.rb', line 85 def connect(host, port, deadline:, apply_timeouts: true) deadline = Deadline.coerce(deadline) addresses = resolve_host(host, deadline) raise ::SocketError, "could not resolve #{host}" if addresses.empty? last_error = nil addresses.each do |addr| sock = open_socket(addr, port, deadline) apply_socket_timeouts(sock, deadline:) if apply_timeouts return sock rescue Expired raise rescue StandardError => e last_error = e next end raise last_error || Errno::ECONNREFUSED.new("could not connect to #{host}:#{port}") end |
.read(io, len, deadline:) ⇒ String
Reads up to len bytes using non-blocking reads bounded by deadline.
31 32 33 34 35 36 37 38 |
# File 'lib/timex/strategies/io.rb', line 31 def read(io, len, deadline:) deadline = Deadline.coerce(deadline) loop do return io.read_nonblock(len) rescue ::IO::WaitReadable wait_for(io, :read, deadline) end end |
.write(io, buffer, deadline:) ⇒ Integer
Writes the full buffer, retrying on IO::WaitWritable until done or expired.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/timex/strategies/io.rb', line 48 def write(io, buffer, deadline:) deadline = Deadline.coerce(deadline) total = buffer.bytesize offset = 0 while offset < total begin # Avoid the per-iteration `byteslice` alloc on the common path # where we write the whole buffer in one go; only slice once we # know the kernel took a partial write. chunk = offset.zero? ? buffer : buffer.byteslice(offset, total - offset) n = io.write_nonblock(chunk) raise ::IOError, "write_nonblock returned 0 bytes (no progress)" if n.zero? offset += n rescue ::IO::WaitWritable wait_for(io, :write, deadline) end end total end |