Module: OMQ::Transport::TCP
Defined in: lib/omq/transport/tcp.rb
Overview
TCP transport using Ruby sockets with Async.
Defined Under Namespace
Classes: Listener
Class Method Summary
- .apply_buffer_sizes(sock, options) ⇒ Object
  Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options.
- .bind(endpoint, engine) ⇒ Listener
  Binds a TCP server.
- .connect(endpoint, engine) ⇒ void
  Connects to a TCP endpoint.
- .connect_timeout(options) ⇒ Object
  Connect timeout: cap each attempt at the reconnect interval so a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop.
- .loopback_host ⇒ Object
  Loopback address preference for bind/connect normalization.
- .normalize_bind_host(host) ⇒ Object
  Normalizes the bind host: "*" → nil (dual-stack wildcard via AI_PASSIVE); "" / nil / "localhost" → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1); anything else → unchanged.
- .normalize_connect_host(host) ⇒ Object
  Normalizes the connect host: "", nil, "*", and "localhost" all map to the loopback host.
- .parse_endpoint(endpoint) ⇒ Array(String, Integer)
  Parses a TCP endpoint URI into host and port.
- .validate_endpoint!(endpoint) ⇒ void
  Validates that the endpoint’s host can be resolved.
Class Method Details
.apply_buffer_sizes(sock, options) ⇒ Object
Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options. No-op when both are nil (OS default).
    # File 'lib/omq/transport/tcp.rb', line 133
    def apply_buffer_sizes(sock, options)
      if options.sndbuf
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf)
      end
      if options.rcvbuf
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf)
      end
    end
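As a rough illustration of the effect, the sketch below applies the same two socket options to a fresh, unconnected socket. `DemoOptions` is a hypothetical stand-in for the socket’s Options object; only its `sndbuf` and `rcvbuf` readers are assumed. The method body is a standalone copy of the documented logic so the example runs without OMQ loaded.

```ruby
require "socket"

# Hypothetical stand-in for the Options object; only #sndbuf and #rcvbuf are assumed.
DemoOptions = Struct.new(:sndbuf, :rcvbuf)

# Standalone copy of the documented logic.
def apply_buffer_sizes(sock, options)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf) if options.sndbuf
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf) if options.rcvbuf
end

# Reads a buffer size back from the kernel as an Integer.
def buffer_size(sock, optname)
  sock.getsockopt(Socket::SOL_SOCKET, optname).int
end

sock = Socket.new(:INET, :STREAM)
# Only sndbuf is given; rcvbuf is nil, so the receive buffer is left untouched.
apply_buffer_sizes(sock, DemoOptions.new(32_768, nil))
puts "SO_SNDBUF is now #{buffer_size(sock, Socket::SO_SNDBUF)} bytes"
sock.close
```

The kernel may round or double the requested size (Linux reports twice the value that was set), so the number read back is not necessarily the number requested.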
.bind(endpoint, engine) ⇒ Listener
Binds a TCP server.
    # File 'lib/omq/transport/tcp.rb', line 19
    def bind(endpoint, engine)
      host, port = self.parse_endpoint(endpoint)
      lookup_host = normalize_bind_host(host)

      # Socket.tcp_server_sockets coordinates ephemeral ports across
      # address families and sets IPV6_V6ONLY so IPv4 and IPv6
      # wildcards don't collide on Linux.
      servers = ::Socket.tcp_server_sockets(lookup_host, port)
      raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if servers.empty?

      actual_port = servers.first.local_address.ip_port
      display_host = host == "*" ? "*" : (lookup_host || "*")
      host_part = display_host.include?(":") ? "[#{display_host}]" : display_host
      resolved = "tcp://#{host_part}:#{actual_port}"
      Listener.new(resolved, servers, actual_port, engine)
    end
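The last few lines of bind build the endpoint string that the Listener reports. A minimal sketch of just that formatting step follows. `display_endpoint` is a hypothetical helper name, not part of the module, and it takes the already-normalized lookup host and the actual port as plain arguments.

```ruby
# Hypothetical helper mirroring the endpoint formatting at the end of bind.
#   host        - host exactly as written in the endpoint ("*", "localhost", ...)
#   lookup_host - result of normalize_bind_host (nil for the wildcard)
#   actual_port - port read back from the first bound server socket
def display_endpoint(host, lookup_host, actual_port)
  display_host = host == "*" ? "*" : (lookup_host || "*")
  # IPv6 literals contain ":" and must be bracketed inside a URI authority.
  host_part = display_host.include?(":") ? "[#{display_host}]" : display_host
  "tcp://#{host_part}:#{actual_port}"
end

puts display_endpoint("*", nil, 5555)                 # tcp://*:5555
puts display_endpoint("localhost", "::1", 5555)       # tcp://[::1]:5555
puts display_endpoint("10.0.0.5", "10.0.0.5", 40123)  # tcp://10.0.0.5:40123
```

Because the port is read back from the bound socket, an endpoint that asks for port 0 is reported with the ephemeral port the OS actually assigned.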
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects to a TCP endpoint.
    # File 'lib/omq/transport/tcp.rb', line 55
    def connect(endpoint, engine)
      host, port = self.parse_endpoint(endpoint)
      host = normalize_connect_host(host)
      sock = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
      apply_buffer_sizes(sock, engine.options)
      engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
    end
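To see the Socket.tcp call with a connect timeout in isolation, the following sketch connects to a throwaway listener on 127.0.0.1. It leaves out the engine and the IO::Stream wrapper entirely. `connect_with_timeout` and `loopback_round_trip` are hypothetical names, and the 0.5 s timeout is simply the documented floor.

```ruby
require "socket"

# Hypothetical helper: the bare Socket.tcp call from connect, without the engine.
def connect_with_timeout(host, port, timeout)
  ::Socket.tcp(host, port, connect_timeout: timeout)
end

# Opens a listener on an OS-chosen port, connects to it, and sends four bytes.
# Returns what the accepting side read.
def loopback_round_trip
  server = TCPServer.new("127.0.0.1", 0) # port 0: the OS picks a free port
  port = server.local_address.ip_port
  sock = connect_with_timeout("127.0.0.1", port, 0.5)
  sock.write("ping")
  peer = server.accept
  peer.read(4)
ensure
  [sock, peer, server].each { |io| io&.close }
end

puts loopback_round_trip
```

The connect succeeds before accept is called because the kernel completes the handshake from the listen backlog, which is why this works in a single thread.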
.connect_timeout(options) ⇒ Object
Connect timeout: caps each attempt at the reconnect interval so that a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop. If the reconnect interval is a Range, its upper bound is used. The result is floored at 0.5s to allow for real-network latency.
    # File 'lib/omq/transport/tcp.rb', line 109
    def connect_timeout(options)
      ri = options.reconnect_interval
      ri = ri.end if ri.is_a?(Range)
      [ri, 0.5].max
    end
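A few sample inputs make the floor and the Range handling concrete. `TimeoutOptions` is a hypothetical stand-in for Options that only provides `reconnect_interval`; the method is a standalone copy of the documented logic.

```ruby
# Hypothetical stand-in for Options; only #reconnect_interval is assumed.
TimeoutOptions = Struct.new(:reconnect_interval)

# Standalone copy of the documented logic.
def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range) # a Range contributes its upper bound
  [ri, 0.5].max                  # never go below 0.5 s
end

p connect_timeout(TimeoutOptions.new(0.1))      # 0.5 (floored)
p connect_timeout(TimeoutOptions.new(2))        # 2
p connect_timeout(TimeoutOptions.new(0.1..5.0)) # 5.0 (upper bound of the range)
```

One caveat of this logic as written: an endless Range has a nil upper bound, so `[nil, 0.5].max` would raise rather than return a timeout.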
.loopback_host ⇒ Object
Loopback address preference for bind/connect normalization. Returns “::1” when the host has at least one non-loopback, non-link-local IPv6 address, otherwise “127.0.0.1”.
    # File 'lib/omq/transport/tcp.rb', line 94
    def loopback_host
      @loopback_host ||= begin
        has_ipv6 = ::Socket.getifaddrs.any? do |ifa|
          addr = ifa.addr
          addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
        end
        has_ipv6 ? "::1" : "127.0.0.1"
      end
    end
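To check which answer a given machine would produce, the interface scan can be run on its own. This is a sketch with hypothetical helper names (`routable_ipv6_addresses`, `preferred_loopback`); it applies the same three Addrinfo predicates but returns the matching addresses instead of a boolean, and it does not memoize.

```ruby
require "socket"

# Hypothetical helper: IPv6 addresses that are neither loopback nor link-local.
def routable_ipv6_addresses
  ::Socket.getifaddrs.filter_map do |ifa|
    addr = ifa.addr # may be nil for interfaces without an address
    next unless addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?

    addr.ip_address
  end
end

# Hypothetical helper: same decision as loopback_host, without caching.
def preferred_loopback
  routable_ipv6_addresses.empty? ? "127.0.0.1" : "::1"
end

puts "routable IPv6: #{routable_ipv6_addresses.inspect}"
puts "loopback:      #{preferred_loopback}"
```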
.normalize_bind_host(host) ⇒ Object
Normalizes the bind host:
"*" → nil (dual-stack wildcard via AI_PASSIVE)
"" / nil / "localhost" → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1)
else → unchanged
    # File 'lib/omq/transport/tcp.rb', line 69
    def normalize_bind_host(host)
      case host
      when "*" then nil
      when nil, "", "localhost" then loopback_host
      else host
      end
    end
.normalize_connect_host(host) ⇒ Object
Normalizes the connect host: “”, nil, “*”, and “localhost” all map to the loopback host. Everything else is passed through so real hostnames still go through the resolver + Happy Eyeballs.
    # File 'lib/omq/transport/tcp.rb', line 82
    def normalize_connect_host(host)
      case host
      when nil, "", "*", "localhost" then loopback_host
      else host
      end
    end
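The bind and connect normalizers differ only in how they treat "*". The sketch below runs both over the same inputs. The methods are standalone copies of the documented logic, except that the loopback address is passed in as an explicit second argument (an assumption made for this example) so the output is the same on every machine.

```ruby
# Standalone copy of normalize_bind_host; `loopback` replaces the loopback_host call.
def normalize_bind_host(host, loopback)
  case host
  when "*" then nil
  when nil, "", "localhost" then loopback
  else host
  end
end

# Standalone copy of normalize_connect_host; `loopback` replaces the loopback_host call.
def normalize_connect_host(host, loopback)
  case host
  when nil, "", "*", "localhost" then loopback
  else host
  end
end

[nil, "", "*", "localhost", "example.com"].each do |host|
  bind = normalize_bind_host(host, "::1")
  conn = normalize_connect_host(host, "::1")
  puts format("%-14s bind=%-14s connect=%s", host.inspect, bind.inspect, conn.inspect)
end
```

Only the "*" row differs: bind yields nil (the wildcard), while connect yields the loopback address, since a wildcard is not a destination that can be dialed.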
.parse_endpoint(endpoint) ⇒ Array(String, Integer)
Parses a TCP endpoint URI into host and port.
    # File 'lib/omq/transport/tcp.rb', line 121
    def parse_endpoint(endpoint)
      uri = URI.parse(endpoint)
      [uri.hostname, uri.port]
    end
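A short example of what the parser returns for IPv4 and IPv6 endpoints, using a standalone copy of the method and Ruby’s standard uri library. The endpoint strings are illustrative.

```ruby
require "uri"

# Standalone copy of the documented logic.
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end

p parse_endpoint("tcp://127.0.0.1:5555") # ["127.0.0.1", 5555]
p parse_endpoint("tcp://[::1]:5555")     # ["::1", 5555]
p parse_endpoint("tcp://localhost:0")    # ["localhost", 0]
```

Note that URI#hostname returns IPv6 literals without the surrounding brackets, whereas URI#host keeps them, so the host returned here can be handed to the socket calls directly.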
.validate_endpoint!(endpoint) ⇒ void
This method returns an undefined value.
Validates that the endpoint’s host can be resolved.
    # File 'lib/omq/transport/tcp.rb', line 42
    def validate_endpoint!(endpoint)
      host, _port = parse_endpoint(endpoint)
      lookup_host = normalize_bind_host(host)
      Addrinfo.getaddrinfo(lookup_host, nil, nil, :STREAM) if lookup_host
    end
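The validation relies on Addrinfo.getaddrinfo raising when a name cannot be resolved. The sketch below wraps that call in a hypothetical predicate, `resolvable?`, to show the two outcomes without OMQ loaded; the wildcard case is skipped in the original because its lookup host is nil.

```ruby
require "socket"

# Hypothetical helper: true when the host resolves, false on a resolution error.
# Socket::ResolutionError (Ruby 3.3+) is a subclass of SocketError, so rescuing
# SocketError also covers older Rubies.
def resolvable?(host)
  !Addrinfo.getaddrinfo(host, nil, nil, :STREAM).empty?
rescue SocketError
  false
end

puts resolvable?("127.0.0.1") # numeric addresses resolve without a DNS query
```

A name that does not exist, for example one under the reserved .invalid top-level domain, should make the same call return false, although how long that takes depends on the local resolver.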