Module: OMQ::Transport::TCP
- Defined in: lib/omq/transport/tcp.rb
Overview
TCP transport using Ruby sockets with Async.
Defined Under Namespace
Class Method Summary
-
.apply_buffer_sizes(sock, options) ⇒ Object
Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options.
-
.connect_timeout(options) ⇒ Object
Connect timeout: cap each attempt at the reconnect interval so a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop.
-
.connection_class ⇒ Class
ZMTP connection class used for TCP-accepted/dialed peers.
-
.dialer(endpoint, engine) ⇒ Dialer
Creates a TCP dialer for an endpoint.
-
.listener(endpoint, engine) ⇒ Listener
Creates a bound TCP listener.
-
.loopback_host ⇒ Object
Loopback address preference for bind/connect normalization.
-
.normalize_bind_host(host) ⇒ Object
Normalizes the bind host: “*” → nil (dual-stack wildcard via AI_PASSIVE); “” / nil / “localhost” → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1); anything else → unchanged.
-
.normalize_connect_host(host) ⇒ Object
Normalizes the connect host: “”, nil, “*”, and “localhost” all map to the loopback host.
-
.parse_endpoint(endpoint) ⇒ Array(String, Integer)
Parses a TCP endpoint URI into host and port.
-
.validate_endpoint!(endpoint) ⇒ void
Validates that the endpoint’s host can be resolved.
Class Method Details
.apply_buffer_sizes(sock, options) ⇒ Object
Applies SO_SNDBUF / SO_RCVBUF to sock from the socket’s Options. No-op when both are nil (OS default).
# File 'lib/omq/transport/tcp.rb', line 141
def apply_buffer_sizes(sock, options)
  if options.sndbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf)
  end
  if options.rcvbuf
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf)
  end
end
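The effect of the buffer options can be observed on a plain socket. The following is a minimal sketch, not the library's code: `BufferOptions` and `apply_buffer_sizes_sketch` are hypothetical names, and the Struct only models the two readers (`sndbuf`, `rcvbuf`) that the method above uses.

```ruby
require "socket"

# Hypothetical stand-in for the socket's Options object (assumption:
# only #sndbuf and #rcvbuf are needed for this illustration).
BufferOptions = Struct.new(:sndbuf, :rcvbuf, keyword_init: true)

# Same logic as the method above: each option is applied only when set,
# so a nil value leaves the OS default untouched.
def apply_buffer_sizes_sketch(sock, options)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, options.sndbuf) if options.sndbuf
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, options.rcvbuf) if options.rcvbuf
end

sock = Socket.new(:INET, :STREAM)
apply_buffer_sizes_sketch(sock, BufferOptions.new(sndbuf: 65_536))

# The kernel may round or scale the requested size, so read it back
# rather than assuming the exact value was accepted.
puts sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
sock.close
```

Reading the value back with `getsockopt` is the reliable way to learn the effective size, since operating systems are free to adjust the request.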
.connect_timeout(options) ⇒ Object
Connect timeout: cap each attempt at the reconnect interval so a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop. Floor at 0.5s for real-network latency.
# File 'lib/omq/transport/tcp.rb', line 117
def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
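To make the floor and the Range handling concrete, here is a small standalone sketch of the same arithmetic. `TimeoutOptions` and `connect_timeout_sketch` are hypothetical names used only for illustration; the real Options object is assumed to expose `reconnect_interval` as either a number or a Range.

```ruby
# Hypothetical stand-in for Options with just the one reader used here.
TimeoutOptions = Struct.new(:reconnect_interval)

def connect_timeout_sketch(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range) # a Range contributes its upper bound
  [ri, 0.5].max                  # never go below the 0.5s floor
end

puts connect_timeout_sketch(TimeoutOptions.new(0.1))      # short interval is floored
puts connect_timeout_sketch(TimeoutOptions.new(3))        # longer interval passes through
puts connect_timeout_sketch(TimeoutOptions.new(0.1..2.0)) # Range uses its end
```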
.connection_class ⇒ Class
ZMTP connection class used for TCP-accepted/dialed peers.
# File 'lib/omq/transport/tcp.rb', line 20
def connection_class
  Protocol::ZMTP::Connection
end
.dialer(endpoint, engine) ⇒ Dialer
Creates a TCP dialer for an endpoint.
# File 'lib/omq/transport/tcp.rb', line 55
def dialer(endpoint, engine, **)
  Dialer.new(endpoint, engine)
end
.listener(endpoint, engine) ⇒ Listener
Creates a bound TCP listener.
# File 'lib/omq/transport/tcp.rb', line 31
def listener(endpoint, engine, **)
  host, port  = self.parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)

  # Socket.tcp_server_sockets coordinates ephemeral ports across
  # address families and sets IPV6_V6ONLY so IPv4 and IPv6
  # wildcards don't collide on Linux.
  servers = ::Socket.tcp_server_sockets(lookup_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if servers.empty?

  actual_port  = servers.first.local_address.ip_port
  display_host = host == "*" ? "*" : (lookup_host || "*")
  host_part    = display_host.include?(":") ? "[#{display_host}]" : display_host
  resolved     = "tcp://#{host_part}:#{actual_port}"
  Listener.new(resolved, servers, actual_port, engine)
end
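The last few lines of the method build the resolved endpoint string, bracketing IPv6 literals and keeping the wildcard visible. The sketch below isolates just that formatting step; `resolved_endpoint_sketch` is a hypothetical helper, not part of the library, and takes the already-normalized host and bound port as plain arguments.

```ruby
# Hypothetical helper mirroring the string-building step of the listener.
#   host        - host as written in the endpoint
#   lookup_host - result of bind-host normalization (nil for the wildcard)
#   actual_port - port the server sockets were actually bound to
def resolved_endpoint_sketch(host, lookup_host, actual_port)
  display_host = host == "*" ? "*" : (lookup_host || "*")
  # IPv6 literals contain ":" and must be bracketed inside a URI.
  host_part = display_host.include?(":") ? "[#{display_host}]" : display_host
  "tcp://#{host_part}:#{actual_port}"
end

puts resolved_endpoint_sketch("*", nil, 5555)
puts resolved_endpoint_sketch("localhost", "::1", 40_001)
puts resolved_endpoint_sketch("127.0.0.1", "127.0.0.1", 5555)
```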
.loopback_host ⇒ Object
Loopback address preference for bind/connect normalization. Returns “::1” when the host has at least one non-loopback, non-link-local IPv6 address, otherwise “127.0.0.1”.
# File 'lib/omq/transport/tcp.rb', line 102
def loopback_host
  @loopback_host ||= begin
    has_ipv6 = ::Socket.getifaddrs.any? do |ifa|
      addr = ifa.addr
      addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
    end
    has_ipv6 ? "::1" : "127.0.0.1"
  end
end
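The selection rule can be exercised without depending on the local machine's interfaces. In this sketch, `pick_loopback_sketch` is a hypothetical function that takes a list of `Addrinfo` objects instead of calling `Socket.getifaddrs`, so the addresses used below (including the documentation prefix `2001:db8::/32`) are illustrative assumptions.

```ruby
require "socket"

# Hypothetical variant of the check above that accepts the addresses
# as an argument. nil entries are tolerated because an interface may
# have no address.
def pick_loopback_sketch(addrs)
  has_ipv6 = addrs.any? do |addr|
    addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal?
  end
  has_ipv6 ? "::1" : "127.0.0.1"
end

# Only loopback and link-local IPv6 present: falls back to IPv4 loopback.
local_only = [Addrinfo.ip("127.0.0.1"), Addrinfo.ip("::1"), Addrinfo.ip("fe80::1"), nil]
puts pick_loopback_sketch(local_only)

# A routable IPv6 address is present: prefers the IPv6 loopback.
with_global = local_only + [Addrinfo.ip("2001:db8::1")]
puts pick_loopback_sketch(with_global)

# Against the real interfaces of the current machine:
puts pick_loopback_sketch(Socket.getifaddrs.map(&:addr))
```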
.normalize_bind_host(host) ⇒ Object
Normalizes the bind host:
"*" → nil (dual-stack wildcard via AI_PASSIVE)
"" / nil / "localhost" → loopback_host (::1 on IPv6-capable hosts, else 127.0.0.1)
else → unchanged
# File 'lib/omq/transport/tcp.rb', line 77
def normalize_bind_host(host)
  case host
  when "*"                  then nil
  when nil, "", "localhost" then loopback_host
  else host
  end
end
.normalize_connect_host(host) ⇒ Object
Normalizes the connect host: “”, nil, “*”, and “localhost” all map to the loopback host. Everything else is passed through so real hostnames still go through the resolver + Happy Eyeballs.
# File 'lib/omq/transport/tcp.rb', line 90
def normalize_connect_host(host)
  case host
  when nil, "", "*", "localhost" then loopback_host
  else host
  end
end
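The bind and connect normalizations differ only in how they treat "*". The sketch below puts the two side by side; both `_sketch` functions are hypothetical copies of the logic, and the loopback address is passed in explicitly (an assumption made so the example does not depend on the machine's interfaces).

```ruby
# Hypothetical copies of the two normalizers with the loopback address
# supplied by the caller.
def normalize_bind_host_sketch(host, loopback)
  case host
  when "*"                  then nil      # wildcard: bind on all addresses
  when nil, "", "localhost" then loopback
  else host
  end
end

def normalize_connect_host_sketch(host, loopback)
  case host
  when nil, "", "*", "localhost" then loopback # a wildcard cannot be dialed
  else host
  end
end

p normalize_bind_host_sketch("*", "::1")              # nil
p normalize_connect_host_sketch("*", "::1")           # loopback
p normalize_bind_host_sketch("localhost", "::1")      # loopback
p normalize_connect_host_sketch("example.org", "::1") # passed through
```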
.parse_endpoint(endpoint) ⇒ Array(String, Integer)
Parses a TCP endpoint URI into host and port.
# File 'lib/omq/transport/tcp.rb', line 129
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end
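Because the method relies on `URI#hostname` rather than `URI#host`, bracketed IPv6 literals come back without their brackets. A minimal standalone sketch (the name `parse_endpoint_sketch` and the example endpoints are illustrative assumptions):

```ruby
require "uri"

# Same two calls as the method above, outside the module.
def parse_endpoint_sketch(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port] # hostname strips the [] around IPv6 literals
end

p parse_endpoint_sketch("tcp://127.0.0.1:5555")
p parse_endpoint_sketch("tcp://[::1]:5555")
```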
.validate_endpoint!(endpoint) ⇒ void
This method returns an undefined value.
Validates that the endpoint’s host can be resolved.
# File 'lib/omq/transport/tcp.rb', line 65
def validate_endpoint!(endpoint)
  host, _port = parse_endpoint(endpoint)
  lookup_host = normalize_bind_host(host)
  Addrinfo.getaddrinfo(lookup_host, nil, nil, :STREAM) if lookup_host
end
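The resolution check itself is a single `Addrinfo.getaddrinfo` call that is skipped for the wildcard (a nil lookup host). The sketch below wraps that call in a hypothetical predicate, `resolvable_sketch?`, which returns a boolean; this differs from the method above, which lets the resolver's exception propagate. `SocketError` is rescued here because resolution failures are raised as that class or a subclass of it.

```ruby
require "socket"

# Hypothetical boolean wrapper around the resolution check.
def resolvable_sketch?(lookup_host)
  return true if lookup_host.nil? # wildcard bind: nothing to resolve

  Addrinfo.getaddrinfo(lookup_host, nil, nil, :STREAM)
  true
rescue SocketError
  false
end

p resolvable_sketch?(nil)         # wildcard is always accepted
p resolvable_sketch?("127.0.0.1") # numeric addresses resolve locally
```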