Module: OMQ::CLI::SocketSetup

Defined in:
lib/omq/cli/socket_setup.rb

Overview

Stateless helper for socket construction and configuration. All methods are module-level so callers compose rather than inherit.

Constant Summary collapse

DEFAULT_HWM =

Default high water mark applied when the user does not pass –hwm. Lower than libzmq’s default (1000) to keep memory footprint bounded for the typical CLI use cases (interactive debugging, short-lived pipelines). 64 matches the recv pump’s per-fairness-batch limit (one batch exactly fills a full queue).

64
DEFAULT_RECV_MAXSZ =

Default max inbound message size applied when the user does not pass –recv-maxsz. The omq library itself is unlimited by default; the CLI caps inbound messages at 1 MiB so that a misconfigured or malicious peer cannot force arbitrary memory allocation on a terminal user. Users can raise it with –recv-maxsz N, or disable it entirely with –recv-maxsz 0.

1 << 20

Class Method Summary collapse

Class Method Details

.apply_options(sock, config) ⇒ Object

Apply common socket options from config to sock.



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/omq/cli/socket_setup.rb', line 47

def self.apply_options(sock, config)
  sock.linger             = config.linger
  sock.recv_timeout       = config.timeout       if config.timeout
  sock.send_timeout       = config.timeout       if config.timeout
  sock.reconnect_interval = config.reconnect_ivl if config.reconnect_ivl
  sock.heartbeat_interval = config.heartbeat_ivl if config.heartbeat_ivl
  # nil → default; 0 stays 0 (unbounded), any other integer is taken as-is.
  sock.send_hwm           = config.send_hwm || DEFAULT_HWM
  sock.recv_hwm           = config.recv_hwm || DEFAULT_HWM
  sock.sndbuf             = config.sndbuf        if config.sndbuf
  sock.rcvbuf             = config.rcvbuf        if config.rcvbuf
end

.apply_recv_maxsz(sock, config) ⇒ Object

–recv-maxsz: nil → 1 MiB default; 0 → explicitly unlimited; else → as-is.



75
76
77
78
79
80
81
82
# File 'lib/omq/cli/socket_setup.rb', line 75

def self.apply_recv_maxsz(sock, config)
  sock.max_message_size =
    case config.recv_maxsz
    when nil then DEFAULT_RECV_MAXSZ
    when 0   then nil
    else          config.recv_maxsz
    end
end

.attach(sock, config, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock using URL strings from config.binds / config.connects. verbose gates logging (>= 1), timestamps controls prefix.



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/omq/cli/socket_setup.rb', line 88

def self.attach(sock, config, verbose: 0, timestamps: nil)
  opts = compress_opts(config)

  config.binds.each do |url|
    uri = sock.bind(compress_url(url, config), **opts)
    CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
  end

  config.connects.each do |url|
    sock.connect(compress_url(url, config), **opts)
    CLI::Term.write_attach(:connect, url, timestamps) if verbose >= 1
  end
end

.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil) ⇒ Object

Bind/connect sock from an Array of Endpoint objects. Used by PipeRunner, which works with structured endpoint lists. verbose gates logging (>= 1), timestamps controls prefix.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/omq/cli/socket_setup.rb', line 107

def self.attach_endpoints(sock, endpoints, config: nil, verbose: 0, timestamps: nil)
  opts = config ? compress_opts(config) : {}

  endpoints.each do |ep|
    url = config ? compress_url(ep.url, config) : ep.url

    if ep.bind?
      uri = sock.bind(url, **opts)
      CLI::Term.write_attach(:bind, uri.to_s, timestamps) if verbose >= 1
    else
      sock.connect(url, **opts)
      CLI::Term.write_attach(:connect, ep.url, timestamps) if verbose >= 1
    end
  end
end

.build(klass, config) ⇒ Object

Create and fully configure a socket from klass and config.



63
64
65
66
67
68
69
70
71
# File 'lib/omq/cli/socket_setup.rb', line 63

