Module: OMQ::CLI

Defined in:
lib/omq/cli.rb,
lib/omq/cli/pair.rb,
lib/omq/cli/pipe.rb,
lib/omq/cli/term.rb,
lib/omq/cli/config.rb,
lib/omq/cli/pub_sub.rb,
lib/omq/cli/req_rep.rb,
lib/omq/cli/version.rb,
lib/omq/cli/formatter.rb,
lib/omq/cli/push_pull.rb,
lib/omq/cli/cli_parser.rb,
lib/omq/cli/radio_dish.rb,
lib/omq/cli/base_runner.rb,
lib/omq/cli/pipe_worker.rb,
lib/omq/cli/socket_setup.rb,
lib/omq/cli/client_server.rb,
lib/omq/cli/router_dealer.rb,
lib/omq/cli/ractor_helpers.rb,
lib/omq/cli/routing_helper.rb,
lib/omq/cli/scatter_gather.rb,
lib/omq/cli/parallel_worker.rb,
lib/omq/cli/transient_monitor.rb,
lib/omq/cli/expression_evaluator.rb

Overview

Command-line interface for OMQ socket operations.

Defined Under Namespace

Modules: RactorHelpers, RoutingHelper, SocketSetup, Term

Classes: BaseRunner, CliParser, Config, DishRunner, Endpoint, ExpressionEvaluator, Formatter, GatherRunner, PairRunner, ParallelWorker, PipeRunner, PipeWorker, PubRunner, PullRunner, PushRunner, RadioRunner, RepRunner, ReqRunner, RouterRunner, ScatterRunner, ServerRunner, SubRunner, TransientMonitor

Constant Summary collapse

# Every socket type name known to the CLI, grouped here by related pairs.
# Each entry is also a key of RUNNER_MAP.
SOCKET_TYPE_NAMES = %w[
  req rep
  pub sub
  push pull
  pair
  dealer router
  client server
  radio dish
  scatter gather
  channel peer
  pipe
].freeze
# Maps a socket type name to a [runner class, OMQ socket constant name] pair.
#
# run_socket looks the type up here, resolves the symbol with OMQ.const_get
# and hands the resulting constant to the runner's constructor. Several types
# share one runner class (for example "dealer", "pair" and "channel" all use
# PairRunner).
RUNNER_MAP = {
  "push" => [PushRunner, :PUSH],
  "pull" => [PullRunner, :PULL],
  "pub" => [PubRunner, :PUB],
  "sub" => [SubRunner, :SUB],
  "req" => [ReqRunner, :REQ],
  "rep" => [RepRunner, :REP],
  "dealer" => [PairRunner, :DEALER],
  "router" => [RouterRunner, :ROUTER],
  "pair" => [PairRunner, :PAIR],
  "client" => [ReqRunner, :CLIENT],
  "server" => [ServerRunner, :SERVER],
  "radio" => [RadioRunner, :RADIO],
  "dish" => [DishRunner, :DISH],
  "scatter" => [ScatterRunner, :SCATTER],
  "gather" => [GatherRunner, :GATHER],
  "channel" => [PairRunner, :CHANNEL],
  "peer" => [ServerRunner, :PEER],
  # No socket constant: run_socket builds this runner from the config alone.
  "pipe" => [PipeRunner, nil],
}.freeze
# Socket type names that only send messages.
SEND_ONLY =
%w[pub push scatter radio].freeze
# Socket type names that only receive messages.
RECV_ONLY =
%w[sub pull gather dish].freeze
# Version string of the OMQ::CLI module.
VERSION = "0.14.9"

Class Method Summary collapse

Class Method Details

.build_config(argv) ⇒ Object

Builds a frozen Config from command-line arguments.



276
277
278
279
280
281
282
283
# File 'lib/omq/cli.rb', line 276

# Builds a frozen, Ractor-shareable Config from command-line arguments.
#
# The options hash returned by CliParser.parse is validated first, then
# extended with :stdin_is_tty (whether $stdin is a terminal) before being
# splatted into Config.new.
#
# @param argv [Array<String>] raw command-line arguments
# @return [Config] the configuration, made shareable via Ractor.make_shareable
def build_config(argv)
  parsed = CliParser.parse(argv)
  CliParser.validate!(parsed)

  # Added after validation, so validate! never sees this key.
  parsed[:stdin_is_tty] = $stdin.tty?

  config = Config.new(**parsed)
  Ractor.make_shareable(config)
end

.load_curve_crypto(name, verbose: false) ⇒ Module

Loads the named NaCl-compatible crypto backend.

Parameters:

  • name (String, nil)

    “rbnacl”, “nuckle”, or nil (auto-detect rbnacl)

  • verbose (Boolean) (defaults to: false)

    log which backend was loaded to stderr

