Module: NNQ::CLI::SocketSetup

Defined in:
lib/nnq/cli/socket_setup.rb

Overview

Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.

Constant Summary

DEFAULT_HWM =

Default high water mark applied when the user does not pass --hwm. 64 matches the send pump’s per-fairness-batch limit, so a single batch exactly fills the queue.

64
DEFAULT_RECV_MAXSZ =

Default max inbound message size (1 MiB) so a misconfigured or malicious peer can’t force arbitrary memory allocation on a terminal user. Users can raise it with --recv-maxsz N, or disable it entirely with --recv-maxsz 0.

1 << 20

Class Method Details

.apply_options(sock, config) ⇒ Object

Apply post-construction socket options from config to sock. send_hwm and linger are construction-time kwargs (see build); the rest of the options are set here and read later by the engine/transports.



# File 'lib/nnq/cli/socket_setup.rb', line 25

def self.apply_options(sock, config)
  sock.options.read_timeout       = config.timeout       if config.timeout
  sock.options.write_timeout      = config.timeout       if config.timeout
  sock.options.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl

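  case config.recv_maxsz
  when nil
    # Flag not given: fall back to the 1 MiB default.
    sock.options.max_message_size = DEFAULT_RECV_MAXSZ
  when 0
    # Explicit 0 disables the limit.
    sock.options.max_message_size = nil
  else
    # Any other value is the user-supplied limit.
    sock.options.max_message_size = config.recv_maxsz
  end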
end
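
As a rough illustration of the three recv_maxsz cases handled above, the sketch below isolates the mapping in a standalone helper. RecvLimitSketch and effective_limit are hypothetical names used only for this example and are not part of NNQ; the constant value is copied from the constant summary.

```ruby
# Illustrative sketch only: RecvLimitSketch and effective_limit are
# hypothetical names, not part of NNQ.
module RecvLimitSketch
  DEFAULT_RECV_MAXSZ = 1 << 20 # 1 MiB, as in the constant summary

  # nil (flag not given) -> default cap
  # 0                    -> nil, meaning no cap
  # any other value      -> used as the cap
  def self.effective_limit(recv_maxsz)
    case recv_maxsz
    when nil then DEFAULT_RECV_MAXSZ
    when 0   then nil
    else          recv_maxsz
    end
  end
end

p RecvLimitSketch.effective_limit(nil)   # default cap
p RecvLimitSketch.effective_limit(0)     # no cap
p RecvLimitSketch.effective_limit(4096)  # user-supplied cap
```

Keeping nil and 0 distinct lets "flag omitted" and "limit explicitly disabled" map to different outcomes.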

.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock using URL strings from config.binds / config.connects. verbose is the integer verbosity level (0 = silent).



# File 'lib/nnq/cli/socket_setup.rb', line 57

def self.attach(sock, config, verbose: 0, timestamps: nil)
  opts = compress_opts(config.compress)

  config.binds.each do |url|
    sock.bind(compress_url(url, config.compress), **opts)
    CLI::Term.write_attach(:bind, sock.last_endpoint, timestamps) if verbose >= 1
  end

  config.connects.each do |url|
    sock.connect(compress_url(url, config.compress), **opts)
    CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
  end
end

.attach_endpoints(sock, endpoints, compress_level: nil, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. compress_level rewrites tcp:// endpoints to zstd+tcp:// when set.



# File 'lib/nnq/cli/socket_setup.rb', line 75

def self.attach_endpoints(sock, endpoints, compress_level: nil, verbose: 0, timestamps: nil)
  opts = compress_opts(compress_level)

  endpoints.each do |ep|
    url = compress_url(ep.url, compress_level)

    if ep.bind?
      sock.bind(url, **opts)
      CLI::Term.write_attach(:bind, sock.last_endpoint, timestamps) if verbose >= 1
    else
      sock.connect(url, **opts)
      CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
    end
  end
end
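
The bind/connect dispatch can be tried without a real socket. In this minimal sketch, Endpoint, RecordingSocket and attach_all are hypothetical stand-ins written for the example (the real Endpoint class and socket API may differ); URL rewriting and terminal logging are left out.

```ruby
# Illustrative sketch only: Endpoint, RecordingSocket and attach_all are
# hypothetical stand-ins, not NNQ classes or methods.
Endpoint = Struct.new(:url, :bind) do
  def bind?
    bind
  end
end

# Records every bind/connect call instead of opening a transport.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(url, **opts)
    @calls << [:bind, url, opts]
  end

  def connect(url, **opts)
    @calls << [:connect, url, opts]
  end
end

# Same per-endpoint dispatch as above: bind when ep.bind?, else connect.
def attach_all(sock, endpoints, **opts)
  endpoints.each do |ep|
    ep.bind? ? sock.bind(ep.url, **opts) : sock.connect(ep.url, **opts)
  end
end

sock = RecordingSocket.new
attach_all(sock, [
  Endpoint.new("tcp://127.0.0.1:5555", true),
  Endpoint.new("ipc:///tmp/feed.sock", false),
])
p sock.calls
```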

.build(klass, config) ⇒ Object

Create and fully configure a socket from klass and config. nnq’s Socket constructor takes linger + send_hwm directly (send_hwm is captured during routing init and can’t be changed after the fact), so we pass them there.



# File 'lib/nnq/cli/socket_setup.rb', line 45

def self.build(klass, config)
  linger   = config.linger
  send_hwm = config.send_hwm || DEFAULT_HWM
  sock     = klass.new(linger:, send_hwm:)

  apply_options(sock, config)
  sock
end
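
A minimal sketch of the constructor-time fallback, assuming a socket class that accepts linger: and send_hwm: keywords as described above. BuildSketch, FakeSocket and Config are hypothetical stand-ins for this example; the post-construction options step is omitted.

```ruby
# Illustrative sketch only: BuildSketch, FakeSocket and Config are
# hypothetical stand-ins, not NNQ classes.
module BuildSketch
  DEFAULT_HWM = 64 # copied from the constant summary

  FakeSocket = Struct.new(:linger, :send_hwm, keyword_init: true)
  Config     = Struct.new(:linger, :send_hwm, keyword_init: true)

  # send_hwm falls back to DEFAULT_HWM when the config leaves it unset.
  def self.build(klass, config)
    klass.new(linger: config.linger, send_hwm: config.send_hwm || DEFAULT_HWM)
  end
end

default_sock = BuildSketch.build(BuildSketch::FakeSocket,
                                 BuildSketch::Config.new(linger: 0))
custom_sock  = BuildSketch.build(BuildSketch::FakeSocket,
                                 BuildSketch::Config.new(linger: 0, send_hwm: 8))
p default_sock.send_hwm
p custom_sock.send_hwm
```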

.compress_opts(level) ⇒ Object

Returns bind/connect kwargs for compression (level:) when enabled.



# File 'lib/nnq/cli/socket_setup.rb', line 103

def self.compress_opts(level)
  level ? { level: level } : {}
end
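
Returning an empty Hash when compression is off means the caller can always double-splat the result. The sketch below shows this with a hypothetical received_kwargs method standing in for a socket's bind/connect; compress_opts is a local copy of the method above.

```ruby
# Illustrative sketch only: received_kwargs is a hypothetical stand-in for a
# socket's bind/connect method; it returns the keywords it was given.
def received_kwargs(_url, **kwargs)
  kwargs
end

# Local copy of the helper documented above.
def compress_opts(level)
  level ? { level: level } : {}
end

# With a level, the callee sees level: 3; without one, it sees no keywords.
with_level    = received_kwargs("tcp://127.0.0.1:5555", **compress_opts(3))
without_level = received_kwargs("tcp://127.0.0.1:5555", **compress_opts(nil))
p with_level
p without_level
```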

.compress_url(url, level) ⇒ Object

Upgrades a tcp:// URL to zstd+tcp:// when a compression level is set. Returns the URL unchanged for non-TCP schemes or when compression is off. User-supplied zstd+tcp:// URLs pass through.



# File 'lib/nnq/cli/socket_setup.rb', line 95

def self.compress_url(url, level)
  return url unless level
  return url unless url.start_with?("tcp://")
  url.sub("tcp://", "zstd+tcp://")
end
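
The four cases described above (upgrade, non-TCP scheme, compression off, already upgraded) can be checked with a local copy of the rewrite rule. upgrade_url is a hypothetical name for that copy, and the URLs are made-up examples.

```ruby
# Illustrative sketch only: upgrade_url is a local copy of the rewrite rule
# under a hypothetical name; the URLs are made-up examples.
def upgrade_url(url, level)
  return url unless level
  return url unless url.start_with?("tcp://")
  url.sub("tcp://", "zstd+tcp://")
end

[
  ["tcp://127.0.0.1:5555", 3],      # plain TCP with a level: upgraded
  ["ipc:///tmp/feed.sock", 3],      # non-TCP scheme: unchanged
  ["tcp://127.0.0.1:5555", nil],    # compression off: unchanged
  ["zstd+tcp://127.0.0.1:5555", 3], # already upgraded: unchanged
].each do |url, level|
  puts "#{url} (level=#{level.inspect}) -> #{upgrade_url(url, level)}"
end
```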

.setup_subscriptions(sock, config) ⇒ Object

Subscribe to prefixes on a SUB socket.

Unlike ZeroMQ, nng’s sub0 starts with an empty subscription set, meaning no messages match. If the user passed no `--subscribe` flags, default to subscribing to the empty prefix so the CLI feels like `nngcat` / `omq sub`: receive everything by default.



# File 'lib/nnq/cli/socket_setup.rb', line 114

def self.setup_subscriptions(sock, config)
  return unless config.type_name == "sub"
  prefixes = config.subscribes.empty? ? [""] : config.subscribes
  prefixes.each { |p| sock.subscribe(p) }
end
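
A minimal sketch of the default-to-everything behaviour, using a hypothetical RecordingSub class that only records the prefixes passed to subscribe. subscribe_all mirrors the prefix selection above but takes the list directly instead of a config object; the prefixes are made-up examples.

```ruby
# Illustrative sketch only: RecordingSub and subscribe_all are hypothetical
# stand-ins, not part of NNQ.
class RecordingSub
  attr_reader :prefixes

  def initialize
    @prefixes = []
  end

  def subscribe(prefix)
    @prefixes << prefix
  end
end

# An empty list falls back to the empty prefix, which matches every message.
def subscribe_all(sock, subscribes)
  prefixes = subscribes.empty? ? [""] : subscribes
  prefixes.each { |prefix| sock.subscribe(prefix) }
end

default_sub = RecordingSub.new
subscribe_all(default_sub, [])

filtered_sub = RecordingSub.new
subscribe_all(filtered_sub, ["weather.", "news."])

p default_sub.prefixes
p filtered_sub.prefixes
```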