Module: OMQ::CLI

Defined in:
lib/omq/cli.rb,
lib/omq/cli/pair.rb,
lib/omq/cli/pipe.rb,
lib/omq/cli/term.rb,
lib/omq/cli/config.rb,
lib/omq/cli/pub_sub.rb,
lib/omq/cli/req_rep.rb,
lib/omq/cli/version.rb,
lib/omq/cli/formatter.rb,
lib/omq/cli/push_pull.rb,
lib/omq/cli/cli_parser.rb,
lib/omq/cli/radio_dish.rb,
lib/omq/cli/base_runner.rb,
lib/omq/cli/pipe_worker.rb,
lib/omq/cli/socket_setup.rb,
lib/omq/cli/client_server.rb,
lib/omq/cli/router_dealer.rb,
lib/omq/cli/ractor_helpers.rb,
lib/omq/cli/routing_helper.rb,
lib/omq/cli/scatter_gather.rb,
lib/omq/cli/parallel_worker.rb,
lib/omq/cli/transient_monitor.rb,
lib/omq/cli/expression_evaluator.rb

Overview

Command-line interface for OMQ socket operations.

Defined Under Namespace

Modules: RactorHelpers, RoutingHelper, SocketSetup, Term Classes: BaseRunner, CliParser, Config, DishRunner, Endpoint, ExpressionEvaluator, Formatter, GatherRunner, PairRunner, ParallelWorker, PipeRunner, PipeWorker, PubRunner, PullRunner, PushRunner, RadioRunner, RepRunner, ReqRunner, RouterRunner, ScatterRunner, ServerRunner, SubRunner, TransientMonitor

Constant Summary collapse

SOCKET_TYPE_NAMES =
%w[
  req rep pub sub push pull pair dealer router
  client server radio dish scatter gather channel peer
  pipe
].freeze
RUNNER_MAP =
{
  "push"    => [PushRunner,    :PUSH],
  "pull"    => [PullRunner,    :PULL],
  "pub"     => [PubRunner,     :PUB],
  "sub"     => [SubRunner,     :SUB],
  "req"     => [ReqRunner,     :REQ],
  "rep"     => [RepRunner,     :REP],
  "dealer"  => [PairRunner,    :DEALER],
  "router"  => [RouterRunner,  :ROUTER],
  "pair"    => [PairRunner,    :PAIR],
  "client"  => [ReqRunner,     :CLIENT],
  "server"  => [ServerRunner,  :SERVER],
  "radio"   => [RadioRunner,   :RADIO],
  "dish"    => [DishRunner,    :DISH],
  "scatter" => [ScatterRunner, :SCATTER],
  "gather"  => [GatherRunner,  :GATHER],
  "channel" => [PairRunner,    :CHANNEL],
  "peer"    => [ServerRunner,  :PEER],
  "pipe"    => [PipeRunner,    nil],
}.freeze
SEND_ONLY =

Socket type names that only send messages.

%w[pub push scatter radio].freeze
RECV_ONLY =

Socket type names that only receive messages.

%w[sub pull gather dish].freeze
VERSION =
"0.15.1"

Class Method Summary collapse

Class Method Details

.build_config(argv) ⇒ Object

Builds a frozen Config from command-line arguments.



285
286
287
288
289
290
291
292
# File 'lib/omq/cli.rb', line 285

def build_config(argv)
  opts = CliParser.parse(argv)
  CliParser.validate!(opts)

  opts[:stdin_is_tty] = $stdin.tty?

  Ractor.make_shareable(Config.new(**opts))
end

.load_curve_crypto(name, verbose: false) ⇒ Module

Loads the named NaCl-compatible crypto backend.

Parameters:

  • name (String, nil)

    “rbnacl”, “nuckle”, or nil (auto-detect rbnacl)

  • verbose (Boolean) (defaults to: false)

    log which backend was loaded to stderr

Returns:

  • (Module)

    RbNaCl or Nuckle



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/omq/cli.rb', line 168

def load_curve_crypto(name, verbose: false)
  crypto = case name&.downcase
  when "rbnacl"
    require "rbnacl"
    RbNaCl
  when "nuckle"
    require "nuckle"
    Nuckle
  when nil
    begin
      require "rbnacl"
      RbNaCl
    rescue LoadError
      abort "CURVE requires libsodium. Install it:\n" \
            "  apt install libsodium-dev    # Debian/Ubuntu\n" \
            "  brew install libsodium       # macOS\n" \
            "Or use nuckle (pure Ruby, DANGEROUS -- not audited):\n" \
            "  --crypto nuckle"
    end
  else
    abort "Unknown CURVE crypto backend: #{name}. Use 'rbnacl' or 'nuckle'."
  end
  $stderr.puts "omq: CURVE crypto backend: #{crypto.name}" if verbose
  crypto
rescue LoadError
  abort "Could not load #{name} gem: gem install #{name}"
