Module: OMQ::CLI::SocketSetup
- Defined in:
- lib/omq/cli/socket_setup.rb
Overview
Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.
Constant Summary collapse
- DEFAULT_HWM =
Default high water mark applied when the user does not pass –hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).
64- DEFAULT_RECV_MAXSZ =
Default max inbound message size applied when the user does not pass –recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with –recv-maxsz N, or disable it entirely with –recv-maxsz 0.
1 << 20
- PURE_SEND_TYPES =
Socket types that never receive application data. These opt out of the default passive-compression behavior – they would otherwise advertise a profile and wrap every outgoing frame in a 4-byte uncompressed sentinel for no benefit, since there’s nothing to decompress in return.
%w[push pub scatter radio].freeze
Class Method Summary collapse
-
.apply_compression(sock, config, type_name) ⇒ Object
Installs ZMTP-Zstd compression on
sockbased ontype_nameand the explicit flags inconfig. -
.apply_options(sock, config) ⇒ Object
Apply common socket options from
configtosock. -
.apply_recv_maxsz(sock, config) ⇒ Object
–recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
-
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect
sockusing URL strings fromconfig.binds/config.connects. -
.attach_endpoints(sock, endpoints, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect
sockfrom an Array of Endpoint objects. -
.build(klass, config) ⇒ Object
Create and fully configure a socket from
klassandconfig. -
.kill_on_protocol_error(sock, event) ⇒ Object
CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see.
-
.setup_curve(sock, config) ⇒ Object
Configure CURVE encryption on
sockusingconfigand env vars. -
.setup_subscriptions(sock, config) ⇒ Object
Subscribe or join groups on
sockaccording toconfig.
Class Method Details
.apply_compression(sock, config, type_name) ⇒ Object
Installs ZMTP-Zstd compression on sock based on type_name and the explicit flags in config. Three outcomes:
-
config.compressis true –> active compression (auto-dict). Outgoing frames are compressed; incoming are decoded. -
config.compressis false andtype_namecan receive –> passive compression (RFC Sec. 6.4). The socket advertises the profile so an active peer can compress on the wire and we can decode it, but we never compress our own outgoing frames. -
config.compressis false andtype_nameis inPURE_SEND_TYPES–> no compression. Pure senders have no incoming traffic to decompress, so passive mode is pure overhead on outgoing.
Callers pass type_name explicitly (rather than reading it off config) so the pipe runners can install different modes on their push/pull ends of a single pipe.
50 51 52 53 54 55 56 |
# File 'lib/omq/cli/socket_setup.rb', line 50 def self.apply_compression(sock, config, type_name) if config.compress sock.compression = OMQ::Compression::Zstd.auto(level: config.compress_level || -3) elsif !PURE_SEND_TYPES.include?(type_name) sock.compression = OMQ::Compression::Zstd.auto(passive: true) end end |
.apply_options(sock, config) ⇒ Object
Apply common socket options from config to sock.
61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/omq/cli/socket_setup.rb', line 61 def self.(sock, config) sock.linger = config.linger sock.recv_timeout = config.timeout if config.timeout sock.send_timeout = config.timeout if config.timeout sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is. sock.send_hwm = config.send_hwm || DEFAULT_HWM sock.recv_hwm = config.recv_hwm || DEFAULT_HWM sock.sndbuf = config.sndbuf if config.sndbuf sock.rcvbuf = config.rcvbuf if config.rcvbuf end |
.apply_recv_maxsz(sock, config) ⇒ Object
–recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
90 91 92 93 94 95 96 97 |
# File 'lib/omq/cli/socket_setup.rb', line 90 def self.apply_recv_maxsz(sock, config) sock. = case config.recv_maxsz when nil then DEFAULT_RECV_MAXSZ when 0 then nil else config.recv_maxsz end end |
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.
103 104 105 106 107 108 109 110 111 112 |
# File 'lib/omq/cli/socket_setup.rb', line 103 def self.attach(sock, config, verbose: 0, timestamps: nil) config.binds.each do |url| sock.bind(url) CLI::Term.write_attach(:bind, sock.last_endpoint, ) if verbose >= 1 end config.connects.each do |url| sock.connect(url) CLI::Term.write_attach(:connect, url, ) if verbose >= 1 end end |
.attach_endpoints(sock, endpoints, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix.
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/omq/cli/socket_setup.rb', line 119 def self.attach_endpoints(sock, endpoints, verbose: 0, timestamps: nil) endpoints.each do |ep| if ep.bind? sock.bind(ep.url) CLI::Term.write_attach(:bind, sock.last_endpoint, ) if verbose >= 1 else sock.connect(ep.url) CLI::Term.write_attach(:connect, ep.url, ) if verbose >= 1 end end end |
.build(klass, config) ⇒ Object
Create and fully configure a socket from klass and config.
77 78 79 80 81 82 83 84 85 86 |
# File 'lib/omq/cli/socket_setup.rb', line 77 def self.build(klass, config) sock = config.ffi ? klass.new(backend: :ffi) : klass.new sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name) (sock, config) apply_recv_maxsz(sock, config) sock.identity = config.identity if config.identity sock.router_mandatory = true if config.type_name == "router" apply_compression(sock, config, config.type_name) sock end |
.kill_on_protocol_error(sock, event) ⇒ Object
CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.
194 195 196 197 198 199 |
# File 'lib/omq/cli/socket_setup.rb', line 194 def self.kill_on_protocol_error(sock, event) return unless event.type == :disconnected error = event.detail && event.detail[:error] return unless error.is_a?(Protocol::ZMTP::Error) sock.engine.signal_fatal_error(error) end |
.setup_curve(sock, config) ⇒ Object
Configure CURVE encryption on sock using config and env vars.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/omq/cli/socket_setup.rb', line 147 def self.setup_curve(sock, config) server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"] server_mode = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]) return unless server_key_z85 || server_mode crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1) require "protocol/zmtp/mechanism/curve" if server_key_z85 server_key = Protocol::ZMTP::Z85.decode(server_key_z85) client_key = crypto::PrivateKey.generate sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client( public_key: client_key.public_key.to_s, secret_key: client_key.to_s, server_key: server_key, crypto: crypto ) elsif server_mode if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"] server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"]) server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"]) else key = crypto::PrivateKey.generate server_pub = key.public_key.to_s server_sec = key.to_s end sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server( public_key: server_pub, secret_key: server_sec, crypto: crypto ) $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'" end end |
.setup_subscriptions(sock, config) ⇒ Object
Subscribe or join groups on sock according to config.
134 135 136 137 138 139 140 141 142 |
# File 'lib/omq/cli/socket_setup.rb', line 134 def self.setup_subscriptions(sock, config) case config.type_name when "sub" prefixes = config.subscribes.empty? ? [""] : config.subscribes prefixes.each { |p| sock.subscribe(p) } when "dish" config.joins.each { |g| sock.join(g) } end end |