Module: NNQ::Transport::TCP
- Defined in:
- lib/nnq/transport/tcp.rb
Overview
TCP transport. Smaller than omq’s: no IPv6 dual-bind dance, no custom buffer-size sockopts (yet). One server per bind, blocking accept inside an Async fiber.
Defined Under Namespace
Classes: Listener
Class Method Summary collapse
-
.bind(endpoint, engine) ⇒ Listener
Binds a TCP server to
endpoint. -
.connect(endpoint, engine) ⇒ void
Connects to
endpointand registers the resulting pipe with the engine. -
.connect_timeout(options) ⇒ Object
Connect timeout: cap each attempt at the reconnect interval so a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop.
- .parse_endpoint(endpoint) ⇒ Object
Class Method Details
.bind(endpoint, engine) ⇒ Listener
Binds a TCP server to endpoint.
20 21 22 23 24 25 26 27 28 |
# File 'lib/nnq/transport/tcp.rb', line 20 def bind(endpoint, engine) host, port = parse_endpoint(endpoint) host = "0.0.0.0" if host == "*" server = TCPServer.new(host, port) actual = server.local_address.ip_port host_part = host.include?(":") ? "[#{host}]" : host Listener.new("tcp://#{host_part}:#{actual}", server, actual, engine) end |
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects to endpoint and registers the resulting pipe with the engine. Synchronous (errors propagate to the caller).
37 38 39 40 41 42 |
# File 'lib/nnq/transport/tcp.rb', line 37 def connect(endpoint, engine) host, port = parse_endpoint(endpoint) sock = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.)) engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint) end |
.connect_timeout(options) ⇒ Object
Connect timeout: cap each attempt at the reconnect interval so a hung connect(2) (e.g. macOS kqueue + IPv6 ECONNREFUSED not delivered) doesn’t block the retry loop. Floor at 0.5s for real-network latency.
49 50 51 52 53 |
# File 'lib/nnq/transport/tcp.rb', line 49 def connect_timeout() ri = .reconnect_interval ri = ri.end if ri.is_a?(Range) [ri, 0.5].max end |
.parse_endpoint(endpoint) ⇒ Object
56 57 58 59 |
# File 'lib/nnq/transport/tcp.rb', line 56 def parse_endpoint(endpoint) uri = URI.parse(endpoint) [uri.hostname, uri.port] end |