Returns:

  • (Module)

    RbNaCl or Nuckle



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/omq/cli.rb', line 159

# Loads the named NaCl-compatible crypto backend.
#
# The backend name is matched case-insensitively. Every failure path
# terminates the process through +abort+ (message on stderr, SystemExit with
# a failure status) rather than raising to the caller.
#
# @param name [String, nil] "rbnacl", "nuckle", or nil (auto-detect rbnacl)
# @param verbose [Boolean] log which backend was loaded to stderr
# @return [Module] RbNaCl or Nuckle
def load_curve_crypto(name, verbose: false)
  backend = name&.downcase

  crypto = case backend
  when "rbnacl"
    require "rbnacl"
    RbNaCl
  when "nuckle"
    require "nuckle"
    Nuckle
  when nil
    # Auto-detect: a load failure here gets the detailed libsodium hint
    # instead of the generic "gem install" message below.
    begin
      require "rbnacl"
      RbNaCl
    rescue LoadError
      abort "CURVE requires libsodium. Install it:\n" \
            "  apt install libsodium-dev    # Debian/Ubuntu\n" \
            "  brew install libsodium       # macOS\n" \
            "Or use nuckle (pure Ruby, DANGEROUS -- not audited):\n" \
            "  --crypto nuckle"
    end
  else
    # Echo the name exactly as the user typed it.
    abort "Unknown CURVE crypto backend: #{name}. Use 'rbnacl' or 'nuckle'."
  end

  $stderr.puts "omq: CURVE crypto backend: #{crypto.name}" if verbose
  crypto
rescue LoadError
  # Only the explicit "rbnacl"/"nuckle" branches reach this handler. Use the
  # normalized name so that e.g. "RbNaCl" does not yield the install hint
  # "gem install RbNaCl".
  abort "Could not load #{backend} gem: gem install #{backend}"
end

.page(text) ⇒ Object

Displays text through the system pager, or prints directly when stdout is not a terminal.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/omq/cli.rb', line 86

# Displays text through a pager when stdout is a terminal; otherwise prints
# it directly.
#
# The pager command is taken from ENV["PAGER"]. When that is unset, "less" is
# used and ENV["LESS"] is defaulted to "-FR" unless it already has a value.
# If the pager executable cannot be found the text is printed directly.
#
# @param text [String] the text to show
# @return [nil]
def page(text)
  unless $stdout.tty?
    puts text
    return
  end

  pager_cmd = ENV["PAGER"]
  if pager_cmd.nil?
    ENV["LESS"] ||= "-FR"
    pager_cmd = "less"
  end

  IO.popen(pager_cmd, "w") { |pipe| pipe.puts text }
rescue Errno::ENOENT
  # Pager executable not found: fall back to plain output.
  puts text
rescue Errno::EPIPE, Interrupt
  # user quit pager early (q) or hit ^C while paging
end

.run(argv = ARGV) ⇒ void

This method returns an undefined value.

Main entry point: dispatches to keygen or socket runner.

Parameters:

  • argv (Array<String>) (defaults to: ARGV)

    command-line arguments



109
110
111
112
113
114
115
116
117
# File 'lib/omq/cli.rb', line 109

# Main entry point: dispatches to keygen or socket runner.
#
# When the first argument is "keygen" it is removed from argv in place and
# the remaining arguments go to run_keygen; anything else is passed to
# run_socket untouched.
#
# @param argv [Array<String>] command-line arguments
# @return [void]
def run(argv = ARGV)
  return run_socket(argv) unless argv.first == "keygen"

  argv.shift
  run_keygen(argv)
end

.run_keygen(argv) ⇒ Object

Generates a persistent CURVE keypair and prints it as Z85-encoded env vars.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/omq/cli.rb', line 123

