Module: OMQ::Transport::Lz4Tcp
- Defined in:
- lib/omq/transport/lz4_tcp/transport.rb,
lib/omq/transport/lz4_tcp/connection.rb
Defined Under Namespace
Classes: Dialer, Listener, Lz4Connection
Constant Summary
collapse
- SCHEME =
"lz4+tcp"
Class Method Summary
collapse
Class Method Details
.connect_timeout(options) ⇒ Object
95
96
97
98
99
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 95
# Derives the connect timeout from the reconnect interval in +options+.
#
# A Range interval contributes its upper bound (+Range#end+); any other
# value is used unchanged. The result is never smaller than 0.5.
#
# @param options [#reconnect_interval] object exposing the reconnect interval
# @return [Numeric] the larger of the interval (or its upper bound) and 0.5
def connect_timeout(options)
  interval = options.reconnect_interval
  upper_bound = interval.is_a?(Range) ? interval.end : interval
  [upper_bound, 0.5].max
end
|
.connection_class ⇒ Object
Called by OMQ::Engine::ConnectionLifecycle after the ZMTP handshake completes; we return the default connection class so that the handshake itself runs uncompressed over raw TCP.
19
20
21
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 19
# Connection class used for this transport: the default ZMTP connection.
#
# @return [Class] always +Protocol::ZMTP::Connection+
def connection_class = Protocol::ZMTP::Connection
|
.dialer(endpoint, engine, dict: nil) ⇒ Dialer
Creates an lz4+tcp dialer for an endpoint.
59
60
61
62
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 59
# Builds a Dialer for +endpoint+.
#
# The dictionary is checked with +validate_dict!+ first, then handed to the
# Dialer as a binary-encoded copy (+String#b+); a nil dictionary stays nil.
# Any additional keyword arguments are accepted and ignored.
#
# @param endpoint [String] endpoint passed through to the Dialer
# @param engine [Object] engine passed through to the Dialer
# @param dict [String, nil] optional dictionary
# @return [Dialer]
def dialer(endpoint, engine, dict: nil, **)
  validate_dict!(dict)
  binary_dict = dict&.b
  Dialer.new(endpoint, engine, binary_dict)
end
|
.listener(endpoint, engine, dict: nil) ⇒ Listener
Creates a bound lz4+tcp listener.
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 33
# Binds server sockets for +endpoint+ and wraps them in a Listener.
#
# The host is normalized with +normalize_bind_host+ before being passed to
# +Socket.tcp_server_sockets+. The Listener receives a resolved endpoint
# string built from SCHEME, the display host and the port reported by the
# first bound socket. Any additional keyword arguments are accepted and
# ignored.
#
# @param endpoint [String] endpoint to parse with +parse_endpoint+
# @param engine [Object] engine passed through to the Listener
# @param dict [String, nil] optional dictionary, checked with
#   +validate_dict!+ and forwarded as a binary-encoded copy
# @return [Listener]
# @raise [Socket::ResolutionError] when no server sockets were returned
def listener(endpoint, engine, dict: nil, **)
  validate_dict!(dict)

  host, port = parse_endpoint(endpoint)
  bind_host = normalize_bind_host(host)
  sockets = ::Socket.tcp_server_sockets(bind_host, port)
  raise ::Socket::ResolutionError, "no addresses for #{host.inspect}" if sockets.empty?

  # Read the port back from the socket rather than reusing the requested one.
  bound_port = sockets.first.local_address.ip_port

  # A nil bind host is shown as "*"; hosts containing ":" are bracketed.
  shown_host = host == "*" ? "*" : (bind_host || "*")
  shown_host = "[#{shown_host}]" if shown_host.include?(":")

  Listener.new("#{SCHEME}://#{shown_host}:#{bound_port}", sockets, bound_port, engine, dict&.b)
end
|
.normalize_bind_host(host) ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 78
# Maps an endpoint host to the host used for binding.
#
# "*" becomes nil; nil, "" and "localhost" become +TCP.loopback_host+;
# every other host is returned unchanged.
#
# @param host [String, nil] host part of the endpoint
# @return [String, nil] host to bind to
def normalize_bind_host(host)
  return nil if host == "*"
  return TCP.loopback_host if [nil, "", "localhost"].include?(host)

  host
end
|
.normalize_connect_host(host) ⇒ Object
87
88
89
90
91
92
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 87
# Maps an endpoint host to the host used for connecting.
#
# nil, "", "*" and "localhost" all become +TCP.loopback_host+; every other
# host is returned unchanged.
#
# @param host [String, nil] host part of the endpoint
# @return [Object] +TCP.loopback_host+ or the given host
def normalize_connect_host(host)
  [nil, "", "*", "localhost"].include?(host) ? TCP.loopback_host : host
end
|
.parse_endpoint(endpoint) ⇒ Object
72
73
74
75
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 72
# Splits an endpoint URI into its host and port.
#
# Uses +URI#hostname+, so an IPv6 literal is returned without its brackets.
#
# @param endpoint [String] endpoint URI
# @return [Array(String, Integer)] +[hostname, port]+; either may be nil
#   when the URI lacks that component
def parse_endpoint(endpoint)
  URI.parse(endpoint).then { |parsed| [parsed.hostname, parsed.port] }
end
|
.validate_endpoint!(endpoint) ⇒ Object
65
66
67
68
69
|
# File 'lib/omq/transport/lz4_tcp/transport.rb', line 65
# Checks that the endpoint's host resolves.
#
# The host is taken from +parse_endpoint+, normalized with
# +normalize_connect_host+ and looked up with +Addrinfo.getaddrinfo+ for
# stream sockets. Lookup errors are not rescued here. Nothing is looked up
# when the normalized host is nil.
#
# @param endpoint [String] endpoint URI
# @return [Array<Addrinfo>, nil] lookup result, or nil when no lookup ran
def validate_endpoint!(endpoint)
  host, = parse_endpoint(endpoint)
  target = normalize_connect_host(host)
  return unless target

  Addrinfo.getaddrinfo(target, nil, nil, :STREAM)
end
|