Module: OMQ::Transport::ZstdTcp

Defined in:
lib/omq/transport/zstd_tcp.rb,
lib/omq/transport/zstd_tcp/codec.rb,
lib/omq/transport/zstd_tcp/transport.rb,
lib/omq/transport/zstd_tcp/connection.rb

Defined Under Namespace

Classes: Codec, Dialer, Listener, ProtocolError, ZstdConnection

Constant Summary collapse

SCHEME =
"zstd+tcp"

Class Method Summary collapse

Class Method Details

.connect_timeout(options) ⇒ Object



87
88
89
90
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 87

def connect_timeout(options)
  ri = options.reconnect_interval
  ri.is_a?(Range) ? ri.end : [ri * 10, 30].min
end

.dialer(endpoint, engine, level: -3,, dict: nil) ⇒ Dialer

Creates a zstd+tcp dialer for an endpoint.

Parameters:

  • endpoint (String)

    e.g. “zstd+tcp://127.0.0.1:5555”

  • engine (Engine)
  • level (Integer) (defaults to: -3,)

    Zstd compression level

  • dict (String, nil) (defaults to: nil)

    user-supplied dictionary bytes

Returns:



57
58
59
60
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 57

def dialer(endpoint, engine, level: -3, dict: nil, **)
  codec = codec_for(engine, level: level, dict: dict)
  Dialer.new(endpoint, engine, codec)
end

.listener(endpoint, engine, level: -3,, dict: nil) ⇒ Listener

Creates a bound zstd+tcp listener.

Parameters:

  • endpoint (String)

    e.g. “zstd+tcp://127.0.0.1:5555”

  • engine (Engine)
  • level (Integer) (defaults to: -3,)

    Zstd compression level

  • dict (String, nil) (defaults to: nil)

    user-supplied dictionary bytes

Returns:



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 29

def listener(endpoint, engine, level: -3, dict: nil, **)
  codec = codec_for(engine, level: level, dict: dict)

  host, port = parse_endpoint(endpoint)
  host       = normalize_bind_host(host)
  servers    = ::Socket.tcp_server_sockets(host, port)

  if servers.empty?
    raise ::Socket::ResolutionError, "no addresses for #{host.inspect}"
  end

  actual_port  = servers.first.local_address.ip_port
  display_host = host || "*"
  host_part    = display_host.include?(":") ? "[#{display_host}]" : display_host
  resolved     = "#{SCHEME}://#{host_part}:#{actual_port}"

  Listener.new(resolved, servers, actual_port, engine, codec)
end

.normalize_bind_host(host) ⇒ Object



76
77
78
79
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 76

def normalize_bind_host(host)
  return nil if host == "*"
  host
end

.normalize_connect_host(host) ⇒ Object



82
83
84
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 82

def normalize_connect_host(host)
  host == "*" ? "127.0.0.1" : host
end

.parse_endpoint(endpoint) ⇒ Object



70
71
72
73
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 70

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end

.validate_endpoint!(endpoint) ⇒ Object



63
64
65
66
67
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 63

def validate_endpoint!(endpoint)
  host, _port = parse_endpoint(endpoint)
  host = normalize_connect_host(host)
  Addrinfo.getaddrinfo(host, nil, nil, :STREAM) if host
end