Module: NNQ::Transport::ZstdTcp
Defined in:
- lib/nnq/transport/zstd_tcp/transport.rb
- lib/nnq/transport/zstd_tcp/connection.rb
Overview
zstd+tcp — transport-layer Zstd compression over TCP.
URI scheme: `zstd+tcp://host:port`. Both peers must use this scheme; a plain tcp:// peer cannot talk to a zstd+tcp:// peer.
Compression is handled by a per-engine Zstd::Codec stored in a WeakKeyMap — all connections on one socket share a codec (critical for dict training across fan-in / fan-out).
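The per-engine codec lookup described above can be sketched roughly as follows. This is a minimal illustration, not the library's code: `FakeCodec` is a hypothetical stand-in for Zstd::Codec, the body of `codec_for` is assumed, and the fallback to ObjectSpace::WeakMap exists only so the sketch also runs on Rubies older than 3.3.

```ruby
# Hypothetical stand-in for the real Zstd::Codec; only the fields matter here.
FakeCodec = Struct.new(:level, :dict)

# ObjectSpace::WeakKeyMap needs Ruby 3.3+; older Rubies fall back to WeakMap.
CODECS =
  if defined?(ObjectSpace::WeakKeyMap)
    ObjectSpace::WeakKeyMap.new
  else
    ObjectSpace::WeakMap.new
  end

# Returns the codec for an engine, creating it on first use.
# Later calls for the same engine ignore level/dict and reuse the codec.
def codec_for(engine, level: -3, dict: nil)
  CODECS[engine] ||= FakeCodec.new(level, dict)
end

engine_a = Object.new
engine_b = Object.new

first  = codec_for(engine_a, level: -3)
second = codec_for(engine_a)            # same engine, same codec object
other  = codec_for(engine_b, level: 3)  # different engine, separate codec
```

Because the map is keyed weakly on the engine, an entry does not keep a discarded engine alive.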
Connection-layer wrapping happens in Engine::ConnectionLifecycle#ready! via the ZstdTcp.wrap_connection hook, so routing and recv pumps see a duck-typed Connection and need no changes.
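The duck-typing idea can be illustrated with a small wrapper that exposes the same methods as the connection it wraps. Everything here is an assumption made for illustration: the method names `send_message`, `receive_message` and `close` are hypothetical, `FakeConnection` is an in-memory stub, and Zlib from the standard library stands in for the Zstd codec.

```ruby
require "zlib"

# Hypothetical in-memory connection standing in for a handshaken one.
class FakeConnection
  attr_reader :wire

  def initialize
    @wire = []
  end

  def send_message(body)
    @wire << body
  end

  def receive_message
    @wire.shift
  end

  def close
    @wire.clear
  end
end

# Wrapper with the same interface; callers cannot tell it from the original.
# Zlib is used here only as a stand-in for Zstd.
class CompressingConnection
  def initialize(conn)
    @conn = conn
  end

  def send_message(body)
    @conn.send_message(Zlib::Deflate.deflate(body))
  end

  def receive_message
    Zlib::Inflate.inflate(@conn.receive_message)
  end

  def close
    @conn.close
  end
end

raw     = FakeConnection.new
wrapped = CompressingConnection.new(raw)
payload = "hello " * 100

wrapped.send_message(payload)
compressed_size = raw.wire.first.bytesize
roundtrip       = wrapped.receive_message
```

Code that only calls the shared methods works with either object, which is why routing and recv pumps would need no changes under this pattern.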
Defined Under Namespace
Classes: Listener, ZstdConnection
Constant Summary
- SCHEME = "zstd+tcp"
Class Method Summary
- .bind(endpoint, engine, level: -3, dict: nil) ⇒ Listener
  Binds a zstd+tcp listener.
- .connect(endpoint, engine, level: -3, dict: nil) ⇒ void
  Dials a zstd+tcp endpoint.
- .connect_timeout(options) ⇒ Object
- .parse_endpoint(endpoint) ⇒ Object
- .wrap_connection(conn, engine) ⇒ ZstdConnection
  Called by ConnectionLifecycle#ready! after the SP handshake completes.
Class Method Details
.bind(endpoint, engine, level: -3, dict: nil) ⇒ Listener
Binds a zstd+tcp listener. Listens on plain TCP underneath; the compression layer is applied per connection via wrap_connection.
    # File 'lib/nnq/transport/zstd_tcp/transport.rb', line 39
    def bind(endpoint, engine, level: -3, dict: nil, **)
      codec_for(engine, level: level, dict: dict)
      host, port = parse_endpoint(endpoint)
      host = "0.0.0.0" if host == "*"
      server = TCPServer.new(host, port)
      actual = server.local_address.ip_port
      host_part = host.include?(":") ? "[#{host}]" : host
      Listener.new("#{SCHEME}://#{host_part}:#{actual}", server, actual, engine)
    end
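One consequence of reading the port back from the socket is that binding to port 0 yields an endpoint string with the port the OS actually assigned. The sketch below reproduces only that part of the logic; `bound_endpoint` is a hypothetical helper, and the Listener and codec setup are left out.

```ruby
require "socket"

SCHEME = "zstd+tcp"

# Binds a plain TCP server and returns the endpoint string it would
# advertise, together with the server so the caller can close it.
def bound_endpoint(host, port)
  host = "0.0.0.0" if host == "*"               # wildcard host
  server = TCPServer.new(host, port)
  actual = server.local_address.ip_port         # real port, even if port was 0
  host_part = host.include?(":") ? "[#{host}]" : host  # bracket IPv6 literals
  ["#{SCHEME}://#{host_part}:#{actual}", server]
end

endpoint, server = bound_endpoint("127.0.0.1", 0)
server.close
```

Here `endpoint` carries the ephemeral port chosen by the OS rather than 0, so it can be handed to a peer as a dialable address.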
.connect(endpoint, engine, level: -3, dict: nil) ⇒ void
This method returns an undefined value.
Dials a zstd+tcp endpoint. Non-blocking via engine’s reconnect loop — this is called synchronously on first connect and on each retry.
    # File 'lib/nnq/transport/zstd_tcp/transport.rb', line 61
    def connect(endpoint, engine, level: -3, dict: nil, **)
      codec_for(engine, level: level, dict: dict)
      host, port = parse_endpoint(endpoint)
      sock = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
      engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
    end
.connect_timeout(options) ⇒ Object
    # File 'lib/nnq/transport/zstd_tcp/transport.rb', line 88
    def connect_timeout(options)
      ri = options.reconnect_interval
      ri = ri.end if ri.is_a?(Range)
      [ri, 0.5].max
    end
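The timeout rule is easiest to see with a few inputs. In this sketch `Options` is a hypothetical Struct standing in for the engine's options object; only `reconnect_interval` is assumed to exist on it.

```ruby
# Hypothetical options object exposing only the field the method reads.
Options = Struct.new(:reconnect_interval)

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)  # a range contributes its upper bound
  [ri, 0.5].max                   # never go below half a second
end

short  = connect_timeout(Options.new(0.1))       # below the floor
ranged = connect_timeout(Options.new(0.1..2.0))  # range of intervals
long   = connect_timeout(Options.new(3))         # already above the floor
```

The three results are 0.5, 2.0 and 3 respectively: short intervals are raised to the 0.5 second floor, and a range is treated as its largest value.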
.parse_endpoint(endpoint) ⇒ Object
    # File 'lib/nnq/transport/zstd_tcp/transport.rb', line 82
    def parse_endpoint(endpoint)
      uri = URI.parse(endpoint.sub(/\A#{SCHEME}:/, "tcp:"))
      [uri.hostname, uri.port]
    end
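A standalone variant of the parsing step is sketched below. Note that in the listing above the `+` in the interpolated SCHEME acts as a regex quantifier, so the pattern does not appear to match a literal `zstd+tcp:` prefix; the result is the same because URI.parse accepts `+` in a scheme. The variant here uses Regexp.escape, which is this sketch's choice and not taken from the source.

```ruby
require "uri"

SCHEME = "zstd+tcp"

# Splits an endpoint into [host, port]. Regexp.escape makes the "+" in the
# scheme literal so the prefix is actually rewritten to "tcp:".
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub(/\A#{Regexp.escape(SCHEME)}:/, "tcp:"))
  [uri.hostname, uri.port]
end

ipv4 = parse_endpoint("zstd+tcp://127.0.0.1:5555")
ipv6 = parse_endpoint("zstd+tcp://[::1]:5555")  # hostname drops the brackets
```

`URI#hostname` returns an IPv6 literal without its brackets, which is why bind adds them back when it builds the listener's endpoint string.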
.wrap_connection(conn, engine) ⇒ ZstdConnection
Called by ConnectionLifecycle#ready! after the SP handshake completes. Returns a ZstdConnection wrapping conn.
    # File 'lib/nnq/transport/zstd_tcp/transport.rb', line 77
    def wrap_connection(conn, engine)
      ZstdConnection.new(conn, codec_for(engine))
    end