Module: NNQ::Transport::TCP

Defined in:
lib/nnq/transport/tcp.rb

Overview

TCP transport. Smaller than omq’s: no IPv6 dual-bind dance, no custom buffer-size sockopts (yet). One server per bind, blocking accept inside an Async fiber.

Defined Under Namespace

Classes: Listener

Class Method Summary

Class Method Details

.bind(endpoint, engine) ⇒ Listener

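Binds a TCP server to endpoint. A host of “*” is bound as 0.0.0.0, and port 0 lets the OS choose a free port; the Listener is constructed with the resolved endpoint and the actual port.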

Parameters:

  • endpoint (String)

    e.g. “tcp://127.0.0.1:5570” or “tcp://127.0.0.1:0”

  • engine (Engine)

Returns:

  • (Listener)

# File 'lib/nnq/transport/tcp.rb', line 20

def bind(endpoint, engine)
  host, port = parse_endpoint(endpoint)
  host       = "0.0.0.0" if host == "*"
  server     = TCPServer.new(host, port)
  actual     = server.local_address.ip_port
  host_part  = host.include?(":") ? "[#{host}]" : host

  Listener.new("tcp://#{host_part}:#{actual}", server, actual, engine)
end
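
The port-0 case and the IPv6 bracket rule above can be tried with the standard library alone. This is a minimal sketch rather than the module's code: `display_host` is a hypothetical helper name introduced here, and the example assumes the loopback interface is available.

```ruby
require "socket"

# Hypothetical helper mirroring the bracket rule in .bind:
# IPv6 literals contain ":" and must be wrapped in [] inside a URL.
def display_host(host)
  host.include?(":") ? "[#{host}]" : host
end

# Port 0 asks the OS for any free port; the real port is read back
# from the bound socket, as .bind does with local_address.ip_port.
host   = "127.0.0.1"
server = TCPServer.new(host, 0)
actual = server.local_address.ip_port
bound_endpoint = "tcp://#{display_host(host)}:#{actual}"
server.close

ipv6_endpoint = "tcp://#{display_host('::1')}:5570"

puts bound_endpoint
puts ipv6_endpoint
```

The resolved endpoint is what a caller would need to hand to peers when binding with port 0, since the requested endpoint does not say which port was chosen.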

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects to endpoint and registers the resulting pipe with the engine. Synchronous (errors propagate to the caller).

Parameters:

  • endpoint (String)
  • engine (Engine)


# File 'lib/nnq/transport/tcp.rb', line 37

def connect(endpoint, engine)
  host, port = parse_endpoint(endpoint)
  sock       = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))

  engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
end
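
The synchronous behaviour described above can be observed with `Socket.tcp` on its own. The sketch below is an illustration under assumptions: it uses a throwaway local `TCPServer` rather than an engine, a fixed 0.5 s timeout instead of `connect_timeout(engine.options)`, and it assumes nothing else grabs the port between the two attempts.

```ruby
require "socket"

# Throwaway listener on an OS-assigned loopback port.
server = TCPServer.new("127.0.0.1", 0)
port   = server.local_address.ip_port

# Successful connect: Socket.tcp returns a connected socket.
sock      = Socket.tcp("127.0.0.1", port, connect_timeout: 0.5)
connected = !sock.closed?
sock.close
server.close

# Failed connect: with the listener gone, the error is raised
# directly in the caller instead of being handled in the background.
error = nil
begin
  Socket.tcp("127.0.0.1", port, connect_timeout: 0.5)
rescue SystemCallError => e
  error = e
end

puts connected
puts error.class
```

A caller that wants retries therefore has to rescue the exception itself and schedule the next attempt.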

.connect_timeout(options) ⇒ Object

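Returns the connect timeout in seconds. Each attempt is capped at the reconnect interval (its upper bound, if the interval is a Range) so that a hung connect(2) (e.g. macOS kqueue not delivering an IPv6 ECONNREFUSED) doesn’t block the retry loop. The value is floored at 0.5 s to allow for real-network latency.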



# File 'lib/nnq/transport/tcp.rb', line 49

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
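
To make the cap and floor concrete, the method can be exercised in isolation. In this sketch `Options` is a hypothetical stand-in for the engine's options object; only its `reconnect_interval` reader is assumed.

```ruby
# Hypothetical stand-in for the engine's options object.
Options = Struct.new(:reconnect_interval)

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)  # a Range contributes its upper bound
  [ri, 0.5].max                   # never go below the 0.5 s floor
end

short  = connect_timeout(Options.new(0.1))       # below the floor
long   = connect_timeout(Options.new(2.0))       # above the floor
ranged = connect_timeout(Options.new(0.1..5.0))  # Range: upper bound used

p [short, long, ranged]  # [0.5, 2.0, 5.0]
```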

.parse_endpoint(endpoint) ⇒ Object

Splits endpoint into its host and port via URI.parse. The host is returned without IPv6 brackets.



# File 'lib/nnq/transport/tcp.rb', line 56

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end
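
A short example of what this returns, using only the standard `uri` library. The endpoints are illustrative values; the point of interest is that `URI#hostname` strips the brackets from an IPv6 literal, which is why `.bind` adds them back when it builds the listener endpoint.

```ruby
require "uri"

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end

v4 = parse_endpoint("tcp://127.0.0.1:5570")
v6 = parse_endpoint("tcp://[::1]:5570")

p v4  # ["127.0.0.1", 5570]
p v6  # ["::1", 5570]
```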