Module: OMQ::CLI::SocketSetup
Defined in: lib/omq/cli/socket_setup.rb
Overview
Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.
Constant Summary
- DEFAULT_HWM = 64
  Default high water mark applied when the user does not pass --hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).
- DEFAULT_RECV_MAXSZ = 1 << 20
  Default max inbound message size applied when the user does not pass --recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with --recv-maxsz N, or disable it entirely with --recv-maxsz 0.
Class Method Summary
- .apply_options(sock, config) ⇒ Object
  Apply common socket options from config to sock.
- .apply_recv_maxsz(sock, config) ⇒ Object
  --recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
- .attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
  Bind/connect sock using URL strings from config.binds / config.connects.
- .attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil) ⇒ Object
  Bind/connect sock from an Array of Endpoint objects.
- .build(klass, config) ⇒ Object
  Create and fully configure a socket from klass and config.
- .compress_opts(config) ⇒ Object
  Returns bind/connect kwargs for compression (level:) when enabled.
- .compress_url(url, config) ⇒ Object
  Upgrades a tcp:// URL to zstd+tcp:// when compression is enabled.
- .kill_on_protocol_error(sock, event) ⇒ Object
  CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error: oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see.
- .setup_curve(sock, config) ⇒ Object
  Configure CURVE encryption on sock using config and env vars.
- .setup_subscriptions(sock, config) ⇒ Object
  Subscribe or join groups on sock according to config.
Class Method Details
.apply_options(sock, config) ⇒ Object
Apply common socket options from config to sock.
# File 'lib/omq/cli/socket_setup.rb', line 47
def self.apply_options(sock, config)
  sock.linger             = config.linger
  sock.recv_timeout       = config.timeout if config.timeout
  sock.send_timeout       = config.timeout if config.timeout
  sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl
  sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl
  # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is.
  sock.send_hwm           = config.send_hwm || DEFAULT_HWM
  sock.recv_hwm           = config.recv_hwm || DEFAULT_HWM
  sock.sndbuf             = config.sndbuf if config.sndbuf
  sock.rcvbuf             = config.rcvbuf if config.rcvbuf
end
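The HWM defaulting in the listing above depends on 0 being truthy in Ruby, so `||` only replaces nil. The following is a minimal sketch of that rule in isolation; `HwmConfig` and `resolve_hwm` are hypothetical names used for illustration and are not part of omq.

```ruby
# Hypothetical stand-in for the CLI config; only the two HWM fields are modeled.
HwmConfig = Struct.new(:send_hwm, :recv_hwm, keyword_init: true)

DEFAULT_HWM = 64

# Mirrors `config.send_hwm || DEFAULT_HWM`: only nil falls through to the default.
def resolve_hwm(value)
  value || DEFAULT_HWM
end

unset    = HwmConfig.new(send_hwm: nil, recv_hwm: nil)
explicit = HwmConfig.new(send_hwm: 0, recv_hwm: 500)

p resolve_hwm(unset.send_hwm)    # nil -> 64 (the default)
p resolve_hwm(explicit.send_hwm) # 0 is truthy in Ruby, so it stays 0
p resolve_hwm(explicit.recv_hwm) # any other integer is taken as-is: 500
```

In languages where 0 is falsy the same `||` idiom would silently turn an explicit 0 into the default, which is why the source comment spells the rule out.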
.apply_recv_maxsz(sock, config) ⇒ Object
--recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
# File 'lib/omq/cli/socket_setup.rb', line 75
def self.apply_recv_maxsz(sock, config)
  sock. = case config.recv_maxsz
    when nil then DEFAULT_RECV_MAXSZ
    when 0   then nil
    else          config.recv_maxsz
  end
end
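The three-way mapping above can be sketched as a pure function. `resolve_recv_maxsz` is a hypothetical helper name; the sketch only computes the value and does not touch a socket.

```ruby
DEFAULT_RECV_MAXSZ = 1 << 20 # 1 MiB

# Maps the raw --recv-maxsz value to the limit that would be applied.
# A nil result stands for "no limit", matching the case expression in the source.
def resolve_recv_maxsz(value)
  case value
  when nil then DEFAULT_RECV_MAXSZ # flag not passed: fall back to the CLI default
  when 0   then nil                # explicit 0: unlimited
  else          value              # anything else: taken as-is
  end
end

p resolve_recv_maxsz(nil)  # 1048576
p resolve_recv_maxsz(0)    # nil
p resolve_recv_maxsz(4096) # 4096
```

Note that nil means opposite things on the two sides of the mapping: "not specified" on input, "unlimited" on output.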
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.
# File 'lib/omq/cli/socket_setup.rb', line 88
def self.attach(sock, config, verbose: 0, timestamps: nil)
  opts = compress_opts(config)
  config.binds.each do |url|
    uri = sock.bind(compress_url(url, config), **opts)
    CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
  end
  config.connects.each do |url|
    sock.connect(compress_url(url, config), **opts)
    CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
  end
end
.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix.
# File 'lib/omq/cli/socket_setup.rb', line 107
def self.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil)
  opts = config ? compress_opts(config) : {}
  endpoints.each do |ep|
    url = config ? compress_url(ep.url, config) : ep.url
    if ep.bind?
      uri = sock.bind(url, **opts)
      CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
    else
      sock.connect(url, **opts)
      CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
    end
  end
end
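The bind-or-connect dispatch over endpoint objects can be exercised without a real socket. In this sketch `SketchEndpoint`, `RecordingSocket`, and `attach_sketch` are all hypothetical stand-ins; compression and logging are left out, and the real Endpoint and socket classes may expose more than is assumed here.

```ruby
# Hypothetical endpoint: the source only relies on #url and #bind?.
SketchEndpoint = Struct.new(:url, :is_bind) do
  def bind?
    is_bind
  end
end

# Hypothetical socket that records calls instead of doing any I/O.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(url, **opts)
    @calls << [:bind, url, opts]
    url # the source calls #to_s on bind's return value; a String suffices here
  end

  def connect(url, **opts)
    @calls << [:connect, url, opts]
  end
end

# Same dispatch as the listing above, minus compression and logging.
def attach_sketch(sock, endpoints)
  endpoints.each do |ep|
    ep.bind? ? sock.bind(ep.url) : sock.connect(ep.url)
  end
end

sock = RecordingSocket.new
attach_sketch(sock, [
  SketchEndpoint.new("tcp://127.0.0.1:5555", true),
  SketchEndpoint.new("tcp://127.0.0.1:5556", false)
])
sock.calls.each { |kind, url, _opts| puts "#{kind} #{url}" }
```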
.build(klass, config) ⇒ Object
Create and fully configure a socket from klass and config.
# File 'lib/omq/cli/socket_setup.rb', line 63
def self.build(klass, config)
  sock = config.ffi ? klass.new(backend: :ffi) : klass.new
  sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name)
  apply_options(sock, config)
  apply_recv_maxsz(sock, config)
  sock.identity = config.identity if config.identity
  sock.router_mandatory = true if config.type_name == "router"
  sock
end
.compress_opts(config) ⇒ Object
Returns bind/connect kwargs for compression (level:) when enabled.
# File 'lib/omq/cli/socket_setup.rb', line 38
def self.compress_opts(config)
  return {} unless config.compress
  { level: config.compress_level || -3 }
end
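Returning an empty Hash when compression is off lets callers splat the result unconditionally, as attach does with `**opts`. A minimal sketch of that pattern follows; `CompressOptsConfig`, `compress_opts_sketch`, and `echo_bind` are hypothetical names, and `echo_bind` merely returns the keywords it was given.

```ruby
# Hypothetical config stand-in with just the two compression fields.
CompressOptsConfig = Struct.new(:compress, :compress_level, keyword_init: true)

# Same logic as the listing above: no kwargs unless compression is enabled,
# and a fallback level of -3 when none was chosen.
def compress_opts_sketch(config)
  return {} unless config.compress
  { level: config.compress_level || -3 }
end

# Hypothetical bind that echoes the keyword arguments it received.
def echo_bind(_url, **opts)
  opts
end

off     = CompressOptsConfig.new(compress: false)
default = CompressOptsConfig.new(compress: true)
custom  = CompressOptsConfig.new(compress: true, compress_level: 9)

p echo_bind("tcp://127.0.0.1:5555", **compress_opts_sketch(off))     # no keywords passed
p echo_bind("tcp://127.0.0.1:5555", **compress_opts_sketch(default)) # level falls back to -3
p echo_bind("tcp://127.0.0.1:5555", **compress_opts_sketch(custom))  # level 9 as given
```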
.compress_url(url, config) ⇒ Object
Upgrades a tcp:// URL to zstd+tcp:// when compression is enabled. Returns the URL unchanged for non-TCP URLs or when compression is off.
# File 'lib/omq/cli/socket_setup.rb', line 28
def self.compress_url(url, config)
  return url unless config.compress
  return url unless url.start_with?("tcp://")
  url.sub("tcp://", "zstd+tcp://")
end
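The URL rewrite is a pure string operation, so it can be tried in isolation. In this sketch `CompressUrlConfig` is a hypothetical stand-in for the config object and `compress_url_sketch` copies the logic of the listing above.

```ruby
# Hypothetical config stand-in; only #compress is consulted.
CompressUrlConfig = Struct.new(:compress, keyword_init: true)

def compress_url_sketch(url, config)
  return url unless config.compress
  return url unless url.start_with?("tcp://")
  url.sub("tcp://", "zstd+tcp://") # String#sub replaces the first match only
end

on  = CompressUrlConfig.new(compress: true)
off = CompressUrlConfig.new(compress: false)

puts compress_url_sketch("tcp://127.0.0.1:5555", on)  # zstd+tcp://127.0.0.1:5555
puts compress_url_sketch("ipc:///tmp/omq.sock", on)   # ipc:///tmp/omq.sock (not TCP)
puts compress_url_sketch("tcp://127.0.0.1:5555", off) # tcp://127.0.0.1:5555 (compression off)
```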
.kill_on_protocol_error(sock, event) ⇒ Object
CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.
# File 'lib/omq/cli/socket_setup.rb', line 186
def self.kill_on_protocol_error(sock, event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error]
  return unless error.is_a?(Protocol::ZMTP::Error)
  sock.engine.signal_fatal_error(error)
end
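The filter in the listing above passes only disconnect events that carry a protocol error. A minimal sketch of that decision, using hypothetical stand-ins (`SketchProtocolError` for Protocol::ZMTP::Error, `SketchEvent` for the monitor event, `fatal_error_for` for the filter itself), might look like this:

```ruby
# Hypothetical stand-ins; the real classes come from omq and protocol-zmtp.
class SketchProtocolError < StandardError; end
SketchEvent = Struct.new(:type, :detail, keyword_init: true)

# Returns the error that should kill the socket, or nil if the event is benign.
def fatal_error_for(event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error] # detail may be nil
  error if error.is_a?(SketchProtocolError)
end

oversized = SketchProtocolError.new("frame too large")

p fatal_error_for(SketchEvent.new(type: :disconnected, detail: { error: oversized }))   # the error
p fatal_error_for(SketchEvent.new(type: :disconnected, detail: nil))                    # nil: clean disconnect
p fatal_error_for(SketchEvent.new(type: :disconnected, detail: { error: IOError.new })) # nil: not a protocol error
p fatal_error_for(SketchEvent.new(type: :connected, detail: { error: oversized }))      # nil: wrong event type
```

Keeping the decision separate from the side effect (signalling the engine) is a choice made for this sketch so the policy can be tested without a socket; the source does both in one method.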
.setup_curve(sock, config) ⇒ Object
Configure CURVE encryption on sock using config and env vars.
# File 'lib/omq/cli/socket_setup.rb', line 139
def self.setup_curve(sock, config)
  server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"]
  server_mode    = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"])
  return unless server_key_z85 || server_mode

  crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1)
  require "protocol/zmtp/mechanism/curve"

  if server_key_z85
    server_key = Protocol::ZMTP::Z85.decode(server_key_z85)
    client_key = crypto::PrivateKey.generate
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client(
      public_key: client_key.public_key.to_s,
      secret_key: client_key.to_s,
      server_key: server_key,
      crypto: crypto
    )
  elsif server_mode
    if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]
      server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"])
      server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"])
    else
      key = crypto::PrivateKey.generate
      server_pub = key.public_key.to_s
      server_sec = key.to_s
    end
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server(
      public_key: server_pub,
      secret_key: server_sec,
      crypto: crypto
    )
    $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'"
  end
end
.setup_subscriptions(sock, config) ⇒ Object
Subscribe or join groups on sock according to config.
# File 'lib/omq/cli/socket_setup.rb', line 126
def self.setup_subscriptions(sock, config)
  case config.type_name
  when "sub"
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end
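One detail of the listing above is easy to miss: a "sub" socket with no configured prefixes subscribes to the empty prefix, which in ZeroMQ matches every message. The sketch below shows that fallback with a recording stand-in; `SubConfig`, `RecordingSubscriber`, and `setup_subscriptions_sketch` are hypothetical names.

```ruby
# Hypothetical config stand-in with only the fields the method reads.
SubConfig = Struct.new(:type_name, :subscribes, :joins, keyword_init: true)

# Hypothetical socket that records subscriptions and group joins.
class RecordingSubscriber
  attr_reader :prefixes, :groups

  def initialize
    @prefixes = []
    @groups = []
  end

  def subscribe(prefix)
    @prefixes << prefix
  end

  def join(group)
    @groups << group
  end
end

def setup_subscriptions_sketch(sock, config)
  case config.type_name
  when "sub"
    # No prefixes given: fall back to "" so the socket receives everything.
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end

sub = RecordingSubscriber.new
setup_subscriptions_sketch(sub, SubConfig.new(type_name: "sub", subscribes: [], joins: []))
p sub.prefixes # one entry: the empty prefix

dish = RecordingSubscriber.new
setup_subscriptions_sketch(dish, SubConfig.new(type_name: "dish", subscribes: [], joins: ["weather"]))
p dish.groups # the single joined group
```

Other socket types fall through the case expression untouched, so the method is safe to call for any socket.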