Class: TIMEx::Strategies::IO

Inherits:
Base
  • Object
Defined in:
lib/timex/strategies/io.rb

Overview

Deadline-aware helpers for non-blocking socket I/O and DNS resolution.

The nested singleton methods implement reusable primitives; #run simply yields the Deadline to the caller for custom protocols.

Constant Summary

MIN_SOCKET_TIMEOUT = 0.001

Minimum seconds passed to SO_RCVTIMEO / SO_SNDTIMEO. A packed timeval of zero often means “disable timeout” on POSIX, which would leave blocking reads unbounded when the remaining budget rounds to 0.

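Class Method Summary

  • .apply_socket_timeouts(sock, deadline:) ⇒ void
  • .connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket
  • .read(io, len, deadline:) ⇒ String
  • .write(io, buffer, deadline:) ⇒ Integer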

Methods inherited from Base

call, #call

Methods included from NamedComponent

included

Class Method Details

.apply_socket_timeouts(sock, deadline:) ⇒ void

This method returns an undefined value.

Best-effort setter for SO_RCVTIMEO and SO_SNDTIMEO. The native pack format for struct timeval differs by platform (64-bit POSIX uses two long fields; Windows uses DWORD milliseconds). When SO_RCVTIMEO_FLOAT and SO_SNDTIMEO_FLOAT are both exposed (Darwin, some BSDs) we prefer them because the option’s value is a raw Float, avoiding the packed-timeval mismatch. No option is set when the deadline is infinite or already expired. Failures are swallowed so callers can rely on wait_for as the primary deadline guard.

Parameters:

  • sock (::Socket)
  • deadline (Deadline, Numeric, Time, nil)


# File 'lib/timex/strategies/io.rb', line 115

def apply_socket_timeouts(sock, deadline:)
  deadline = Deadline.coerce(deadline)
  return if deadline.infinite?

  remaining = deadline.remaining
  return if remaining <= 0

  remaining = [remaining, MIN_SOCKET_TIMEOUT].max
  if ::Socket.const_defined?(:SO_RCVTIMEO_FLOAT) && ::Socket.const_defined?(:SO_SNDTIMEO_FLOAT)
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO_FLOAT, remaining)
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO_FLOAT, remaining)
  else
    tv = pack_timeval(remaining)
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO, tv)
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO, tv)
  end
rescue StandardError
  nil
end
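
The pack_timeval helper called above is not shown on this page. As a rough illustration of the clamping and packing described earlier, here is a minimal sketch. It assumes a 64-bit POSIX layout where struct timeval is two native C longs; the names pack_timeval_sketch and MIN_TIMEOUT_SKETCH are hypothetical and are not part of TIMEx.

```ruby
# Hypothetical stand-in that mirrors MIN_SOCKET_TIMEOUT from this page.
MIN_TIMEOUT_SKETCH = 0.001

# Packs a duration in seconds into a native struct timeval
# (tv_sec, tv_usec as two C longs). Assumption: 64-bit POSIX layout.
# Windows expects DWORD milliseconds instead and is not handled here.
def pack_timeval_sketch(seconds)
  # Clamp first so a tiny remaining budget never packs to all zeros,
  # which many POSIX systems interpret as "no timeout".
  clamped = [seconds, MIN_TIMEOUT_SKETCH].max
  sec  = clamped.floor
  usec = ((clamped - sec) * 1_000_000).round
  if usec >= 1_000_000 # rounding can carry into the seconds field
    sec  += 1
    usec -= 1_000_000
  end
  [sec, usec].pack("l_2")
end

pack_timeval_sketch(1.5).unpack("l_2") # => [1, 500000]
pack_timeval_sketch(0.0).unpack("l_2") # => [0, 1000]
```

The resulting string is the kind of value the else branch above hands to setsockopt for SO_RCVTIMEO and SO_SNDTIMEO.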

.connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket

Resolves host (respecting deadline) and connects via the first working address family. Avoids getaddrinfo blocking past the deadline by delegating to Resolv with the remaining time.

The returned socket also has SO_RCVTIMEO and SO_SNDTIMEO set to the remaining deadline, so subsequent blocking reads and writes don’t outlive the budget if the caller forgets to use .read / .write. Pass apply_timeouts: false to opt out.
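
The resolve_host helper is not shown on this page. The following is only a sketch of how a lookup might be bounded with Ruby's standard Resolv library; resolve_with_budget is a hypothetical name, and the choice to skip resolution for IP literals is an assumption, not documented TIMEx behavior.

```ruby
require "resolv"

# Hypothetical helper: returns address strings for host, spending at most
# roughly `remaining` seconds per DNS attempt.
def resolve_with_budget(host, remaining)
  # IP literals need no lookup at all.
  return [host] if host.match?(Resolv::IPv4::Regex) || host.match?(Resolv::IPv6::Regex)
  raise ArgumentError, "no time budget left" unless remaining.positive?

  Resolv::DNS.open do |dns|
    # A single number means one attempt with that timeout per query.
    dns.timeouts = remaining
    # getaddresses returns an empty array when nothing answers in time.
    dns.getaddresses(host).map(&:to_s)
  end
end
```

Note that Resolv::DNS queries DNS only (it does not consult the hosts file), and with several configured nameservers the total wall time may still exceed the budget, so a caller would want to re-check the deadline afterwards.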

Parameters:

  • host (String)
  • port (Integer)
  • deadline (Deadline, Numeric, Time, nil)
  • apply_timeouts (Boolean) (defaults to: true)

    when true, sets socket read/write timeouts

Returns:

  • (::Socket)

    connected stream socket

Raises:

  • (SocketError)

    when resolution yields no addresses

  • (Expired)

    when resolution or connect exceeds the deadline



# File 'lib/timex/strategies/io.rb', line 85

def connect(host, port, deadline:, apply_timeouts: true)
  deadline  = Deadline.coerce(deadline)
  addresses = resolve_host(host, deadline)
  raise ::SocketError, "could not resolve #{host}" if addresses.empty?

  last_error = nil
  addresses.each do |addr|
    sock = open_socket(addr, port, deadline)
    apply_socket_timeouts(sock, deadline:) if apply_timeouts
    return sock
  rescue Expired
    raise
  rescue StandardError => e
    last_error = e
    next
  end
  raise last_error || Errno::ECONNREFUSED.new("could not connect to #{host}:#{port}")
end
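
The open_socket helper used in the loop above is not shown here. A deadline-bounded connect is commonly built from connect_nonblock plus IO.select; the sketch below illustrates that idiom with the standard library only. connect_with_budget and SketchExpired are hypothetical stand-ins, not the TIMEx implementation or its Expired class.

```ruby
require "socket"

# Hypothetical stand-in for the library's Expired error.
class SketchExpired < StandardError; end

# Connects to ip:port, waiting at most `budget` seconds.
def connect_with_budget(ip, port, budget)
  deadline_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget
  addr   = Socket.sockaddr_in(port, ip)
  family = ip.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
  sock   = Socket.new(family, Socket::SOCK_STREAM, 0)
  begin
    sock.connect_nonblock(addr)
  rescue IO::WaitWritable
    # Connect is in progress: wait for writability within the budget.
    remaining = deadline_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    unless remaining.positive? && IO.select(nil, [sock], nil, remaining)
      raise SketchExpired, "connect exceeded deadline"
    end
    begin
      sock.connect_nonblock(addr) # second call reports the real outcome
    rescue Errno::EISCONN
      # already connected: success
    end
  end
  sock
rescue StandardError
  sock&.close # do not leak the descriptor on failure
  raise
end
```

Calling it against a listening loopback port returns a connected ::Socket; a refused connection surfaces as the underlying Errno error, which is the kind of error the loop above records in last_error before trying the next address.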

.read(io, len, deadline:) ⇒ String

Reads up to len bytes using non-blocking reads bounded by deadline.

Parameters:

  • io (::IO)
  • len (Integer)

    maximum bytes to read

  • deadline (Deadline, Numeric, Time, nil)

Returns:

  • (String)

    data read (may be shorter than len)

Raises:

  • (Expired)

    when the wait exhausts the budget



# File 'lib/timex/strategies/io.rb', line 31

def read(io, len, deadline:)
  deadline = Deadline.coerce(deadline)
  loop do
    return io.read_nonblock(len)
  rescue ::IO::WaitReadable
    wait_for(io, :read, deadline)
  end
end
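
The wait_for helper that enforces the deadline is not shown on this page. To make the retry loop concrete, here is a self-contained sketch that substitutes IO.select for it. The names read_with_budget, wait_readable_sketch, and SketchExpired are hypothetical, and the real helper may behave differently.

```ruby
# Hypothetical stand-in for the library's Expired error.
class SketchExpired < StandardError; end

# Blocks until io is readable or the monotonic deadline passes.
def wait_readable_sketch(io, deadline_at)
  remaining = deadline_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
  return if remaining.positive? && IO.select([io], nil, nil, remaining)

  raise SketchExpired, "read exceeded deadline"
end

# Reads up to len bytes, waiting at most `budget` seconds in total.
def read_with_budget(io, len, budget)
  deadline_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget
  loop do
    begin
      return io.read_nonblock(len)
    rescue IO::WaitReadable
      # Nothing buffered yet: wait, then retry the non-blocking read.
      wait_readable_sketch(io, deadline_at)
    end
  end
end
```

With a pipe that holds "hello", read_with_budget(r, 3, 0.5) returns "hel" immediately, while a read from an empty pipe raises SketchExpired once the budget is spent.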

.write(io, buffer, deadline:) ⇒ Integer

Writes the full buffer, retrying on IO::WaitWritable until done or expired.

Parameters:

  • io (::IO)
  • buffer (String)
  • deadline (Deadline, Numeric, Time, nil)

Returns:

  • (Integer)

    total bytes written

Raises:

  • (Expired)

    when the wait exhausts the budget

  • (IOError)

    when write_nonblock reports zero progress



# File 'lib/timex/strategies/io.rb', line 48

def write(io, buffer, deadline:)
  deadline = Deadline.coerce(deadline)
  total    = buffer.bytesize
  offset   = 0
  while offset < total
    begin
      # Avoid the per-iteration `byteslice` alloc on the common path
      # where we write the whole buffer in one go; only slice once we
      # know the kernel took a partial write.
      chunk = offset.zero? ? buffer : buffer.byteslice(offset, total - offset)
      n     = io.write_nonblock(chunk)
      raise ::IOError, "write_nonblock returned 0 bytes (no progress)" if n.zero?

      offset += n
    rescue ::IO::WaitWritable
      wait_for(io, :write, deadline)
    end
  end
  total
end
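
To see why the offset loop matters, it helps to write more data than the kernel buffer can hold. The sketch below reproduces the same loop with IO.select in place of wait_for so that it runs with the standard library alone. write_with_budget and SketchExpired are hypothetical names, and the pipe buffer being smaller than the payload is an assumption about the host system.

```ruby
# Hypothetical stand-in for the library's Expired error.
class SketchExpired < StandardError; end

# Writes the whole buffer, waiting at most `budget` seconds in total.
def write_with_budget(io, buffer, budget)
  deadline_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget
  total  = buffer.bytesize
  offset = 0
  while offset < total
    begin
      # The kernel may accept only part of the slice; advance by what it took.
      offset += io.write_nonblock(buffer.byteslice(offset, total - offset))
    rescue IO::WaitWritable
      remaining = deadline_at - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      unless remaining.positive? && IO.select(nil, [io], nil, remaining)
        raise SketchExpired, "write exceeded deadline after #{offset} bytes"
      end
    end
  end
  total
end

# Usage: a reader thread drains the pipe so the partial writes can complete.
r, w = IO.pipe
reader = Thread.new { r.read.bytesize }
write_with_budget(w, "x" * 1_000_000, 5.0)
w.close
reader.value # number of bytes the reader received
r.close
```

Without a reader draining the pipe, the same call fills the buffer, waits for writability, and raises SketchExpired when the budget runs out.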