Module: OMQ::Transport::TCP

Defined in:
lib/omq/transport/tcp.rb

Overview

TCP transport using Ruby sockets with Async.

Defined Under Namespace

Classes: Listener

Class Method Details

.apply_buffer_sizes(sock, options) ⇒ Object

Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options. No-op when both are nil (OS default).

Parameters:

  • sock

    the socket to configure

  • options

    the socket’s Options; sndbuf and rcvbuf are read from it

# File 'lib/omq/transport/tcp.rb', line 133

def apply_buffer_sizes(sock, options)
  if options.sndbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf)
  end

  if options.rcvbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf)
  end
end
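
The following is a minimal sketch of how the call behaves, not part of the library. It assumes a stand-in `BufferOptions` struct (hypothetical; the real Options class is not shown on this page) and a local copy of the method above. The kernel may round or double a requested size, so the value read back is not guaranteed to equal the request.

```ruby
require "socket"

# Hypothetical stand-in for the library's Options object; only the two
# readers used by apply_buffer_sizes are modelled.
BufferOptions = Struct.new(:sndbuf, :rcvbuf)

# Local copy of the method shown above, so the sketch runs on its own.
def apply_buffer_sizes(sock, options)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf) if options.sndbuf
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf) if options.rcvbuf
end

# Reads back the current send buffer size as an Integer.
def sndbuf_of(sock)
  sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
end

sock       = Socket.new(:INET, :STREAM)
os_default = sndbuf_of(sock)

apply_buffer_sizes(sock, BufferOptions.new(nil, nil))     # both nil: nothing is changed
after_noop = sndbuf_of(sock)

apply_buffer_sizes(sock, BufferOptions.new(65_536, nil))  # only SO_SNDBUF is set
after_request = sndbuf_of(sock)

puts "default=#{os_default} after no-op=#{after_noop} after request=#{after_request}"
sock.close
```

Reading the value back with getsockopt is the only reliable way to learn which size the OS actually granted.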

.bind(endpoint, engine) ⇒ Listener

Binds a TCP server.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555” or “tcp://*:0”

  • engine (Engine)

Returns:

  • (Listener)

Raises:

  • (::Socket::ResolutionError)


# File 'lib/omq/transport/tcp.rb', line 19

def bind(endpoint, engine)
  host, port  = self.parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)

  # Socket.tcp_server_sockets coordinates ephemeral ports across
  # address families and sets IPV6_V6ONLY so IPv4 and IPv6
  # wildcards don't collide on Linux.
  servers = ::Socket.tcp_server_sockets(lookup_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if servers.empty?

  actual_port  = servers.first.local_address.ip_port
  display_host = host == "*" ? "*" : (lookup_host || "*")
  host_part    = display_host.include?(":") ? "[#{display_host}]" : display_host
  resolved     = "tcp://#{host_part}:#{actual_port}"
  Listener.new(resolved, servers, actual_port, engine)
end
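
As an illustration of the two steps bind relies on (asking the OS for an ephemeral port and formatting the resolved endpoint), here is a small standalone sketch. `format_tcp_endpoint` is a hypothetical helper that mirrors the string building above; it is not part of the module, and no Listener or Engine is involved.

```ruby
require "socket"

# Hypothetical helper mirroring the endpoint formatting in bind:
# IPv6 literals are wrapped in brackets, everything else is used as-is.
def format_tcp_endpoint(display_host, port)
  host_part = display_host.include?(":") ? "[#{display_host}]" : display_host
  "tcp://#{host_part}:#{port}"
end

# Port 0 asks the OS for an ephemeral port; the port actually chosen is
# read back from the first listening socket.
servers     = Socket.tcp_server_sockets("127.0.0.1", 0)
actual_port = servers.first.local_address.ip_port
resolved    = format_tcp_endpoint("127.0.0.1", actual_port)

puts resolved                           # port varies per run
puts format_tcp_endpoint("::1", 5555)   # => tcp://[::1]:5555
puts format_tcp_endpoint("*", 5555)     # => tcp://*:5555

servers.each(&:close)
```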

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects to a TCP endpoint.

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5555”

  • engine (Engine)


# File 'lib/omq/transport/tcp.rb', line 55

def connect(endpoint, engine)
  host, port = self.parse_endpoint(endpoint)
  host       = normalize_connect_host(host)
  sock       = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
  apply_buffer_sizes(sock, engine.options)
  engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
end
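
The socket-level part of connect can be tried in isolation. The sketch below assumes a throwaway local listener and a hypothetical `connect_with_timeout` wrapper; it leaves out the Engine hand-off and the IO::Stream wrapping, which depend on parts of the library not shown here.

```ruby
require "socket"

# Hypothetical wrapper around the same Socket.tcp call used in connect.
# connect_timeout bounds how long a single attempt may take, in seconds.
def connect_with_timeout(host, port, timeout)
  Socket.tcp(host, port, connect_timeout: timeout)
end

# Throwaway listener on an OS-chosen port so the example has a peer.
listener = TCPServer.new("127.0.0.1", 0)
port     = listener.local_address.ip_port

client    = connect_with_timeout("127.0.0.1", port, 0.5)
peer_port = client.remote_address.ip_port
puts "connected to port #{peer_port}"

client.close
listener.close
```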

.connect_timeout(options) ⇒ Object

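Returns the connect timeout in seconds. Each attempt is capped at the reconnect interval (its upper bound when the interval is a Range) so that a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop. The result is never below 0.5s, to allow for real-network latency.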



# File 'lib/omq/transport/tcp.rb', line 109

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
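
A short worked example of the arithmetic, using a hypothetical `ReconnectOptions` struct in place of the real Options object and a local copy of the method above:

```ruby
# Hypothetical stand-in exposing only the reader that connect_timeout uses.
ReconnectOptions = Struct.new(:reconnect_interval)

# Local copy of the method shown above, so the sketch runs on its own.
def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)   # a Range contributes its upper bound
  [ri, 0.5].max                    # never go below the 0.5 s floor
end

p connect_timeout(ReconnectOptions.new(0.1))       # => 0.5 (raised to the floor)
p connect_timeout(ReconnectOptions.new(2.0))       # => 2.0 (interval used as-is)
p connect_timeout(ReconnectOptions.new(0.1..5.0))  # => 5.0 (upper bound of the Range)
```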

.loopback_host ⇒ Object

Loopback address preference for bind/connect normalization. Returns “::1” when the host has at least one non-loopback, non-link-local IPv6 address, otherwise “127.0.0.1”.



# File 'lib/omq/transport/tcp.rb', line 94

def loopback_host
  @loopback_host ||= begin
    has_ipv6 = ::Socket.getifaddrs.any? do |ifa|
      addr = ifa.addr
      addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
    end
    has_ipv6 ? "::1" : "127.0.0.1"
  end
end
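
To see which interfaces drive this decision on a given machine, a sketch along the following lines may help. `routable_ipv6_addresses` and `preferred_loopback` are hypothetical names introduced here; unlike the method above, nothing is memoized, and the output depends entirely on the local network configuration.

```ruby
require "socket"

# Lists IPv6 addresses that are neither loopback nor link-local, i.e. the
# ones that would make the check above pick "::1".
def routable_ipv6_addresses
  Socket.getifaddrs.filter_map do |ifa|
    addr = ifa.addr
    next unless addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?

    "#{ifa.name}: #{addr.ip_address}"
  end
end

# Same preference rule as loopback_host, without the memoization.
def preferred_loopback
  routable_ipv6_addresses.empty? ? "127.0.0.1" : "::1"
end

puts routable_ipv6_addresses
puts "preferred loopback: #{preferred_loopback}"
```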

.normalize_bind_host(host) ⇒ Object

Normalizes the bind host:

"*"                    → nil (dual-stack wildcard via AI_PASSIVE)
"" / nil / "localhost" → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1)
else                   → unchanged


# File 'lib/omq/transport/tcp.rb', line 69

def normalize_bind_host(host)
  case host
  when "*" then nil
  when nil, "", "localhost" then loopback_host
  else host
  end
end

.normalize_connect_host(host) ⇒ Object

Normalizes the connect host: “”, nil, “*”, and “localhost” all map to the loopback host. Everything else is passed through so real hostnames still go through the resolver + Happy Eyeballs.



# File 'lib/omq/transport/tcp.rb', line 82

def normalize_connect_host(host)
  case host
  when nil, "", "*", "localhost" then loopback_host
  else host
  end
end
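
The difference between the bind and connect rules is easiest to see side by side. In this sketch both methods are local copies that take the loopback address as an explicit second argument (an assumption made here so the output is deterministic; the real methods call loopback_host themselves).

```ruby
# Local copies of the two normalizers, with the loopback address injected.
def normalize_bind_host(host, loopback)
  case host
  when "*" then nil                          # wildcard stays a wildcard for bind
  when nil, "", "localhost" then loopback
  else host
  end
end

def normalize_connect_host(host, loopback)
  case host
  when nil, "", "*", "localhost" then loopback  # "*" is not connectable, use loopback
  else host
  end
end

[nil, "", "*", "localhost", "example.org", "10.0.0.5"].each do |host|
  printf("%-14s bind=%-14s connect=%s\n",
         host.inspect,
         normalize_bind_host(host, "::1").inspect,
         normalize_connect_host(host, "::1").inspect)
end
```

Only the "*" row differs: bind maps it to nil, while connect maps it to the loopback address.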

.parse_endpoint(endpoint) ⇒ Array(String, Integer)

Parses a TCP endpoint URI into host and port.

Parameters:

  • endpoint (String)

Returns:

  • (Array(String, Integer))


# File 'lib/omq/transport/tcp.rb', line 121

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end
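
A brief usage sketch with a local copy of the method, showing that URI#hostname strips the brackets from IPv6 literals while URI#port yields an Integer:

```ruby
require "uri"

# Local copy of the method shown above, so the sketch runs on its own.
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end

p parse_endpoint("tcp://127.0.0.1:5555")    # => ["127.0.0.1", 5555]
p parse_endpoint("tcp://[::1]:5555")        # => ["::1", 5555]
p parse_endpoint("tcp://example.org:5555")  # => ["example.org", 5555]
```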

.validate_endpoint!(endpoint) ⇒ void

This method returns an undefined value.

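Validates that the endpoint’s host can be resolved. The wildcard host “*” is skipped because it normalizes to nil and needs no lookup. Resolution failures propagate as exceptions from Addrinfo.getaddrinfo.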

Parameters:

  • endpoint (String)


# File 'lib/omq/transport/tcp.rb', line 42

def validate_endpoint!(endpoint)
  host, _port = parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)
  Addrinfo.getaddrinfo(lookup_host, nil, nil, :STREAM) if lookup_host
end
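
A caller that prefers a boolean over an exception could wrap the same lookup as sketched below. `resolvable?` is a hypothetical helper, not part of the module. It rescues SocketError, the parent class of Socket::ResolutionError, so it also works on Ruby versions that predate the latter.

```ruby
require "socket"

# Hypothetical helper: true when the host resolves to at least one stream
# address, false when the resolver raises.
def resolvable?(host)
  !Addrinfo.getaddrinfo(host, nil, nil, :STREAM).empty?
rescue SocketError
  false
end

puts resolvable?("127.0.0.1")   # numeric addresses need no DNS lookup
```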