Module: NNQ::Transport::ZstdTcp

Defined in:
lib/nnq/transport/zstd_tcp/transport.rb,
lib/nnq/transport/zstd_tcp/connection.rb

Overview

zstd+tcp — transport-layer Zstd compression over TCP.

URI scheme: zstd+tcp://host:port. Both peers must use this scheme; a plain tcp:// peer cannot talk to a zstd+tcp:// peer.

Compression is handled by a per-engine Zstd::Codec stored in a WeakKeyMap — all connections on one socket share a codec (critical for dict training across fan-in / fan-out).
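The per-engine sharing described above can be sketched as follows. This is a minimal illustration, not the module's actual code: FakeCodec is a hypothetical stand-in for Zstd::Codec, CodecRegistry is an invented name, and the fallback to ObjectSpace::WeakMap exists only so the sketch loads on Rubies older than 3.3.

```ruby
# Hypothetical stand-in for Zstd::Codec; it only records its settings.
FakeCodec = Struct.new(:level, :dict)

module CodecRegistry
  # ObjectSpace::WeakKeyMap requires Ruby 3.3+. The WeakMap fallback is only
  # an approximation: it also holds its values weakly, so a codec could be
  # collected once nothing else references it.
  MAP_CLASS = defined?(ObjectSpace::WeakKeyMap) ? ObjectSpace::WeakKeyMap : ObjectSpace::WeakMap
  CODECS    = MAP_CLASS.new

  module_function

  # Returns the codec registered for +engine+, creating it on first use.
  # Later calls ignore +level+ and +dict+ and return the existing codec.
  def codec_for(engine, level: -3, dict: nil)
    CODECS[engine] ||= FakeCodec.new(level, dict)
  end
end

engine = Object.new                      # any object can act as the key
first  = CodecRegistry.codec_for(engine, level: 3)
second = CodecRegistry.codec_for(engine) # same engine, same codec
puts first.equal?(second)
```

Because the engine is the key and is held weakly, the map does not keep an engine alive after the rest of the program has dropped it.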

Connection-layer wrapping happens in Engine::ConnectionLifecycle#ready! via the ZstdTcp.wrap_connection hook, so routing and recv pumps see a duck-typed Connection and need no changes.

Defined Under Namespace

Classes: Listener, ZstdConnection

Constant Summary

SCHEME =
"zstd+tcp"

Class Method Details

.bind(endpoint, engine, level: -3, dict: nil) ⇒ Listener

Binds a zstd+tcp listener. Listens on plain TCP underneath; the compression layer is applied per connection via wrap_connection.

Parameters:

  • endpoint (String)

    e.g. “zstd+tcp://127.0.0.1:0”

  • engine (NNQ::Engine)
  • level (Integer) (defaults to: -3)

    Zstd compression level (default -3)

  • dict (String, nil) (defaults to: nil)

    pre-built dictionary bytes

Returns:

  • (Listener)

# File 'lib/nnq/transport/zstd_tcp/transport.rb', line 39

def bind(endpoint, engine, level: -3, dict: nil, **)
  codec_for(engine, level: level, dict: dict)

  host, port = parse_endpoint(endpoint)
  host       = "0.0.0.0" if host == "*"
  server     = TCPServer.new(host, port)
  actual     = server.local_address.ip_port
  host_part  = host.include?(":") ? "[#{host}]" : host

  Listener.new("#{SCHEME}://#{host_part}:#{actual}", server, actual, engine)
end
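The port handling in the listing above can be tried in isolation. The sketch below assumes only Ruby's standard socket library; endpoint_for and bind_ephemeral are hypothetical helper names, and no NNQ engine or Listener is involved.

```ruby
require "socket"

SCHEME = "zstd+tcp"

# Formats the endpoint a listener would report; IPv6 hosts get brackets.
def endpoint_for(host, port)
  host_part = host.include?(":") ? "[#{host}]" : host
  "#{SCHEME}://#{host_part}:#{port}"
end

# Binds to port 0 so the kernel picks a free port, then reads back the
# port that was actually assigned.
def bind_ephemeral(host = "127.0.0.1")
  server = TCPServer.new(host, 0)
  actual = server.local_address.ip_port
  [server, endpoint_for(host, actual)]
end

server, endpoint = bind_ephemeral
puts endpoint   # the port differs on every run
server.close
```

Reporting the resolved port rather than the requested one is what lets a caller bind to port 0 and still learn an endpoint that peers can dial.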

.connect(endpoint, engine, level: -3, dict: nil) ⇒ void

This method returns an undefined value.

Dials a zstd+tcp endpoint. The dial itself is synchronous; it stays non-blocking for callers because the engine’s reconnect loop invokes it on the first connect and on each retry.

Parameters:

  • endpoint (String)
  • engine (NNQ::Engine)
  • level (Integer) (defaults to: -3)
  • dict (String, nil) (defaults to: nil)


# File 'lib/nnq/transport/zstd_tcp/transport.rb', line 61

def connect(endpoint, engine, level: -3, dict: nil, **)
  codec_for(engine, level: level, dict: dict)

  host, port = parse_endpoint(endpoint)
  sock       = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))

  engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
end
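The dial step relies on Socket.tcp with a connect_timeout. A minimal loopback sketch, assuming only the standard socket library, is shown below; try_dial is a hypothetical helper, and returning nil on failure is an illustrative choice rather than what the engine's reconnect loop is known to do.

```ruby
require "socket"

# Dials host:port over plain TCP, giving up after +timeout+ seconds.
# Returns the connected socket, or nil when the OS reports a failure such
# as a refused connection, so a retry loop could try again later.
def try_dial(host, port, timeout)
  Socket.tcp(host, port, connect_timeout: timeout)
rescue SystemCallError
  nil
end

server = TCPServer.new("127.0.0.1", 0)
port   = server.local_address.ip_port

sock = try_dial("127.0.0.1", port, 0.5)
peer = server.accept
sock.write("ping")
puts peer.read(4)

[sock, peer, server].each(&:close)
```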

.connect_timeout(options) ⇒ Object



# File 'lib/nnq/transport/zstd_tcp/transport.rb', line 88

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
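Reading the listing above, the connect timeout is the reconnect interval (or the upper bound of a Range interval), with a floor of 0.5 seconds. The sketch below repeats the method so that it runs on its own; the Options struct is a hypothetical stand-in for the engine's options object.

```ruby
# Hypothetical stand-in for the engine options; only one field is needed.
Options = Struct.new(:reconnect_interval)

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)  # a Range interval contributes its upper bound
  [ri, 0.5].max                   # never wait less than half a second
end

p connect_timeout(Options.new(0.1))        # => 0.5
p connect_timeout(Options.new(0.1..2.0))   # => 2.0
p connect_timeout(Options.new(3))          # => 3
```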

.parse_endpoint(endpoint) ⇒ Object



# File 'lib/nnq/transport/zstd_tcp/transport.rb', line 82

def parse_endpoint(endpoint)
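  # Escape the scheme: an unescaped "+" acts as a regex quantifier and
  # would never match the literal "zstd+tcp:" prefix.
  uri = URI.parse(endpoint.sub(/\A#{Regexp.escape(SCHEME)}:/, "tcp:"))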
  [uri.hostname, uri.port]
end
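A standalone sketch of the endpoint parsing, assuming only Ruby's standard uri library. It escapes the scheme with Regexp.escape so that the "+" is matched literally, and it relies on URI#hostname stripping the brackets from IPv6 literals.

```ruby
require "uri"

SCHEME = "zstd+tcp"

# Swaps the zstd+tcp scheme for tcp, then lets URI split host and port.
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub(/\A#{Regexp.escape(SCHEME)}:/, "tcp:"))
  [uri.hostname, uri.port]
end

p parse_endpoint("zstd+tcp://127.0.0.1:5555")  # => ["127.0.0.1", 5555]
p parse_endpoint("zstd+tcp://[::1]:5555")      # => ["::1", 5555]
```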

.wrap_connection(conn, engine) ⇒ ZstdConnection

Called by ConnectionLifecycle#ready! after the SP handshake completes. Returns a ZstdConnection wrapping conn.

Parameters:

  • conn (NNQ::Connection)
  • engine (NNQ::Engine)

Returns:

  • (ZstdConnection)

# File 'lib/nnq/transport/zstd_tcp/transport.rb', line 77

def wrap_connection(conn, engine)
  ZstdConnection.new(conn, codec_for(engine))
end
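The wrapping idea can be illustrated with a small decorator. Everything in this sketch is an assumption made for illustration: FakeConnection, CompressingConnection, and the send_message / receive_message method names are hypothetical, and Zlib from the standard library stands in for Zstd so that the example runs without extra gems.

```ruby
require "delegate"
require "zlib"

# Hypothetical plain connection: an in-memory queue stands in for the socket.
class FakeConnection
  attr_reader :wire

  def initialize
    @wire = []
  end

  def send_message(bytes)
    @wire << bytes
  end

  def receive_message
    @wire.shift
  end

  def endpoint
    "zstd+tcp://127.0.0.1:5555"
  end
end

# Duck-typed wrapper: compresses on send, decompresses on receive, and
# forwards every other call to the wrapped connection unchanged.
class CompressingConnection < SimpleDelegator
  def send_message(bytes)
    __getobj__.send_message(Zlib::Deflate.deflate(bytes))
  end

  def receive_message
    data = __getobj__.receive_message
    data && Zlib::Inflate.inflate(data)
  end
end

plain = FakeConnection.new
conn  = CompressingConnection.new(plain)
conn.send_message("hello " * 100)
puts plain.wire.first.bytesize        # compressed size on the "wire"
puts conn.receive_message.bytesize    # original size after decompression
```

Code that only calls send_message, receive_message, or endpoint cannot tell the wrapper from the plain connection, which is the property that lets routing and receive pumps stay unchanged.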