# Generates a persistent CURVE keypair and prints it as Z85-encoded env vars
# (OMQ_SERVER_PUBLIC and OMQ_SERVER_SECRET) on stdout.
#
# Recognized options: --crypto NAME, -v/--verbose, -h/--help. Without
# --crypto the backend name falls back to ENV["OMQ_CRYPTO"]; if that is unset
# too, load_curve_crypto auto-detects. Arguments are consumed from argv in
# place.
#
# Aborts on an unknown option or when --crypto is given without a value;
# --help prints usage and exits.
#
# @param argv [Array<String>] arguments following the "keygen" subcommand
# @return [void]
def run_keygen(argv)
  crypto_name = nil
  verbose     = false
  while (arg = argv.shift)
    case arg
    when "--crypto"
      crypto_name = argv.shift
      # A trailing --crypto used to leave crypto_name nil and silently fall
      # back to OMQ_CRYPTO / auto-detection; report the mistake instead.
      abort "omq keygen: --crypto requires a backend name (rbnacl or nuckle)" if crypto_name.nil?
    when "-v", "--verbose"
      verbose = true
    when "-h", "--help"
      puts "Usage: omq keygen [--crypto rbnacl|nuckle] [-v]\n\n" \
           "Generates a CURVE keypair for persistent server identity.\n" \
           "Output: Z85-encoded env vars for use with --curve-server."
      exit
    else
      abort "omq keygen: unknown option: #{arg}"
    end
  end
  crypto_name ||= ENV["OMQ_CRYPTO"]

  crypto = load_curve_crypto(crypto_name, verbose: verbose)
  require "protocol/zmtp/mechanism/curve"
  require "protocol/zmtp/z85"

  key = crypto::PrivateKey.generate
  puts "OMQ_SERVER_PUBLIC='#{Protocol::ZMTP::Z85.encode(key.public_key.to_s)}'"
  puts "OMQ_SERVER_SECRET='#{Protocol::ZMTP::Z85.encode(key.to_s)}'"
end

.run_socket(argv) ⇒ Object

Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/omq/cli.rb', line 191

# Parses CLI arguments, validates options, and runs the main event loop
# inside an Async reactor.
#
# When the config has no socket type (type_name is nil) only the configured
# scripts are loaded. Otherwise the runner registered in RUNNER_MAP for that
# type is built and called with the reactor's task.
#
# Exit statuses set here: 2 on timeout; 1 on a dead socket, a name
# resolution failure, or (script mode) any StandardError. INT and TERM
# terminate the process immediately with status 0.
#
# @param argv [Array<String>] command-line arguments, passed to build_config
# @return [Object, nil] nil in script mode, otherwise the result of the Async call
def run_socket(argv)
  config = build_config(argv)

  # Runtime dependencies are required only after the config has been built.
  require "omq"
  require "omq/rfc/clientserver"
  require "omq/rfc/radiodish"
  require "omq/rfc/scattergather"
  require "omq/rfc/channel"
  require "omq/rfc/p2p"
  require "omq/rfc/zstd"
  require "async"
  require "json"
  require "console"

  CliParser.validate_gems!(config)

  # Process.exit! terminates at once, skipping at_exit handlers.
  %w[INT TERM].each do |signal|
    trap(signal) { Process.exit!(0) }
  end

  # Discard Console log output unless verbosity is at least 1.
  unless config.verbose >= 1
    Console.logger = Console::Logger.new(Console::Output::Null.new)
  end

  # Optional debug endpoint, enabled through the OMQ_DEBUG_URI variable.
  debug_uri      = ENV["OMQ_DEBUG_URI"]
  debug_endpoint = nil

  if debug_uri
    begin
      require "async/debug"
      debug_endpoint = Async::HTTP::Endpoint.parse(debug_uri)
      if debug_endpoint.scheme == "https"
        # https endpoints are re-parsed with the localhost authority's TLS context.
        require "localhost"
        tls_context    = Localhost::Authority.fetch.server_context
        debug_endpoint = Async::HTTP::Endpoint.parse(debug_uri, ssl_context: tls_context)
      end
    rescue LoadError
      abort "OMQ_DEBUG_URI requires the async-debug gem: gem install async-debug"
    end
  end

  socket_type = config.type_name

  # Script mode: no socket type given, just load the scripts inside a reactor.
  if socket_type.nil?
    Process.setproctitle("omq script")
    Object.include(OMQ) unless Object.include?(OMQ)
    Async(annotation: "omq") do
      Async::Debug.serve(endpoint: debug_endpoint) if debug_endpoint
      config.scripts.each { |script| load_script(script) }
    rescue => error
      $stderr.puts "omq: #{error.message}"
      exit 1
    end
    return
  end

  runner_class, socket_const = RUNNER_MAP.fetch(socket_type)

  Async(annotation: "omq #{socket_type}") do |task|
    Async::Debug.serve(endpoint: debug_endpoint) if debug_endpoint
    config.scripts.each { |script| load_script(script) }

    # Runners without a socket constant (nil in RUNNER_MAP) take only the config.
    runner_args = [config]
    runner_args << OMQ.const_get(socket_const) if socket_const
    runner_class.new(*runner_args).call(task)
  rescue IO::TimeoutError, Async::TimeoutError
    $stderr.puts "omq: timeout" unless config.quiet
    exit 2
  rescue OMQ::SocketDeadError => error
    $stderr.puts "omq: #{error.cause.class}: #{error.cause.message}"
    exit 1
  rescue ::Socket::ResolutionError => error
    $stderr.puts "omq: #{error.message}"
    exit 1
  end
end