Module: OMQ::CLI::SocketSetup
Defined in: lib/omq/cli/socket_setup.rb
Overview
Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.
Constant Summary
- DEFAULT_HWM = 64
  Default high water mark applied when the user does not pass --hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).
- DEFAULT_RECV_MAXSZ = 1 << 20
  Default max inbound message size applied when the user does not pass --recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with --recv-maxsz N, or disable it entirely with --recv-maxsz 0.
- DEFAULT_ZSTD_LEVEL = -3
  Default zstd level when the user picks zstd without specifying one (e.g. `--compress=zstd`). Matches the `-z` shortcut.
Class Method Summary
- .apply_options(sock, config) ⇒ Object
  Apply common socket options from config to sock.
- .apply_recv_maxsz(sock, config) ⇒ Object
  --recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
- .attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
  Bind/connect sock using URL strings from config.binds / config.connects.
- .attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil, side: nil) ⇒ Object
  Bind/connect sock from an Array of Endpoint objects.
- .build(klass, config) ⇒ Object
  Create and fully configure a socket from klass and config.
- .compress_flag(config, side: nil) ⇒ Object
  Human-readable CLI flag for a resolved compress state, suitable for process titles and proctitle-like log lines.
- .compress_opts(config, side: nil) ⇒ Object
  Returns bind/connect kwargs for the chosen codec on the given side.
- .compress_url(url, config, side: nil) ⇒ Object
  Upgrades a tcp:// URL to the codec-specific variant (zstd+tcp:// or lz4+tcp://) when compression is enabled for the given side.
- .kill_on_protocol_error(sock, event) ⇒ Object
  CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error: oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see.
- .resolve_compress(config, side) ⇒ Object
  Resolves compression state for a given pipe side (:in, :out, or nil).
- .setup_curve(sock, config) ⇒ Object
  Configure CURVE encryption on sock using config and env vars.
- .setup_subscriptions(sock, config) ⇒ Object
  Subscribe or join groups on sock according to config.
Class Method Details
.apply_options(sock, config) ⇒ Object
Apply common socket options from config to sock.
    # File 'lib/omq/cli/socket_setup.rb', line 91

    def self.apply_options(sock, config)
      sock.linger             = config.linger
      sock.recv_timeout       = config.timeout       if config.timeout
      sock.send_timeout       = config.timeout       if config.timeout
      sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl
      sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl
      # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is.
      sock.send_hwm           = config.send_hwm || DEFAULT_HWM
      sock.recv_hwm           = config.recv_hwm || DEFAULT_HWM
      sock.sndbuf             = config.sndbuf if config.sndbuf
      sock.rcvbuf             = config.rcvbuf if config.rcvbuf
    end
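The `||` defaulting for the high water marks works because `0` is truthy in Ruby, so only `nil` falls through to DEFAULT_HWM. A minimal sketch of that behavior; `effective_hwm` is a hypothetical helper name used here for illustration and is not part of the module:

```ruby
DEFAULT_HWM = 64

# Hypothetical helper mirroring `config.send_hwm || DEFAULT_HWM`.
def effective_hwm(configured)
  configured || DEFAULT_HWM
end

effective_hwm(nil)  # => 64  (flag not given, default applies)
effective_hwm(0)    # => 0   (0 is truthy in Ruby, so "unbounded" survives)
effective_hwm(500)  # => 500 (any other integer is taken as-is)
```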
.apply_recv_maxsz(sock, config) ⇒ Object
--recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.
    # File 'lib/omq/cli/socket_setup.rb', line 119

    def self.apply_recv_maxsz(sock, config)
      sock. =
        case config.recv_maxsz
        when nil then DEFAULT_RECV_MAXSZ
        when 0   then nil
        else config.recv_maxsz
        end
    end
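The three-way mapping can be tried in isolation. The sketch below returns the resolved limit instead of assigning it to a socket; `effective_recv_maxsz` is a hypothetical name introduced for illustration:

```ruby
DEFAULT_RECV_MAXSZ = 1 << 20 # 1 MiB

# Hypothetical helper: same case expression as apply_recv_maxsz,
# but returning the value rather than setting it on a socket.
def effective_recv_maxsz(configured)
  case configured
  when nil then DEFAULT_RECV_MAXSZ # flag not given: cap at 1 MiB
  when 0   then nil                # explicit 0: no limit
  else configured                  # anything else: as-is
  end
end

effective_recv_maxsz(nil)  # => 1048576
effective_recv_maxsz(0)    # => nil
effective_recv_maxsz(4096) # => 4096
```

Note the inversion: on the command line 0 means "unlimited", while the resolved value uses nil for "no limit".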
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.
    # File 'lib/omq/cli/socket_setup.rb', line 132

    def self.attach(sock, config, verbose: 0, timestamps: nil)
      opts = compress_opts(config)
      config.binds.each do |url|
        uri = sock.bind(compress_url(url, config), **opts)
        CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
      end
      config.connects.each do |url|
        sock.connect(compress_url(url, config), **opts)
        CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
      end
    end
.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil, side: nil) ⇒ Object
Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix. side selects per-socket compression (:in for PULL, :out for PUSH in a pipe); nil falls back to global compress settings.
    # File 'lib/omq/cli/socket_setup.rb', line 153

    def self.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil, side: nil)
      opts = config ? compress_opts(config, side: side) : {}
      endpoints.each do |ep|
        url = config ? compress_url(ep.url, config, side: side) : ep.url
        if ep.bind?
          uri = sock.bind(url, **opts)
          CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
        else
          sock.connect(url, **opts)
          CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
        end
      end
    end
.build(klass, config) ⇒ Object
Create and fully configure a socket from klass and config.
    # File 'lib/omq/cli/socket_setup.rb', line 107

    def self.build(klass, config)
      sock = config.ffi ? klass.new(backend: :ffi) : klass.new
      sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name)
      apply_options(sock, config)
      apply_recv_maxsz(sock, config)
      sock.identity         = config.identity if config.identity
      sock.router_mandatory = true if config.type_name == "router"
      sock
    end
.compress_flag(config, side: nil) ⇒ Object
Human-readable CLI flag for a resolved compress state, suitable for process titles and proctitle-like log lines.
    # File 'lib/omq/cli/socket_setup.rb', line 80

    def self.compress_flag(config, side: nil)
      codec, level = resolve_compress(config, side)
      case codec
      when :zstd then level == 3 ? "-Z" : "-z"
      when :lz4  then "--lz4"
      end
    end
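The flag mapping is easiest to read with the resolved pair passed in directly. A minimal sketch; `flag_for` is a hypothetical name, and it takes `codec` and `level` as arguments instead of calling resolve_compress:

```ruby
# Hypothetical helper: the same case expression as compress_flag,
# applied to an already-resolved [codec, level] pair.
def flag_for(codec, level)
  case codec
  when :zstd then level == 3 ? "-Z" : "-z"
  when :lz4  then "--lz4"
  end
end

flag_for(:zstd, 3)   # => "-Z"
flag_for(:zstd, -3)  # => "-z"
flag_for(:zstd, nil) # => "-z"  (every level other than 3 maps to "-z")
flag_for(:lz4, nil)  # => "--lz4"
flag_for(nil, nil)   # => nil   (compression off: no flag)
```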
.compress_opts(config, side: nil) ⇒ Object
Returns bind/connect kwargs for the chosen codec on the given side. Zstd accepts a level:; LZ4 takes no tuning knobs.
    # File 'lib/omq/cli/socket_setup.rb', line 68

    def self.compress_opts(config, side: nil)
      codec, level = resolve_compress(config, side)
      case codec
      when :zstd then { level: level || DEFAULT_ZSTD_LEVEL }
      else {}
      end
    end
.compress_url(url, config, side: nil) ⇒ Object
Upgrades a tcp:// URL to the codec-specific variant (zstd+tcp:// or lz4+tcp://) when compression is enabled for the given side. Returns the URL unchanged for non-TCP or when compression is off.
    # File 'lib/omq/cli/socket_setup.rb', line 56

    def self.compress_url(url, config, side: nil)
      codec, _ = resolve_compress(config, side)
      return url if codec.nil?
      return url unless url.start_with?("tcp://")
      url.sub("tcp://", "#{codec}+tcp://")
    end
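A small sketch of the URL rewrite, with the codec passed in directly instead of being resolved from a config. `upgrade_url` is a hypothetical name, and the endpoint addresses are made up for the example:

```ruby
# Hypothetical helper: same rewrite as compress_url, but taking the
# resolved codec (:zstd, :lz4, or nil) as an argument.
def upgrade_url(url, codec)
  return url if codec.nil?
  return url unless url.start_with?("tcp://")
  url.sub("tcp://", "#{codec}+tcp://")
end

upgrade_url("tcp://127.0.0.1:5555", :zstd) # => "zstd+tcp://127.0.0.1:5555"
upgrade_url("tcp://127.0.0.1:5555", :lz4)  # => "lz4+tcp://127.0.0.1:5555"
upgrade_url("ipc:///tmp/omq.sock", :zstd)  # => "ipc:///tmp/omq.sock"
upgrade_url("tcp://127.0.0.1:5555", nil)   # => "tcp://127.0.0.1:5555"
```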
.kill_on_protocol_error(sock, event) ⇒ Object
CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.
    # File 'lib/omq/cli/socket_setup.rb', line 232

    def self.kill_on_protocol_error(sock, event)
      return unless event.type == :disconnected
      error = event.detail && event.detail[:error]
      return unless error.is_a?(Protocol::ZMTP::Error)
      sock.engine.signal_fatal_error(error)
    end
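The filtering logic can be exercised without the library by substituting stand-ins. In the sketch below every class (`SketchProtocolError`, `SketchEvent`, `SketchEngine`, `SketchSocket`) is a hypothetical placeholder for illustration, not one of the library's types; only the guard structure mirrors the method above:

```ruby
# Stand-ins for illustration only; none of these are the library's classes.
class SketchProtocolError < StandardError; end

SketchEvent  = Struct.new(:type, :detail, keyword_init: true)
SketchSocket = Struct.new(:engine)

class SketchEngine
  attr_reader :fatal

  # Records the error so the sketch can show when the policy fired.
  def signal_fatal_error(error)
    @fatal = error
  end
end

def kill_on_protocol_error(sock, event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error]
  return unless error.is_a?(SketchProtocolError)
  sock.engine.signal_fatal_error(error)
end

sock = SketchSocket.new(SketchEngine.new)

# A disconnect caused by some other error is ignored.
kill_on_protocol_error(sock, SketchEvent.new(type: :disconnected, detail: { error: IOError.new("closed") }))
sock.engine.fatal # => nil

# A disconnect carrying a protocol error marks the socket as fatally failed.
kill_on_protocol_error(sock, SketchEvent.new(type: :disconnected, detail: { error: SketchProtocolError.new("oversized frame") }))
sock.engine.fatal.class # => SketchProtocolError
```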
.resolve_compress(config, side) ⇒ Object
Resolves compression state for a given pipe side (:in, :out, or nil). Per-side settings (in_compress / out_compress) override the global config.compress. Returns [codec, level] where codec is one of nil (off), :zstd, or :lz4.
    # File 'lib/omq/cli/socket_setup.rb', line 34

    def self.resolve_compress(config, side)
      scoped =
        case side
        when :in  then !config.in_compress.nil?
        when :out then !config.out_compress.nil?
        end
      if scoped && side == :in
        [config.in_compress, config.in_compress_level]
      elsif scoped && side == :out
        [config.out_compress, config.out_compress_level]
      else
        [config.compress, config.compress_level]
      end
    end
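To see the override order in action, the method body can be run against a stand-in config. `SketchConfig` below is a hypothetical Struct carrying only the six fields the method reads; the real config object is assumed to expose readers with the same names:

```ruby
# Stand-in for the CLI config; only the fields resolve_compress reads.
SketchConfig = Struct.new(
  :compress, :compress_level,
  :in_compress, :in_compress_level,
  :out_compress, :out_compress_level,
  keyword_init: true
)

def resolve_compress(config, side)
  scoped =
    case side
    when :in  then !config.in_compress.nil?
    when :out then !config.out_compress.nil?
    end
  if scoped && side == :in
    [config.in_compress, config.in_compress_level]
  elsif scoped && side == :out
    [config.out_compress, config.out_compress_level]
  else
    [config.compress, config.compress_level]
  end
end

# Global lz4, with a zstd override on the inbound side only.
config = SketchConfig.new(compress: :lz4, in_compress: :zstd, in_compress_level: 9)

resolve_compress(config, :in)  # => [:zstd, 9]  (per-side override wins)
resolve_compress(config, :out) # => [:lz4, nil] (no out override: global)
resolve_compress(config, nil)  # => [:lz4, nil] (no side: global)
```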
.setup_curve(sock, config) ⇒ Object
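Configure CURVE encryption on sock using config and environment variables. A server key (config.curve_server_key or OMQ_SERVER_KEY) selects client mode with a freshly generated client key pair. Otherwise, config.curve_server or the OMQ_SERVER_PUBLIC / OMQ_SERVER_SECRET pair selects server mode, which generates a key pair when the environment does not supply one and prints the public key to stderr as OMQ_SERVER_KEY. Does nothing when neither is set.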
    # File 'lib/omq/cli/socket_setup.rb', line 185

    def self.setup_curve(sock, config)
      server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"]
      server_mode    = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"])
      return unless server_key_z85 || server_mode

      crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1)
      require "protocol/zmtp/mechanism/curve"

      if server_key_z85
        server_key = Protocol::ZMTP::Z85.decode(server_key_z85)
        client_key = crypto::PrivateKey.generate
        sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client(
          public_key: client_key.public_key.to_s,
          secret_key: client_key.to_s,
          server_key: server_key,
          crypto:     crypto
        )
      elsif server_mode
        if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]
          server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"])
          server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"])
        else
          key        = crypto::PrivateKey.generate
          server_pub = key.public_key.to_s
          server_sec = key.to_s
        end
        sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server(
          public_key: server_pub,
          secret_key: server_sec,
          crypto:     crypto
        )
        $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'"
      end
    end
.setup_subscriptions(sock, config) ⇒ Object
Subscribe or join groups on sock according to config.
    # File 'lib/omq/cli/socket_setup.rb', line 172

    def self.setup_subscriptions(sock, config)
      case config.type_name
      when "sub"
        prefixes = config.subscribes.empty? ? [""] : config.subscribes
        prefixes.each { |p| sock.subscribe(p) }
      when "dish"
        config.joins.each { |g| sock.join(g) }
      end
    end
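One detail worth seeing concretely: a "sub" socket with no configured prefixes subscribes to the empty prefix, which in ZeroMQ matches every message. The sketch below runs the same method body against a recording stand-in; `RecordingSocket` and `SubSketchConfig` are hypothetical names used only for illustration:

```ruby
# Stand-ins for illustration only: a socket that records calls and a
# minimal config with just the fields setup_subscriptions reads.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(prefix)
    @calls << [:subscribe, prefix]
  end

  def join(group)
    @calls << [:join, group]
  end
end

SubSketchConfig = Struct.new(:type_name, :subscribes, :joins, keyword_init: true)

def setup_subscriptions(sock, config)
  case config.type_name
  when "sub"
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end

sub = RecordingSocket.new
setup_subscriptions(sub, SubSketchConfig.new(type_name: "sub", subscribes: [], joins: []))
sub.calls # => [[:subscribe, ""]]

dish = RecordingSocket.new
setup_subscriptions(dish, SubSketchConfig.new(type_name: "dish", subscribes: [], joins: ["weather"]))
dish.calls # => [[:join, "weather"]]
```

Any other socket type falls through the case expression and is left untouched.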