end

.page(text) ⇒ Object

Displays text through the system pager, or prints directly when stdout is not a terminal.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/omq/cli.rb', line 94

def page(text)
  if $stdout.tty?
    if ENV["PAGER"]
      pager = ENV["PAGER"]
    else
      ENV["LESS"] ||= "-FR"
      pager = "less"
    end
    IO.popen(pager, "w") { |io| io.puts text }
  else
    puts text
  end
rescue Errno::ENOENT
  puts text
rescue Errno::EPIPE, Interrupt
  # user quit pager early (q) or hit ^C while paging
end

.run(argv = ARGV) ⇒ void

This method returns an undefined value.

Main entry point: dispatches to keygen or socket runner.

Parameters:

  • argv (Array<String>) (defaults to: ARGV)

    command-line arguments



117
118
119
120
121
122
123
124
125
# File 'lib/omq/cli.rb', line 117

def run(argv = ARGV)
  case argv.first
  when "keygen"
    argv.shift
    run_keygen(argv)
  else
    run_socket(argv)
  end
end

.run_keygen(argv) ⇒ Object

Generates a persistent CURVE keypair and prints it as Z85-encoded env vars.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/omq/cli.rb', line 131

def run_keygen(argv)
  crypto_name = nil
  verbose     = false

  while (arg = argv.shift)
    case arg
    when "--crypto"
      crypto_name = argv.shift
    when "-v", "--verbose"
      verbose = true
    when "-h", "--help"
      puts "Usage: omq keygen [--crypto rbnacl|nuckle] [-v]\n\n" \
           "Generates a CURVE keypair for persistent server identity.\n" \
           "Output: Z85-encoded env vars for use with --curve-server."
      exit
    else
      abort "omq keygen: unknown option: #{arg}"
    end
  end
  crypto_name ||= ENV["OMQ_CRYPTO"]

  crypto = load_curve_crypto(crypto_name, verbose: verbose)
  require "protocol/zmtp/mechanism/curve"
  require "protocol/zmtp/z85"

  key = crypto::PrivateKey.generate
  puts "OMQ_SERVER_PUBLIC='#{Protocol::ZMTP::Z85.encode(key.public_key.to_s)}'"
  puts "OMQ_SERVER_SECRET='#{Protocol::ZMTP::Z85.encode(key.to_s)}'"
end

.run_socket(argv) ⇒ Object

Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/omq/cli.rb', line 200

def run_socket(argv)
  config = build_config(argv)

  require "omq"
  require "omq/rfc/clientserver"
  require "omq/rfc/radiodish"
  require "omq/rfc/scattergather"
  require "omq/rfc/channel"
  require "omq/rfc/p2p"
  require "omq/rfc/zstd"
  require "async"
  require "json"
  require "console"

  CliParser.validate_gems!(config)
  trap("INT")  { Process.exit!(0) }
  trap("TERM") { Process.exit!(0) }

  Console.logger = Console::Logger.new(Console::Output::Null.new) unless config.verbose >= 1

  debug_ep = nil

  if ENV["OMQ_DEBUG_URI"]
    begin
      require "async/debug"
      debug_ep = Async::HTTP::Endpoint.parse(ENV["OMQ_DEBUG_URI"])
      if debug_ep.scheme == "https"
        require "localhost"
        debug_ep = Async::HTTP::Endpoint.parse(ENV["OMQ_DEBUG_URI"],
          ssl_context: Localhost::Authority.fetch.server_context)
      end
    rescue LoadError
      abort "OMQ_DEBUG_URI requires the async-debug gem: gem install async-debug"
    end
  end

  if config.type_name.nil?
    Process.setproctitle("omq script")
    Object.include(OMQ) unless Object.include?(OMQ)
    Async annotation: 'omq' do
      Async::Debug.serve(endpoint: debug_ep) if debug_ep
      config.scripts.each { |s| load_script(s) }
    rescue => e
      $stderr.puts "omq: #{e.message}"
      exit 1
    end
    return
  end

  runner_class, socket_sym = RUNNER_MAP.fetch(config.type_name)

  Async annotation: "omq #{config.type_name}" do |task|
    Async::Debug.serve(endpoint: debug_ep) if debug_ep
    config.scripts.each { |s| load_script(s) }
    runner = if socket_sym
               runner_class.new(config, OMQ.const_get(socket_sym))
             else
               runner_class.new(config)
             end
    runner.call(task)
  rescue IO::TimeoutError, Async::TimeoutError
    $stderr.puts "omq: timeout" unless config.quiet
    exit 2
  rescue OMQ::SocketDeadError => e
    $stderr.puts "omq: #{e.cause.class}: #{e.cause.message}"
    exit 1
  rescue ::Socket::ResolutionError => e
    $stderr.puts "omq: #{e.message}"
    exit 1
  end
end