def self.build(klass, config)
  sock = config.ffi ? klass.new(backend: :ffi) : klass.new
  sock.conflate = true if config.conflate && %w[pub radio].include?(config.type_name)
  apply_options(sock, config)
  apply_recv_maxsz(sock, config)
  sock.identity         = config.identity   if config.identity
  sock.router_mandatory = true if config.type_name == "router"
  sock
end

.compress_opts(config) ⇒ Object

Returns bind/connect kwargs for compression (level:) when enabled.



38
39
40
41
42
# File 'lib/omq/cli/socket_setup.rb', line 38

def self.compress_opts(config)
  return {} unless config.compress

  { level: config.compress_level || -3 }
end

.compress_url(url, config) ⇒ Object

Upgrades a tcp:// URL to zstdtcp://+ when compression is enabled. Returns the URL unchanged for non-TCP or when compression is off.



28
29
30
31
32
33
# File 'lib/omq/cli/socket_setup.rb', line 28

def self.compress_url(url, config)
  return url unless config.compress
  return url unless url.start_with?("tcp://")

  url.sub("tcp://", "zstd+tcp://")
end

.kill_on_protocol_error(sock, event) ⇒ Object

CLI-level policy: a peer that commits a protocol-level violation (Protocol::ZMTP::Error — oversized frame, decompression bytebomb, bad framing, …) is almost certainly a misconfiguration the user needs to see. Mark sock dead so the next receive raises SocketDeadError. The library itself just drops the connection and keeps serving the others; this stricter policy is CLI-only.

Parameters:

  • sock (OMQ::Socket)
  • event (OMQ::MonitorEvent)


186
187
188
189
190
191
# File 'lib/omq/cli/socket_setup.rb', line 186

def self.kill_on_protocol_error(sock, event)
  return unless event.type == :disconnected
  error = event.detail && event.detail[:error]
  return unless error.is_a?(Protocol::ZMTP::Error)
  sock.engine.signal_fatal_error(error)
end

.setup_curve(sock, config) ⇒ Object

Configure CURVE encryption on sock using config and env vars.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/omq/cli/socket_setup.rb', line 139

def self.setup_curve(sock, config)
  server_key_z85 = config.curve_server_key || ENV["OMQ_SERVER_KEY"]
  server_mode    = config.curve_server || (ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"])

  return unless server_key_z85 || server_mode

  crypto = CLI.load_curve_crypto(config.crypto || ENV["OMQ_CRYPTO"], verbose: config.verbose >= 1)
  require "protocol/zmtp/mechanism/curve"

  if server_key_z85
    server_key = Protocol::ZMTP::Z85.decode(server_key_z85)
    client_key = crypto::PrivateKey.generate
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.client(
      public_key: client_key.public_key.to_s,
      secret_key: client_key.to_s,
      server_key: server_key,
      crypto: crypto
    )
  elsif server_mode
    if ENV["OMQ_SERVER_PUBLIC"] && ENV["OMQ_SERVER_SECRET"]
      server_pub = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_PUBLIC"])
      server_sec = Protocol::ZMTP::Z85.decode(ENV["OMQ_SERVER_SECRET"])
    else
      key        = crypto::PrivateKey.generate
      server_pub = key.public_key.to_s
      server_sec = key.to_s
    end
    sock.mechanism = Protocol::ZMTP::Mechanism::Curve.server(
      public_key: server_pub,
      secret_key: server_sec,
      crypto: crypto
    )
    $stderr.puts "OMQ_SERVER_KEY='#{Protocol::ZMTP::Z85.encode(server_pub)}'"
  end
end

.setup_subscriptions(sock, config) ⇒ Object

Subscribe or join groups on sock according to config.



126
127
128
129
130
131
132
133
134
# File 'lib/omq/cli/socket_setup.rb', line 126

def self.setup_subscriptions(sock, config)
  case config.type_name
  when "sub"
    prefixes = config.subscribes.empty? ? [""] : config.subscribes
    prefixes.each { |p| sock.subscribe(p) }
  when "dish"
    config.joins.each { |g| sock.join(g) }
  end
end