Module: OMQ::Transport::TCP

Defined in:
lib/omq/transport/tcp.rb

Overview

TCP transport using Ruby sockets with Async.

Defined Under Namespace

Classes: Dialer, Listener

Class Method Details

.apply_buffer_sizes(sock, options) ⇒ Object

Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options. No-op when both are nil (OS default).

Parameters:

  • sock

    the socket to configure

  • options (Options)

    supplies the sndbuf and rcvbuf values



# File 'lib/omq/transport/tcp.rb', line 132

def apply_buffer_sizes(sock, options)
  if options.sndbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf)
  end

  if options.rcvbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf)
  end
end
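
As a rough illustration of the behaviour described above, the sketch below applies the same two setsockopt calls to an unconnected socket. The BufferOptions struct is a hypothetical stand-in for the library's Options object, and the kernel is free to round or clamp the requested size, so the value read back may differ from the one requested.

```ruby
require "socket"

# Hypothetical stand-in for the library's Options object; only the two
# readers used by apply_buffer_sizes are modelled.
BufferOptions = Struct.new(:sndbuf, :rcvbuf)

def apply_buffer_sizes(sock, options)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf) if options.sndbuf
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf) if options.rcvbuf
end

sock = Socket.new(:INET, :STREAM)
# Request a 64 KiB send buffer; rcvbuf is nil, so the OS default is kept.
apply_buffer_sizes(sock, BufferOptions.new(64 * 1024, nil))
effective_sndbuf = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
puts "effective SO_SNDBUF: #{effective_sndbuf}"
sock.close
```

When both values are nil the method never touches the socket, which is why it is safe to call unconditionally.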

.connect_timeout(options) ⇒ Object

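Returns the connect timeout in seconds. Each attempt is capped at the reconnect interval (the upper bound, when the interval is a Range) so that a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop. The result is floored at 0.5s to allow for real-network latency.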



# File 'lib/omq/transport/tcp.rb', line 108

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
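
To make the cap-and-floor rule concrete, here is a small sketch that runs the same logic against a few intervals. ReconnectOptions is a hypothetical stand-in for the library's Options object, and the interval values are chosen only for illustration.

```ruby
# Hypothetical stand-in for Options; only reconnect_interval is modelled.
ReconnectOptions = Struct.new(:reconnect_interval)

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range) # a Range contributes its upper bound
  [ri, 0.5].max                  # never drop below the 0.5 s floor
end

p connect_timeout(ReconnectOptions.new(0.1))      # => 0.5 (floored)
p connect_timeout(ReconnectOptions.new(2.0))      # => 2.0
p connect_timeout(ReconnectOptions.new(0.1..5.0)) # => 5.0 (Range upper bound)
```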

.dialer(endpoint, engine) ⇒ Dialer

Creates a TCP dialer for an endpoint. Any extra keyword arguments are accepted and ignored.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555”

  • engine (Engine)

Returns:

  • (Dialer)



# File 'lib/omq/transport/tcp.rb', line 46

def dialer(endpoint, engine, **)
  Dialer.new(endpoint, engine)
end

.listener(endpoint, engine) ⇒ Listener

Creates a bound TCP listener.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555” or “tcp://*:0”

  • engine (Engine)

Returns:

  • (Listener)

Raises:

  • (::Socket::ResolutionError)

    if no server sockets could be created for the host


# File 'lib/omq/transport/tcp.rb', line 22

def listener(endpoint, engine, **)
  host, port  = self.parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)

  # Socket.tcp_server_sockets coordinates ephemeral ports across
  # address families and sets IPV6_V6ONLY so IPv4 and IPv6
  # wildcards don't collide on Linux.
  servers = ::Socket.tcp_server_sockets(lookup_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if servers.empty?

  actual_port  = servers.first.local_address.ip_port
  display_host = host == "*" ? "*" : (lookup_host || "*")
  host_part    = display_host.include?(":") ? "[#{display_host}]" : display_host
  resolved     = "tcp://#{host_part}:#{actual_port}"
  Listener.new(resolved, servers, actual_port, engine)
end
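
The last few lines of the listing decide how the bound endpoint is reported back. The sketch below isolates that formatting step in a hypothetical helper named resolved_endpoint (not part of the module) so that the wildcard and IPv6 bracket cases can be seen side by side. The port numbers are made up.

```ruby
# Hypothetical helper mirroring the display logic in .listener.
# host:        the host as written in the endpoint
# lookup_host: the normalized bind host (nil for the wildcard)
def resolved_endpoint(host, lookup_host, actual_port)
  display_host = host == "*" ? "*" : (lookup_host || "*")
  # IPv6 literals contain colons and must be bracketed inside a URI.
  host_part = display_host.include?(":") ? "[#{display_host}]" : display_host
  "tcp://#{host_part}:#{actual_port}"
end

puts resolved_endpoint("*", nil, 40123)                # tcp://*:40123
puts resolved_endpoint("localhost", "::1", 5555)       # tcp://[::1]:5555
puts resolved_endpoint("127.0.0.1", "127.0.0.1", 5555) # tcp://127.0.0.1:5555
```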

.loopback_host ⇒ Object

Loopback address preference for bind/connect normalization. Returns “::1” when the host has at least one non-loopback, non-link-local IPv6 address, otherwise “127.0.0.1”.



# File 'lib/omq/transport/tcp.rb', line 93

def loopback_host
  @loopback_host ||= begin
    has_ipv6 = ::Socket.getifaddrs.any? do |ifa|
      addr = ifa.addr
      addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
    end
    has_ipv6 ? "::1" : "127.0.0.1"
  end
end
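
To see which way this check falls on a given machine, the detection can be run on its own. This is a minimal sketch of the same interface scan without the memoization; routable_ipv6? is a hypothetical name, and the result depends entirely on the local network configuration.

```ruby
require "socket"

# Same test as loopback_host: is there any IPv6 address that is neither
# loopback nor link-local?
def routable_ipv6?
  Socket.getifaddrs.any? do |ifa|
    addr = ifa.addr # may be nil for interfaces without an address
    addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
  end
end

preferred = routable_ipv6? ? "::1" : "127.0.0.1"
puts "loopback preference on this machine: #{preferred}"
```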

.normalize_bind_host(host) ⇒ Object

Normalizes the bind host:

"*"                    → nil (dual-stack wildcard via AI_PASSIVE)
"" / nil / "localhost" → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1)
else                   → unchanged


# File 'lib/omq/transport/tcp.rb', line 68

def normalize_bind_host(host)
  case host
  when "*" then nil
  when nil, "", "localhost" then loopback_host
  else host
  end
end
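
The nil returned for "*" matters because of how the resolver treats a missing node name. As a small sketch using only the standard library, the call below asks for passive (bind) addresses with no host, which is the situation the wildcard case sets up. The addresses returned depend on the platform.

```ruby
require "socket"

# A nil node name plus AI_PASSIVE asks the resolver for wildcard bind
# addresses; port 0 means "any port".
wildcards = Addrinfo.getaddrinfo(nil, 0, nil, :STREAM, nil, Socket::AI_PASSIVE)
wildcards.each { |ai| puts ai.ip_address } # typically 0.0.0.0 and/or ::
```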

.normalize_connect_host(host) ⇒ Object

Normalizes the connect host: “”, nil, “*”, and “localhost” all map to the loopback host. Everything else is passed through unchanged, so real hostnames still go through the resolver and Happy Eyeballs.



# File 'lib/omq/transport/tcp.rb', line 81

def normalize_connect_host(host)
  case host
  when nil, "", "*", "localhost" then loopback_host
  else host
  end
end
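
The only input on which bind and connect normalization disagree is "*". The sketch below reimplements both rules with a fixed loopback value so the difference is easy to see. LOOPBACK is an assumed constant for the example; the real module detects the value at runtime.

```ruby
LOOPBACK = "127.0.0.1" # assumed value for this example only

def normalize_bind_host(host)
  case host
  when "*" then nil # wildcard: let the resolver pick bind addresses
  when nil, "", "localhost" then LOOPBACK
  else host
  end
end

def normalize_connect_host(host)
  case host
  when nil, "", "*", "localhost" then LOOPBACK # "*" is not connectable
  else host
  end
end

["*", "localhost", "", "example.com"].each do |host|
  puts format("%-14s bind=%-14s connect=%s", host.inspect,
              normalize_bind_host(host).inspect,
              normalize_connect_host(host).inspect)
end
```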

.parse_endpoint(endpoint) ⇒ Array(String, Integer)

Parses a TCP endpoint URI into host and port.

Parameters:

  • endpoint (String)

Returns:

  • (Array(String, Integer))


# File 'lib/omq/transport/tcp.rb', line 120

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end
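
One detail worth noting is the use of hostname rather than host: for a bracketed IPv6 literal, URI#hostname returns the address without the brackets. A short sketch of the same parsing, run outside the module:

```ruby
require "uri"

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port] # hostname strips the [] around IPv6 literals
end

p parse_endpoint("tcp://127.0.0.1:5555") # => ["127.0.0.1", 5555]
p parse_endpoint("tcp://[::1]:5555")     # => ["::1", 5555]
```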

.validate_endpoint!(endpoint) ⇒ void

This method returns an undefined value.

Validates that the endpoint’s host can be resolved.

Parameters:

  • endpoint (String)


# File 'lib/omq/transport/tcp.rb', line 56

def validate_endpoint!(endpoint)
  host, _port = parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)
  Addrinfo.getaddrinfo(lookup_host, nil, nil, :STREAM) if lookup_host
end
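
Note that the wildcard host normalizes to nil and therefore skips the lookup entirely, while a failed lookup surfaces as an exception from Addrinfo.getaddrinfo. The sketch below wraps the same resolver call in a hypothetical predicate, resolvable?, which is not part of the module; unlike validate_endpoint!, it swallows the error and returns a boolean.

```ruby
require "socket"

# Hypothetical helper: true when the resolver returns at least one
# stream address for the host, false when resolution fails.
def resolvable?(host)
  Addrinfo.getaddrinfo(host, nil, nil, :STREAM).any?
rescue SocketError # Socket::ResolutionError is a SocketError subclass on newer Rubies
  false
end

puts resolvable?("127.0.0.1") # numeric literal, so no DNS query is needed
```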