Module: OMQ::Transport::ZstdTcp
- Defined in:
- lib/omq/transport/zstd_tcp.rb,
lib/omq/transport/zstd_tcp/codec.rb,
lib/omq/transport/zstd_tcp/transport.rb,
lib/omq/transport/zstd_tcp/connection.rb
Defined Under Namespace
Classes: Codec, Dialer, Listener, ProtocolError, ZstdConnection
Constant Summary
collapse
- SCHEME =
"zstd+tcp"
Class Method Summary
collapse
Class Method Details
.connect_timeout(options) ⇒ Object
87
88
89
90
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 87
# Derives a connect timeout from the configured reconnect interval.
#
# @param options [#reconnect_interval] object exposing the reconnect interval
# @return [Object] the upper bound of the interval when it is a Range;
#   otherwise ten times the interval, capped at 30
def connect_timeout(options)
  interval = options.reconnect_interval
  # A Range interval contributes its upper bound directly.
  return interval.end if interval.is_a?(Range)

  # Scalar interval: scale by ten, never exceeding 30.
  scaled = interval * 10
  [scaled, 30].min
end
|
.dialer(endpoint, engine, level: -3, dict: nil) ⇒ Dialer
Creates a zstd+tcp dialer for an endpoint.
57
58
59
60
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 57
# Creates a zstd+tcp dialer for an endpoint.
#
# @param endpoint [String] endpoint handed unchanged to the Dialer
# @param engine [Object] engine passed to both +codec_for+ and the Dialer
# @param level [Integer] forwarded to +codec_for+ (presumably the zstd
#   compression level -- confirm against Codec)
# @param dict [Object, nil] forwarded to +codec_for+ (presumably a
#   compression dictionary -- confirm against Codec)
# @return [Dialer]
#
# Any additional keyword arguments are accepted and ignored.
def dialer(endpoint, engine, level: -3, dict: nil, **)
  dialer_codec = codec_for(engine, level: level, dict: dict)

  Dialer.new(endpoint, engine, dialer_codec)
end
|
.listener(endpoint, engine, level: -3, dict: nil) ⇒ Listener
Creates a bound zstd+tcp listener.
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 29
# Creates a bound zstd+tcp listener.
#
# Opens TCP server sockets for the host and port parsed from +endpoint+ and
# wraps them in a Listener, together with an endpoint string rebuilt from the
# port the first socket is actually bound to.
#
# @param endpoint [String] endpoint to bind
# @param engine [Object] engine passed to both +codec_for+ and the Listener
# @param level [Integer] forwarded to +codec_for+ (presumably the zstd
#   compression level -- confirm against Codec)
# @param dict [Object, nil] forwarded to +codec_for+ (presumably a
#   compression dictionary -- confirm against Codec)
# @return [Listener]
# @raise [Socket::ResolutionError] when no server sockets were created
#
# Any additional keyword arguments are accepted and ignored.
def listener(endpoint, engine, level: -3, dict: nil, **)
  codec = codec_for(engine, level: level, dict: dict)
  raw_host, port = parse_endpoint(endpoint)
  bind_host = normalize_bind_host(raw_host)

  sockets = ::Socket.tcp_server_sockets(bind_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{bind_host.inspect}" if sockets.empty?

  # Read the port back from the first socket rather than trusting the
  # requested one (presumably so an ephemeral port request reports the
  # real port -- confirm).
  bound_port = sockets.first.local_address.ip_port

  # A nil bind host is displayed as "*"; hosts containing ":" are bracketed.
  shown_host = bind_host || "*"
  shown_host = "[#{shown_host}]" if shown_host.include?(":")

  Listener.new("#{SCHEME}://#{shown_host}:#{bound_port}", sockets, bound_port, engine, codec)
end
|
.normalize_bind_host(host) ⇒ Object
76
77
78
79
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 76
# Maps the wildcard host "*" to nil for binding; any other host is returned
# unchanged.
#
# @param host [String, nil] host taken from an endpoint
# @return [String, nil] nil for "*", otherwise +host+ itself
def normalize_bind_host(host)
  host == "*" ? nil : host
end
|
.normalize_connect_host(host) ⇒ Object
82
83
84
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 82
# Maps the wildcard host "*" to the IPv4 loopback address for connecting;
# any other host is returned unchanged.
#
# @param host [String, nil] host taken from an endpoint
# @return [String, nil] "127.0.0.1" for "*", otherwise +host+ itself
def normalize_connect_host(host)
  return "127.0.0.1" if host == "*"

  host
end
|
.parse_endpoint(endpoint) ⇒ Object
70
71
72
73
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 70
# Splits an endpoint URI into its host and port.
#
# @param endpoint [String] endpoint URI
# @return [Array] two-element array of the URI's hostname and port, exactly
#   as reported by the parsed URI
def parse_endpoint(endpoint)
  URI.parse(endpoint).then { |parsed| [parsed.hostname, parsed.port] }
end
|
.validate_endpoint!(endpoint) ⇒ Object
63
64
65
66
67
|
# File 'lib/omq/transport/zstd_tcp/transport.rb', line 63
# Checks that the endpoint's host can be resolved.
#
# The host is parsed from +endpoint+, passed through
# +normalize_connect_host+, and, when present, looked up with
# Addrinfo.getaddrinfo for stream sockets. The port is not examined.
#
# @param endpoint [String] endpoint URI
# @return [Array<Addrinfo>, nil] the lookup result, or nil when there is no
#   host to resolve
def validate_endpoint!(endpoint)
  host = normalize_connect_host(parse_endpoint(endpoint).first)
  return unless host

  Addrinfo.getaddrinfo(host, nil, nil, :STREAM)
end
|