Module: NNQ::CLI::SocketSetup
- Defined in:
- lib/nnq/cli/socket_setup.rb
Overview
Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.
Constant Summary collapse
- DEFAULT_HWM =
Default high water mark applied when the user does not pass –hwm. 64 matches the send pump’s per-fairness-batch limit (one batch exactly fills a full queue).
64- DEFAULT_RECV_MAXSZ =
Default max inbound message size (1 MiB) so a misconfigured or malicious peer can’t force arbitrary memory allocation on a terminal user. Users can raise it with –recv-maxsz N, or disable it entirely with –recv-maxsz 0.
1 << 20
Class Method Summary collapse
-
.apply_options(sock, config) ⇒ Object
Apply post-construction socket options from
configtosock. -
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect
sockusing URL strings fromconfig.binds/config.connects. -
.attach_endpoints(sock, endpoints, compress_level: nil, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect
sockfrom an Array of Endpoint objects. -
.build(klass, config) ⇒ Object
Create and fully configure a socket from
klassandconfig. -
.compress_opts(level) ⇒ Object
Returns bind/connect kwargs for compression (level:) when enabled.
-
.compress_url(url, level) ⇒ Object
Upgrades a
tcp://URL to zstdtcp://+ when a compression level is set. -
.setup_subscriptions(sock, config) ⇒ Object
Subscribe to prefixes on a SUB socket.
Class Method Details
.apply_options(sock, config) ⇒ Object
Apply post-construction socket options from config to sock. send_hwm and linger are construction-time kwargs (see build); the rest of the options are set here and read later by the engine/transports.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/nnq/cli/socket_setup.rb', line 25 def self.(sock, config) sock..read_timeout = config.timeout if config.timeout sock..write_timeout = config.timeout if config.timeout sock..reconnect_interval = config.reconnect_ivl if config.reconnect_ivl case config.recv_maxsz when nil sock.. = DEFAULT_RECV_MAXSZ when 0 sock.. = nil else config.recv_maxsz end end |
.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock using URL strings from config.binds / config.connects. verbose is the integer verbosity level (0 = silent).
57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/nnq/cli/socket_setup.rb', line 57 def self.attach(sock, config, verbose: 0, timestamps: nil) opts = compress_opts(config.compress) config.binds.each do |url| sock.bind(compress_url(url, config.compress), **opts) CLI::Term.write_attach(:bind, sock.last_endpoint, ) if verbose >= 1 end config.connects.each do |url| sock.connect(compress_url(url, config.compress), **opts) CLI::Term.write_attach(:connect, url, ) if verbose >= 1 end end |
.attach_endpoints(sock, endpoints, compress_level: nil, verbose: 0, timestamps: nil) ⇒ Object
Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. compress_level rewrites tcp:// endpoints to zstdtcp://+ when set.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/nnq/cli/socket_setup.rb', line 75 def self.attach_endpoints(sock, endpoints, compress_level: nil, verbose: 0, timestamps: nil) opts = compress_opts(compress_level) endpoints.each do |ep| url = compress_url(ep.url, compress_level) if ep.bind? sock.bind(url, **opts) CLI::Term.write_attach(:bind, sock.last_endpoint, ) if verbose >= 1 else sock.connect(url, **opts) CLI::Term.write_attach(:connect, ep.url, ) if verbose >= 1 end end end |
.build(klass, config) ⇒ Object
Create and fully configure a socket from klass and config. nnq’s Socket constructor takes linger + send_hwm directly (send_hwm is captured during routing init and can’t be changed after the fact), so we pass them there.
45 46 47 48 49 50 51 52 |
# File 'lib/nnq/cli/socket_setup.rb', line 45 def self.build(klass, config) linger = config.linger send_hwm = config.send_hwm || DEFAULT_HWM sock = klass.new(linger:, send_hwm:) (sock, config) sock end |
.compress_opts(level) ⇒ Object
Returns bind/connect kwargs for compression (level:) when enabled.
103 104 105 |
# File 'lib/nnq/cli/socket_setup.rb', line 103 def self.compress_opts(level) level ? { level: level } : {} end |
.compress_url(url, level) ⇒ Object
Upgrades a tcp:// URL to zstdtcp://+ when a compression level is set. Returns the URL unchanged for non-TCP schemes or when compression is off. User-supplied zstdtcp://+ URLs pass through.
95 96 97 98 99 |
# File 'lib/nnq/cli/socket_setup.rb', line 95 def self.compress_url(url, level) return url unless level return url unless url.start_with?("tcp://") url.sub("tcp://", "zstd+tcp://") end |
.setup_subscriptions(sock, config) ⇒ Object
Subscribe to prefixes on a SUB socket.
Unlike ZeroMQ, nng’s sub0 starts with an empty subscription set, meaning no messages match. If the user passed no ‘–subscribe` flags, default to subscribing to the empty prefix so the CLI feels like `nngcat` / `omq sub`: receive everything by default.
114 115 116 117 118 |
# File 'lib/nnq/cli/socket_setup.rb', line 114 def self.setup_subscriptions(sock, config) return unless config.type_name == "sub" prefixes = config.subscribes.empty? ? [""] : config.subscribes prefixes.each { |p| sock.subscribe(p) } end |