Module: OMQ::CLI::SocketSetup

Defined in:
lib/omq/cli/socket_setup.rb

Overview

Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.

Constant Summary collapse

DEFAULT_HWM =

Default high water mark applied when the user does not pass –hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).

64
DEFAULT_RECV_MAXSZ =

Default max inbound message size applied when the user does not pass –recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with –recv-maxsz N, or disable it entirely with –recv-maxsz 0.

1 << 20
PURE_SEND_TYPES =

Socket types that never receive application data. These opt out of the default passive-compression behavior – they would otherwise advertise a profile and wrap every outgoing frame in a 4-byte uncompressed sentinel for no benefit, since there’s nothing to decompress in return.

%w[push pub scatter radio].freeze

Class Method Summary collapse

Class Method Details

.apply_compression(sock, config, type_name) ⇒ Object

Installs ZMTP-Zstd compression on sock based on type_name and the explicit flags in config. Three outcomes:

  • config.compress is true –> active compression (auto-dict). Outgoing frames are compressed; incoming are decoded.

  • config.compress is false and type_name can receive –> passive compression (RFC Sec. 6.4). The socket advertises the profile so an active peer can compress on the wire and we can decode it, but we never compress our own outgoing frames.

  • config.compress is false and type_name is in PURE_SEND_TYPES –> no compression. Pure senders have no incoming traffic to decompress, so passive mode is pure overhead on outgoing.

Callers pass type_name explicitly (rather than reading it off config) so the pipe runners can install different modes on their push/pull ends of a single pipe.



50
51
52
53
54
55
56
# File 'lib/omq/cli/socket_setup.rb', line 50

def self.apply_compression(sock, config, type_name)
  if config.compress
    sock.compression = OMQ::Compression::Zstd.auto(level: config.compress_level || -3)
  elsif !PURE_SEND_TYPES.include?(type_name)
    sock.compression = OMQ::Compression::Zstd.auto(passive: true)
  end
end

.apply_options(sock, config) ⇒ Object

Apply common socket options from config to sock.



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/omq/cli/socket_setup.rb', line 61

def self.apply_options(sock, config)
  sock.linger             = config.linger
  sock.recv_timeout       = config.timeout       if config.timeout
  sock.send_timeout       = config.timeout       if config.timeout
  sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl
  sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl
  # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is.
  sock.send_hwm           = config.send_hwm || DEFAULT_HWM
  sock.recv_hwm           = config.recv_hwm || DEFAULT_HWM
  sock.sndbuf             = config.sndbuf        if config.sndbuf
  sock.rcvbuf             = config.rcvbuf        if config.rcvbuf
end

.apply_recv_maxsz(sock, config) ⇒ Object

–recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.



90
91
92
93
94
95
96
97
# File 'lib/omq/cli/socket_setup.rb', line 90

def self.apply_recv_maxsz(sock, config)
  sock.max_message_size =
    case config.recv_maxsz
    when nil then DEFAULT_RECV_MAXSZ
    when 0   then nil
    else          config.recv_maxsz
    end
end

.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.



103
104
105
106
107
108
109
110
111
112
# File 'lib/omq/cli/socket_setup.rb', line 103

def self.attach(sock, config, verbose: 0, timestamps: nil)
  config.binds.each do |url|
    sock.bind(url)
    CLI::Term.write_attach(:bind, sock.last_endpoint, timestamps) if verbose >= 1
  end
  config.connects.each do |url|
    sock.connect(url)
    CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
  end
end

.attach_endpoints(sock, endpoints, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix.



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/omq/cli/socket_setup.rb', line 119

def self.attach_endpoints(sock, endpoints, verbose: 0, timestamps: nil)
  endpoints.each do |ep|
    if ep.bind?
      sock.bind(ep.url)
      CLI::Term.write_attach(:bind, sock.last_endpoint, timestamps) if verbose >= 1
    else
      sock.connect(ep.url)
      CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
    end
  end
end

.build(klass, config) ⇒ Object

Create and fully configure a socket from klass and config.



77
78
79
80
81
82
83
84
85
86
# File 'lib/omq/cli/socket_setup.rb', line 77

def self.build(klass, config)
  sock = config.ffi ? klass.new(backend: :ffi) : klass.new
  sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name)
  apply_options(sock, config)
  apply_recv_maxsz(sock, config)
  sock.identity         = config.identity   if config.identity
  sock.router_mandatory = true if config.type_name == "router"
  apply_compression(sock, config, config.type_name)
  sock
end

.kill_on_protocol_error(sock, event) ⇒ Object

CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.

Parameters:

  • sock (OMQ::Socket)
  • event (OMQ::MonitorEvent)


194
195
196
197
198
199
# File 'lib/omq/cli/socket_setup.rb', line 194

def self.kill_on_protocol_error(sock, event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error]
  return unless error.is_a?(Protocol::ZMTP::Error)
  sock.engine.signal_fatal_error(error)
end

.setup_curve(sock, config) ⇒ Object

Configure CURVE encryption on sock using config and env vars.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/omq/cli/socket_setup.rb', line 147

def self.setup_curve(sock, config)
  server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"]
  server_mode    = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"])

  return unless server_key_z85 || server_mode

  crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1)
  require "protocol/zmtp/mechanism/curve"

  if server_key_z85
    server_key = Protocol::ZMTP::Z85.decode(server_key_z85)
    client_key = crypto::PrivateKey.generate
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client(
      public_key: client_key.public_key.to_s,
      secret_key: client_key.to_s,
      server_key: server_key,
      crypto: crypto
    )
  elsif server_mode
    if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]
      server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"])
      server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"])
    else
      key        = crypto::PrivateKey.generate
      server_pub = key.public_key.to_s
      server_sec = key.to_s
    end
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server(
      public_key: server_pub,
      secret_key: server_sec,
      crypto: crypto
    )
    $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'"
  end
end

.setup_subscriptions(sock, config) ⇒ Object

Subscribe or join groups on sock according to config.



134
135
136
137
138
139
140
141
142
# File 'lib/omq/cli/socket_setup.rb', line 134

def self.setup_subscriptions(sock, config)
  case config.type_name
  when "sub"
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end