Module: OMQ::CLI::SocketSetup

Defined in:
lib/omq/cli/socket_setup.rb

Overview

Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.

Constant Summary collapse

DEFAULT_HWM =

Default high water mark applied when the user does not pass –hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).

64
DEFAULT_RECV_MAXSZ =

Default max inbound message size applied when the user does not pass –recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with –recv-maxsz N, or disable it entirely with –recv-maxsz 0.

1 << 20
DEFAULT_ZSTD_LEVEL =

Default zstd level when the user picks zstd without specifying one (e.g. ‘–compress=zstd`). Matches the `-z` shortcut.

-3

Class Method Summary collapse

Class Method Details

.apply_options(sock, config) ⇒ Object

Apply common socket options from config to sock.



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/omq/cli/socket_setup.rb', line 91

def self.apply_options(sock, config)
  sock.linger             = config.linger
  sock.recv_timeout       = config.timeout       if config.timeout
  sock.send_timeout       = config.timeout       if config.timeout
  sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl
  sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl
  # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is.
  sock.send_hwm           = config.send_hwm || DEFAULT_HWM
  sock.recv_hwm           = config.recv_hwm || DEFAULT_HWM
  sock.sndbuf             = config.sndbuf        if config.sndbuf
  sock.rcvbuf             = config.rcvbuf        if config.rcvbuf
end

.apply_recv_maxsz(sock, config) ⇒ Object

–recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.



119
120
121
122
123
124
125
126
# File 'lib/omq/cli/socket_setup.rb', line 119

def self.apply_recv_maxsz(sock, config)
  sock.max_message_size =
    case config.recv_maxsz
    when nil then DEFAULT_RECV_MAXSZ
    when 0   then nil
    else          config.recv_maxsz
    end
end

.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/omq/cli/socket_setup.rb', line 132

def self.attach(sock, config, verbose: 0, timestamps: nil)
  opts = compress_opts(config)

  config.binds.each do |url|
    uri = sock.bind(compress_url(url, config), **opts)
    CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
  end

  config.connects.each do |url|
    sock.connect(compress_url(url, config), **opts)
    CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
  end
end

.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil, side: nil) ⇒ Object

Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix. side selects per-socket compression (:in for PULL, :out for PUSH in a pipe); nil falls back to global compress settings.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/omq/cli/socket_setup.rb', line 153

def self.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil, side: nil)
  opts = config ? compress_opts(config, side: side) : {}

  endpoints.each do |ep|
    url = config ? compress_url(ep.url, config, side: side) : ep.url

    if ep.bind?
      uri = sock.bind(url, **opts)
      CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
    else
      sock.connect(url, **opts)
      CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
    end
  end
end

.build(klass, config) ⇒ Object

Create and fully configure a socket from klass and config.



107
108
109
110
111
112
113
114
115
# File 'lib/omq/cli/socket_setup.rb', line 107

def self.build(klass, config)
  sock = config.ffi ? klass.new(backend: :ffi) : klass.new
  sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name)
  apply_options(sock, config)
  apply_recv_maxsz(sock, config)
  sock.identity         = config.identity   if config.identity
  sock.router_mandatory = true if config.type_name == "router"
  sock
end

.compress_flag(config, side: nil) ⇒ Object

Human-readable CLI flag for a resolved compress state, suitable for process titles and proctitle-like log lines.



80
81
82
83
84
85
86
# File 'lib/omq/cli/socket_setup.rb', line 80

def self.compress_flag(config, side: nil)
  codec, level = resolve_compress(config, side)
  case codec
  when :zstd then level == 3 ? "-Z" : "-z"
  when :lz4  then "--lz4"
  end
end

.compress_opts(config, side: nil) ⇒ Object

Returns bind/connect kwargs for the chosen codec on the given side. Zstd accepts a level:; LZ4 takes no tuning knobs.



68
69
70
71
72
73
74
# File 'lib/omq/cli/socket_setup.rb', line 68

def self.compress_opts(config, side: nil)
  codec, level = resolve_compress(config, side)
  case codec
  when :zstd then { level: level || DEFAULT_ZSTD_LEVEL }
  else            {}
  end
end

.compress_url(url, config, side: nil) ⇒ Object

Upgrades a tcp:// URL to the codec-specific variant (+zstd+tcp://+ or lz4tcp://+) when compression is enabled for the given side. Returns the URL unchanged for non-TCP or when compression is off.



56
57
58
59
60
61
62
# File 'lib/omq/cli/socket_setup.rb', line 56

def self.compress_url(url, config, side: nil)
  codec, _ = resolve_compress(config, side)
  return url if codec.nil?
  return url unless url.start_with?("tcp://")

  url.sub("tcp://", "#{codec}+tcp://")
end

.kill_on_protocol_error(sock, event) ⇒ Object

CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.

Parameters:

  • sock (OMQ::Socket)
  • event (OMQ::MonitorEvent)


232
233
234
235
236
237
# File 'lib/omq/cli/socket_setup.rb', line 232

def self.kill_on_protocol_error(sock, event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error]
  return unless error.is_a?(Protocol::ZMTP::Error)
  sock.engine.signal_fatal_error(error)
end

.resolve_compress(config, side) ⇒ Object

Resolves compression state for a given pipe side (:in, :out, or nil). Per-side settings (in_compress / out_compress) override the global config.compress. Returns [codec, level] where codec is one of nil (off), :zstd, or :lz4.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/omq/cli/socket_setup.rb', line 34

def self.resolve_compress(config, side)
  scoped =
    case side
    when :in  then !config.in_compress.nil?
    when :out then !config.out_compress.nil?
    end

  if scoped && side == :in
    [config.in_compress,  config.in_compress_level]
  elsif scoped && side == :out
    [config.out_compress, config.out_compress_level]
  else
    [config.compress, config.compress_level]
  end
end

.setup_curve(sock, config) ⇒ Object

Configure CURVE encryption on sock using config and env vars.



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/omq/cli/socket_setup.rb', line 185

def self.setup_curve(sock, config)
  server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"]
  server_mode    = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"])

  return unless server_key_z85 || server_mode

  crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1)
  require "protocol/zmtp/mechanism/curve"

  if server_key_z85
    server_key = Protocol::ZMTP::Z85.decode(server_key_z85)
    client_key = crypto::PrivateKey.generate
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client(
      public_key: client_key.public_key.to_s,
      secret_key: client_key.to_s,
      server_key: server_key,
      crypto: crypto
    )
  elsif server_mode
    if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]
      server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"])
      server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"])
    else
      key        = crypto::PrivateKey.generate
      server_pub = key.public_key.to_s
      server_sec = key.to_s
    end
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server(
      public_key: server_pub,
      secret_key: server_sec,
      crypto: crypto
    )
    $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'"
  end
end

.setup_subscriptions(sock, config) ⇒ Object

Subscribe or join groups on sock according to config.



172
173
174
175
176
177
178
179
180
# File 'lib/omq/cli/socket_setup.rb', line 172

def self.setup_subscriptions(sock, config)
  case config.type_name
  when "sub"
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end