Class: OMQ::Transport::ZstdTcp::Dialer

Inherits:
Object
Defined in:
lib/omq/transport/zstd_tcp/transport.rb

Overview

A zstd+tcp dialer: a stateful factory for outgoing connections.

Holds the Codec (compression cache, training state, dict) and wraps new connections with ZstdConnection.
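
The "stateful factory" idea can be illustrated with a minimal sketch. Every class below (SketchCodec, SketchConnection, SketchDialer) is a hypothetical stand-in, since the real Codec and ZstdConnection interfaces are not shown on this page. The only point it makes is that one codec object is shared by every connection the dialer wraps.

```ruby
# Hypothetical stand-in for the codec: it only counts messages so that
# the sharing between connections is observable.
SketchCodec = Struct.new(:messages_seen) do
  def note_message
    self.messages_seen += 1
  end
end

# Hypothetical stand-in for a wrapped connection.
class SketchConnection
  def initialize(conn, codec)
    @conn  = conn
    @codec = codec
  end

  # Records the message in the shared codec, then forwards it.
  def send_message(msg)
    @codec.note_message
    @conn << msg
  end
end

# Hypothetical stand-in for the dialer: holds the codec once and hands
# the same instance to every connection it wraps.
class SketchDialer
  def initialize(codec)
    @codec = codec
  end

  def wrap_connection(conn)
    SketchConnection.new(conn, @codec)
  end
end

codec  = SketchCodec.new(0)
dialer = SketchDialer.new(codec)

first  = dialer.wrap_connection([])
second = dialer.wrap_connection([])

first.send_message("hello")
second.send_message("world")
# Both sends were recorded on the one shared codec.
```

In the real class the shared state is presumably what lets a trained dictionary be reused across reconnects, but that is an inference from the overview text rather than something this page states.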

Constructor Details

#initialize(endpoint, engine, codec) ⇒ Dialer

Returns a new instance of Dialer.

Parameters:

  • endpoint (String)
  • engine (Engine)
  • codec (Codec)


# File 'lib/omq/transport/zstd_tcp/transport.rb', line 136

def initialize(endpoint, engine, codec)
  @endpoint = endpoint
  @engine   = engine
  @codec    = codec
end

Instance Attribute Details

#endpoint ⇒ String (readonly)

Returns the endpoint this dialer connects to.

Returns:

  • (String)

    the endpoint this dialer connects to



# File 'lib/omq/transport/zstd_tcp/transport.rb', line 129

def endpoint
  @endpoint
end

Instance Method Details

#connect ⇒ void

This method returns an undefined value.

Establishes a TCP connection to the endpoint.



# File 'lib/omq/transport/zstd_tcp/transport.rb', line 147

def connect
  host, port = ZstdTcp.parse_endpoint(@endpoint)
  host       = ZstdTcp.normalize_connect_host(host)
  sock       = ::Socket.tcp host, port, connect_timeout: ZstdTcp.connect_timeout(@engine.options)

  TCP.apply_buffer_sizes sock, @engine.options

  @engine.handle_connected IO::Stream::Buffered.wrap(sock), endpoint: @endpoint
rescue
  sock&.close
  raise
end
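
The connect-then-clean-up shape used above can be tried in isolation with only the standard library. The sketch below is not OMQ code. The `dial` helper and its block interface are hypothetical, while `Socket.tcp` with `connect_timeout:` is the stdlib call the method itself uses. It shows why the `rescue` clause uses `sock&.close`: if `Socket.tcp` itself raises, `sock` is still nil and there is nothing to close.

```ruby
require "socket"

# Hypothetical helper: opens a TCP socket, yields it for setup, and
# closes it again if anything in the setup step raises.
def dial(host, port, timeout: 1.0)
  sock = ::Socket.tcp(host, port, connect_timeout: timeout)
  yield sock
  sock
rescue
  # sock is nil when Socket.tcp failed, hence the safe-navigation call.
  sock&.close
  raise
end
```

A caller would pass the post-connect setup as the block, for example `dial("127.0.0.1", 5555) { |s| s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) }`, where the port is only an illustration.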

#wrap_connection(conn) ⇒ ZstdConnection

Wraps a raw ZMTP connection with Zstd compression.

Parameters:

  • conn (Protocol::ZMTP::Connection)

Returns:

  • (ZstdConnection)

# File 'lib/omq/transport/zstd_tcp/transport.rb', line 166

def wrap_connection(conn)
  ZstdConnection.new(conn, @codec)
end
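
As a rough illustration of what wrapping a connection with compression means, here is a self-contained decorator sketch. It makes two loud assumptions: Zlib from the standard library stands in for Zstd, and the `write`/`read` interface of FakeConnection and CompressingConnection is invented for the example. Neither reflects the actual ZstdConnection or Protocol::ZMTP::Connection API.

```ruby
require "zlib"

# Hypothetical raw connection: keeps written frames in memory.
class FakeConnection
  attr_reader :frames

  def initialize
    @frames = []
  end

  def write(bytes)
    @frames << bytes
  end

  def read
    @frames.shift
  end
end

# Hypothetical wrapper: compresses on the way out and decompresses on
# the way in. Zlib is used here only as a stdlib substitute for Zstd.
class CompressingConnection
  def initialize(conn)
    @conn = conn
  end

  def write(bytes)
    @conn.write(Zlib::Deflate.deflate(bytes))
  end

  def read
    Zlib::Inflate.inflate(@conn.read)
  end
end
```

Callers use the wrapper exactly as they would the raw connection, so compression stays invisible to the code above the transport layer.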