Module: OMQ::Transport::Lz4Tcp

Defined in:
lib/omq/transport/lz4_tcp/transport.rb,
lib/omq/transport/lz4_tcp/connection.rb

Defined Under Namespace

Classes: Dialer, Listener, Lz4Connection

Constant Summary collapse

SCHEME =
"lz4+tcp"

Class Method Summary collapse

Class Method Details

.connect_timeout(options) ⇒ Object



95
96
97
98
99
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 95

# Computes a connect timeout from the reconnect interval in +options+.
#
# When +options.reconnect_interval+ is a Range, its end value is used;
# otherwise the interval itself is used. The result is never smaller
# than 0.5.
#
# @param options [#reconnect_interval] object exposing the reconnect interval
# @return [Object] the larger of the (upper) reconnect interval and 0.5
def connect_timeout(options)
  interval = options.reconnect_interval
  ceiling  = interval.is_a?(Range) ? interval.end : interval
  [ceiling, 0.5].max
end

.connection_class ⇒ Object

Called by OMQ::Engine::ConnectionLifecycle after the ZMTP handshake completes; we return the default connection class so that the handshake itself runs uncompressed over raw TCP.



19
20
21
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 19

# Returns the connection class used for this transport's handshake:
# the plain ZMTP connection class, with no LZ4 wrapping.
#
# @return [Class] Protocol::ZMTP::Connection
def connection_class = Protocol::ZMTP::Connection

.dialer(endpoint, engine, dict: nil) ⇒ Dialer

Creates an lz4+tcp dialer for an endpoint.

Parameters:

  • endpoint (String)
  • engine (OMQ::Engine)
  • dict (String, nil) (defaults to: nil)

    user-supplied dictionary bytes.

Returns:



59
60
61
62
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 59

# Creates an lz4+tcp dialer for an endpoint.
#
# The dictionary is validated first; any additional keyword arguments
# are accepted and ignored.
#
# @param endpoint [String]
# @param engine [OMQ::Engine]
# @param dict [String, nil] user-supplied dictionary bytes
# @return [Dialer]
def dialer(endpoint, engine, dict: nil, **)
  validate_dict!(dict)

  # Pass the dictionary on as a binary-encoded copy (nil stays nil).
  dict_bytes = dict&.b
  Dialer.new(endpoint, engine, dict_bytes)
end

.listener(endpoint, engine, dict: nil) ⇒ Listener

Creates a bound lz4+tcp listener.

Parameters:

  • endpoint (String)

    e.g. “lz4+tcp://127.0.0.1:0”

  • engine (OMQ::Engine)
  • dict (String, nil) (defaults to: nil)

    user-supplied dictionary bytes to ship on the first outgoing message. If nil, no dict is shipped (payloads compress without dict or go plaintext below the min-size threshold).

Returns:



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 33

# Creates a bound lz4+tcp listener.
#
# Validates the dictionary, opens TCP server sockets for the endpoint's
# host and port, and wraps them in a Listener together with the resolved
# endpoint string (which carries the port actually bound). Any additional
# keyword arguments are accepted and ignored.
#
# @param endpoint [String] e.g. "lz4+tcp://127.0.0.1:0"
# @param engine [OMQ::Engine]
# @param dict [String, nil] user-supplied dictionary bytes
# @return [Listener]
# @raise [Socket::ResolutionError] when no server sockets were created
def listener(endpoint, engine, dict: nil, **)
  validate_dict!(dict)

  host, port = parse_endpoint(endpoint)
  bind_host  = normalize_bind_host(host)
  sockets    = ::Socket.tcp_server_sockets(bind_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if sockets.empty?

  # Read the port back from the first socket rather than trusting the
  # requested one.
  bound_port = sockets.first.local_address.ip_port

  shown_host =
    if host == "*"
      "*"
    else
      bind_host || "*"
    end
  # Hosts containing a colon are bracketed in the resolved endpoint.
  shown_host = "[#{shown_host}]" if shown_host.include?(":")

  Listener.new("#{SCHEME}://#{shown_host}:#{bound_port}", sockets, bound_port, engine, dict&.b)
end

.normalize_bind_host(host) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 78

# Maps an endpoint host to the host used when binding.
#
# "*" becomes nil; nil, "" and "localhost" become TCP.loopback_host;
# any other value is returned unchanged.
#
# @param host [String, nil]
# @return [String, nil]
def normalize_bind_host(host)
  if ["*"].include?(host)
    nil
  elsif [nil, "", "localhost"].include?(host)
    TCP.loopback_host
  else
    host
  end
end

.normalize_connect_host(host) ⇒ Object



87
88
89
90
91
92
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 87

# Maps an endpoint host to the host used when connecting.
#
# nil, "", "*" and "localhost" all become TCP.loopback_host; any other
# value is returned unchanged.
#
# @param host [String, nil]
# @return [Object] TCP.loopback_host or the given host
def normalize_connect_host(host)
  return TCP.loopback_host if [nil, "", "*", "localhost"].include?(host)

  host
end

.parse_endpoint(endpoint) ⇒ Object



72
73
74
75
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 72

# Splits an endpoint URI into its host and port.
#
# Uses URI#hostname, so an IPv6 literal is returned without brackets.
#
# @param endpoint [String] endpoint URI
# @return [Array] two-element array of [hostname, port]
def parse_endpoint(endpoint)
  URI.parse(endpoint).then { |parsed| [parsed.hostname, parsed.port] }
end

.validate_endpoint!(endpoint) ⇒ Object



65
66
67
68
69
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 65

# Checks that the endpoint's host can be resolved.
#
# Parses the endpoint, normalizes its host for connecting, and runs an
# address lookup on it. Lookup failures propagate from
# Addrinfo.getaddrinfo. No lookup happens when the normalized host is
# nil or false.
#
# @param endpoint [String]
# @return [Array<Addrinfo>, nil] the lookup result, or nil when skipped
def validate_endpoint!(endpoint)
  hostname, = parse_endpoint(endpoint)
  target    = normalize_connect_host(hostname)
  return unless target

  Addrinfo.getaddrinfo(target, nil, nil, :STREAM)
end