Class: OMQ::CLI::CliParser

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/cli/cli_parser.rb

Overview

Parses and validates command-line arguments for the omq CLI.

Constant Summary collapse

EXAMPLES =
<<~'TEXT'
  -- Request / Reply ------------------------------------------

    +-----+   "hello"    +-----+
    | REQ |------------->| REP |
    |     |<-------------|     |
    +-----+   "HELLO"    +-----+

    # terminal 1: echo server
    omq rep --bind tcp://:5555 --recv-eval 'it.map(&:upcase)'

    # terminal 2: send a request
    echo "hello" | omq req --connect tcp://localhost:5555

    # or over IPC (unix socket, single machine)
    omq rep --bind ipc:///tmp/echo.sock --echo &
    echo "hello" | omq req --connect ipc:///tmp/echo.sock

  -- Publish / Subscribe --------------------------------------

    +-----+  "weather.nyc 72F"  +-----+
    | PUB |-------------------->| SUB | --subscribe "weather."
    +-----+                     +-----+

    # terminal 1: subscriber (all topics by default)
    omq sub --bind tcp://:5556

    # terminal 2: publisher (needs --delay for subscription to propagate)
    echo "weather.nyc 72F" | omq pub --connect tcp://localhost:5556 --delay 1

  -- Periodic Publish -------------------------------------------

    +-----+   "tick 1"    +-----+
    | PUB |--(every 1s)-->| SUB |
    +-----+               +-----+

    # terminal 1: subscriber
    omq sub --bind tcp://:5556

    # terminal 2: publish a tick every second (wall-clock aligned)
    omq pub --connect tcp://localhost:5556 --delay 1 --data "tick" --interval 1

    # 5 ticks, then exit
    omq pub --connect tcp://localhost:5556 -d1 -D "tick" -i0.5 --count 5

  -- Pipeline -------------------------------------------------

    +------+           +------+
    | PUSH |---------->| PULL |
    +------+           +------+

    # terminal 1: worker
    omq pull --bind tcp://:5557

    # terminal 2: send tasks
    echo "task 1" | omq push --connect tcp://localhost:5557

    # or over IPC (unix socket)
    omq pull --bind ipc:///tmp/pipeline.sock &
    echo "task 1" | omq push --connect ipc:///tmp/pipeline.sock

  -- Pipe (PULL -> eval -> PUSH) --------------------------------

    +------+         +------+         +------+
    | PUSH |-------->| pipe |-------->| PULL |
    +------+         +------+         +------+

    # @work / @sink below are Linux abstract-namespace unix
    # sockets (ipc://@name) -- no path on disk, cleaned up
    # automatically. Linux only. Use ipc:///tmp/work etc.
    # on macOS/BSD.

    # terminal 1: producer
    echo -e "hello\nworld" | omq push -b@work

    # terminal 2: worker (uppercase each message)
    omq pipe -c@work -c@sink -e 'it.map(&:upcase)'
    # terminal 3: collector
    omq pull -b@sink

    # 4 Ractor workers in a single process (-P)
    omq pipe -c@work -c@sink -P4 -r./fib -e 'fib(it.first.to_i).to_s'

    # exit when producer disconnects (--transient)
    omq pipe -c@work -c@sink --transient -e 'it.map(&:upcase)'

    # fan-in (multiple sources -> one sink)
    omq pipe --in -c@work1 -c@work2 --out -c@sink -e 'it.map(&:upcase)'

    # fan-out (one source -> multiple sinks, round-robin)
    omq pipe --in -b tcp://:5555 --out -c@sink1 -c@sink2 -e 'it'

  -- CLIENT / SERVER (draft) ----------------------------------

    +--------+   "hello"   +--------+
    | CLIENT |------------>| SERVER | --recv-eval 'it.map(&:upcase)'
    |        |<------------|        |
    +--------+   "HELLO"   +--------+

    # terminal 1: upcasing server
    omq server --bind tcp://:5555 --recv-eval 'it.map(&:upcase)'

    # terminal 2: client
    echo "hello" | omq client --connect tcp://localhost:5555

  -- Formats --------------------------------------------------

    # ascii (default) -- non-printable replaced with dots
    omq pull --bind tcp://:5557 --ascii

    # quoted -- lossless, round-trippable (uses String#dump escaping)
    omq pull --bind tcp://:5557 --quoted

    # JSON Lines -- structured, multipart as arrays
    echo '["key","value"]' | omq push --connect tcp://localhost:5557 --jsonl
    omq pull --bind tcp://:5557 --jsonl

    # multipart via tabs
    printf "routing-key\tpayload" | omq push --connect tcp://localhost:5557

  -- Marshal (arbitrary Ruby objects) -------------------------

    # -M: each message is one Marshal-dumped Ruby object.
    # Inside -e/-E, `it` is the raw object (not an Array).

    # send a bare string; receiver transforms it to { string => encoding } hash
    omq push -b tcp://:5557 -ME '"foo" * 3'
    omq pull -c tcp://:5557 -Me '{it => it.encoding}'
    # output: {"foofoofoo" => #<Encoding:UTF-8>}

    # -vvv traces render the app object, not wire bytes
    omq push -b tcp://:5557 -ME '{now: Time.now, pid: Process.pid}' -vvv
    # >> (marshal) {now: 2026-04-13 ..., pid: 12345}

  -- Compression ----------------------------------------------

    # -z / -Z / --lz4 rewrite tcp:// endpoints to zstd+tcp:// or
    # lz4+tcp:// (dedicated compressed transports from omq-zstd
    # and omq-lz4). Both peers must agree on the codec; non-TCP
    # endpoints are unaffected.
    omq pull --bind tcp://:5557 -z &
    echo "compressible data" | omq push --connect tcp://localhost:5557 -z

    # LZ4: no entropy stage, ~4–8× faster encode than zstd,
    # ~3× less memory. Pick it when CPU or RAM is tighter than
    # bandwidth.
    omq pull --bind tcp://:5557 --lz4 &
    echo "hello" | omq push --connect tcp://localhost:5557 --lz4

    # Explicit spec form (same for -e, -f, or --compress):
    omq pull -b tcp://:5557 --compress=zstd:3
    omq pull -b tcp://:5557 --compress=lz4

    # Pipe with asymmetric codecs (decompresses lz4 in, re-emits zstd out):
    omq pipe --in --lz4 -c tcp://src:5555 --out -z -c tcp://dst:6666

  -- CURVE Encryption -----------------------------------------

    # server (prints OMQ_SERVER_KEY=...)
    omq rep --bind tcp://:5555 --echo --curve-server

    # client (paste the server's key)
    echo "secret" | omq req --connect tcp://localhost:5555 \
      --curve-server-key '<key from server>'

  -- ROUTER / DEALER ------------------------------------------

    +--------+          +--------+
    | DEALER |--------->| ROUTER |
    | id=w1  |          |        |
    +--------+          +--------+

    # terminal 1: router shows identity + message
    omq router --bind tcp://:5555

    # terminal 2: dealer with identity
    echo "hello" | omq dealer --connect tcp://localhost:5555 --identity worker-1

  -- Ruby Eval ------------------------------------------------

    # filter incoming: only pass messages containing "error"
    omq pull -b tcp://:5557 --recv-eval 'it.first.include?("error") ? it : nil'

    # transform incoming with gems
    omq sub -c tcp://localhost:5556 -rjson -e 'JSON.parse(it.first)["temperature"]'

    # require a local file, use its methods
    omq rep --bind tcp://:5555 --require ./transform.rb -e 'upcase_all(it)'

    # next skips, break stops
    omq pull -b tcp://:5557 -e 'next if it.first =~ /^#/; break if it.first =~ /quit/; it'

    # BEGIN/END blocks (like awk) -- accumulate and summarize
    omq pull -b tcp://:5557 -e 'BEGIN{@sum = 0} @sum += it.first.to_i; nil END{puts @sum}'

    # transform outgoing messages
    echo hello | omq push -c tcp://localhost:5557 --send-eval 'it.map(&:upcase)'

    # REQ: transform request and reply independently
    echo hello | omq req -c tcp://localhost:5555 -E 'it.map(&:upcase)' -e 'it.first'

    # block parameter: single param receives parts array
    omq pull -b tcp://:5557 -e '|msg| msg.map(&:upcase)'

    # destructure multipart messages with parens
    omq pull -b tcp://:5557 -e '|(key, value)| "#{key}=#{value}"'

  -- Script Handlers (-r) ------------------------------------

    # handler.rb -- register transforms from a file
    #   db = PG.connect("dbname=app")
    #   OMQ.incoming { |first_part, _| db.exec(first_part).values.flatten }
    #   at_exit { db.close }
    omq pull --bind tcp://:5557 -r./handler.rb

    # combine script handlers with inline eval
    omq req -c tcp://localhost:5555 -r./handler.rb -E 'it.map(&:upcase)'

    # OMQ.outgoing { |msg| ... }   -- registered outgoing transform
    # OMQ.incoming { |msg| ... }   -- registered incoming transform
    # CLI flags (-e/-E) override registered handlers
TEXT
DEFAULT_OPTS =
{
  type_name:        nil,
  endpoints:        [],
  connects:         [],
  binds:            [],
  in_endpoints:     [],
  out_endpoints:    [],
  data:             nil,
  file:             nil,
  format:           :ascii,
  subscribes:       [],
  joins:            [],
  group:            nil,
  identity:         nil,
  target:           nil,
  interval:         nil,
  count:            nil,
  delay:            nil,
  timeout:          nil,
  linger:           5,
  reconnect_ivl:    nil,
  heartbeat_ivl:    nil,
  send_hwm:         nil,
  recv_hwm:         nil,
  sndbuf:           nil,
  rcvbuf:           nil,
  conflate:         false,
  compress:         nil,   # nil | :zstd | :lz4
  compress_level:   nil,   # Integer (zstd only)
  in_compress:        nil,
  in_compress_level:  nil,
  out_compress:       nil,
  out_compress_level: nil,
  send_expr:        nil,
  recv_expr:        nil,
  parallel:         nil,
  transient:        false,
  verbose:          0,
  timestamps:       nil,
  quiet:            false,
  echo:             false,
  scripts:          [],
  recv_maxsz:       nil,
  curve_server:     false,
  curve_server_key: nil,
  crypto:     nil,
  ffi:              false,
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.parse(argv) ⇒ Object

Parses argv and returns a mutable options hash.



287
288
289
# File 'lib/omq/cli/cli_parser.rb', line 287

def self.parse(argv)
  new.parse(argv)
end

.validate!(opts) ⇒ Object

Validates option combinations, aborting on bad combos.



313
314
315
# File 'lib/omq/cli/cli_parser.rb', line 313

def self.validate!(opts)
  new.validate!(opts)
end

.validate_gems!(config) ⇒ Object

Validates option combinations that depend on socket type.



320
321
322
323
324
# File 'lib/omq/cli/cli_parser.rb', line 320

def self.validate_gems!(config)
  if config.recv_only? && (config.data || config.file)
    abort "--data/--file not valid for #{config.type_name} (receive-only)"
  end
end

Instance Method Details

#expand_endpoint(url) ⇒ Object

Expands shorthand ‘@name` to `ipc://@name` (Linux abstract namespace). Only triggers when the value starts with `@` and has no `://` scheme.



644
645
646
# File 'lib/omq/cli/cli_parser.rb', line 644

def expand_endpoint(url)
  url.start_with?("@") && !url.include?("://") ? "ipc://#{url}" : url
end

#parse(argv) ⇒ Hash

Parses argv and returns a mutable options hash.

Parameters:

  • argv (Array<String>)

    command-line arguments (mutated in place)

Returns:

  • (Hash)

    parsed options



331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
# File 'lib/omq/cli/cli_parser.rb', line 331

def parse(argv)
  opts      = DEFAULT_OPTS.transform_values { |v| v.is_a?(Array) ? v.dup : v }
  pipe_side = nil  # nil = legacy positional mode; :in/:out = modal

  parser = OptionParser.new do |o|
    o.banner = "Usage: omq TYPE [options]\n\n" \
               "Types:    req, rep, pub, sub, push, pull, pair, dealer, router\n" \
               "Draft:    client, server, radio, dish, scatter, gather, channel, peer\n" \
               "Virtual:  pipe (PULL -> eval -> PUSH)\n\n"

    o.separator "Connection:"
    o.on("-c", "--connect URL", "Connect to endpoint (repeatable)") { |v|
      v = expand_endpoint(v)
      ep = Endpoint.new(v, false)
      case pipe_side
      when :in
        opts[:in_endpoints] << ep
      when :out
        opts[:out_endpoints] << ep
      else
        opts[:endpoints] << ep
        opts[:connects]  << v
      end
    }
    o.on("-b", "--bind URL", "Bind to endpoint (repeatable)") { |v|
      v = expand_endpoint(v)
      ep = Endpoint.new(v, true)
      case pipe_side
      when :in
        opts[:in_endpoints] << ep
      when :out
        opts[:out_endpoints] << ep
      else
        opts[:endpoints] << ep
        opts[:binds]     << v
      end
    }
    o.on("--in",  "Pipe: subsequent -b/-c attach to input (PULL) side")  { pipe_side = :in }
    o.on("--out", "Pipe: subsequent -b/-c attach to output (PUSH) side") { pipe_side = :out }

    o.separator "\nData source (REP: reply source):"
    o.on(      "--echo",        "Echo received messages back (REP)")   { opts[:echo] = true }
    o.on("-D", "--data DATA",   "Message data (literal string)")      { |v| opts[:data] = v }
    o.on("-F", "--file FILE",   "Read message from file (- = stdin)") { |v| opts[:file] = v }

    o.separator "\nFormat (input + output):"
    o.on("-A", "--ascii",   "Tab-separated frames, safe ASCII (default)") { opts[:format] = :ascii }
    o.on("-Q", "--quoted",  "C-style quoted with escapes")                { opts[:format] = :quoted }
    o.on(      "--raw",     "Raw binary, no framing")                     { opts[:format] = :raw }
    o.on("-J", "--jsonl",   "JSON Lines (array of strings per line)")     { opts[:format] = :jsonl }
    o.on(      "--msgpack",  "MessagePack arrays (binary stream)")         { require "msgpack"; opts[:format] = :msgpack }
    o.on("-M", "--marshal", "Ruby Marshal stream (one arbitrary object per message)") { opts[:format] = :marshal }

    o.separator "\nSubscription/groups:"
    o.on("-s", "--subscribe PREFIX", "Subscribe prefix (SUB, default all)")     { |v| opts[:subscribes] << v }
    o.on("-j", "--join GROUP",       "Join group (repeatable, DISH only)")      { |v| opts[:joins] << v }
    o.on("-g", "--group GROUP",      "Publish group (RADIO only)")              { |v| opts[:group] = v }

    o.separator "\nIdentity/routing:"
    o.on("--identity ID", "Set socket identity (DEALER/ROUTER)")                     { |v| opts[:identity] = v }
    o.on("--target ID",   "Target peer (ROUTER/SERVER/PEER, 0x prefix for binary)")  { |v| opts[:target] = v }

    o.separator "\nTiming:"
    o.on("-i", "--interval SECS", Float,   "Repeat interval")                   { |v| opts[:interval] = v }
    o.on("-n", "--count COUNT",   Integer,  "Max iterations (0=inf)")            { |v| opts[:count] = v }
    o.on("-d", "--delay SECS",    Float,   "Delay before first send")            { |v| opts[:delay] = v }
    o.on("-t", "--timeout SECS",  Float,   "Send/receive timeout")               { |v| opts[:timeout] = v }
    o.on("-l", "--linger SECS",   Float,   "Drain time on close (default 5)")   { |v| opts[:linger] = v }
    o.on("--reconnect-ivl IVL", "Reconnect interval: SECS or MIN..MAX (default 0.1)") { |v|
      opts[:reconnect_ivl] = if v.include?("..")
                               lo, hi = v.split("..", 2)
                               Float(lo)..Float(hi)
                             else
                               Float(v)
                             end
    }
    o.on("--heartbeat-ivl SECS", Float, "ZMTP heartbeat interval (detects dead peers)") { |v| opts[:heartbeat_ivl] = v }
    o.on("--recv-maxsz SIZE", "Max inbound message size, e.g. 4096, 64K, 1M, 2G (default 1M, 0=unlimited; larger messages drop the connection)") { |v| opts[:recv_maxsz] = parse_byte_size(v) }
    o.on("--hwm N", Integer, "High water mark (default 64, 0=unbounded; modal with --in/--out)") do |v|
      case pipe_side
      when :in
        opts[:recv_hwm] = v
      when :out
        opts[:send_hwm] = v
      else
        opts[:send_hwm] = v
        opts[:recv_hwm] = v
      end
    end
    o.on("--sndbuf N", "SO_SNDBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:sndbuf] = parse_byte_size(v) }
    o.on("--rcvbuf N", "SO_RCVBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:rcvbuf] = parse_byte_size(v) }

    o.separator "\nDelivery:"
    o.on("--conflate", "Keep only last message per subscriber (PUB/RADIO)") { opts[:conflate] = true }

    o.separator "\nCompression (modal with --in/--out for pipe):"
    set_compress = lambda do |codec, level|
      case pipe_side
      when :in
        opts[:in_compress] = codec
        opts[:in_compress_level] = level
      when :out
        opts[:out_compress] = codec
        opts[:out_compress_level] = level
      else
        opts[:compress] = codec
        opts[:compress_level] = level
      end
    end
    o.on("-z",    "Zstd compression (level -3, fast)")        { set_compress.call(:zstd, -3) }
    o.on("-Z",    "Zstd compression (level 3, better ratio)") { set_compress.call(:zstd, 3) }
    o.on("--lz4", "LZ4 compression (FAST)")                   { set_compress.call(:lz4, nil) }
    o.on("--compress=SPEC", "Explicit codec[:level], e.g. zstd, zstd:3, zstd:-1, lz4") do |v|
      codec, level = parse_compress_spec(v)
      set_compress.call(codec, level)
    end

    o.separator "\nProcessing (-e = incoming, -E = outgoing):"
    o.on("-e", "--recv-eval EXPR", "Eval Ruby for each incoming message (it = parts, or |a, b|)") { |v| opts[:recv_expr] = v }
    o.on("-E", "--send-eval EXPR", "Eval Ruby for each outgoing message (it = parts, or |a, b|)") { |v| opts[:send_expr] = v }
    o.on("-r", "--require LIB",  "Require lib/file in Async context; use '-' for stdin. Scripts can register OMQ.outgoing/incoming") { |v|
      require "omq" unless defined?(OMQ::VERSION)
      opts[:scripts] << (v == "-" ? :stdin : (v.start_with?("./", "../") ? File.expand_path(v) : v))
    }
    o.on("-P", "--parallel [N]", Integer, "Parallel Ractor workers (0 = nproc, max 16)") { |v|
      n = v.nil? || v.zero? ? Etc.nprocessors : v
      opts[:parallel] = [n, 16].min
    }

    o.separator "\nCURVE encryption (requires system libsodium):"
    o.on("--curve-server",         "Enable CURVE as server (generates keypair)") { opts[:curve_server] = true }
    o.on("--curve-server-key KEY", "Enable CURVE as client (server's Z85 public key)") { |v| opts[:curve_server_key] = v }
    o.on("--crypto BACKEND", "Crypto backend: rbnacl (default) or nuckle (pure Ruby, DANGEROUS)") { |v| opts[:crypto] = v }
    o.separator "  Install libsodium: apt install libsodium-dev / brew install libsodium"
    o.separator "  Env vars: OMQ_SERVER_KEY (client), OMQ_SERVER_PUBLIC + OMQ_SERVER_SECRET (server)"
    o.separator "            OMQ_CRYPTO (backend: rbnacl or nuckle)"

    o.separator "\nOther:"
    o.on("-v", "--verbose",   "Verbosity: -v endpoints, -vv events, -vvv messages") { opts[:verbose] += 1 }
    o.on(      "--timestamps PRECISION", %w[s ms us], "Prefix log lines with UTC timestamp (s/ms/us, default ms)") { |v|
      opts[:timestamps] = v.to_sym
    }
    o.on("-q", "--quiet",     "Suppress message output")           { opts[:quiet] = true }
    o.on(      "--transient", "Exit when all peers disconnect")    { opts[:transient] = true }
    o.on(      "--ffi",       "Use libzmq FFI backend (requires omq-ffi gem + system libzmq 4.x)") do
      begin
        require "omq/ffi"
      rescue LoadError => e
        abort "omq: --ffi requires the omq-ffi gem and system libzmq 4.x (#{e.message})"
      end
      opts[:ffi] = true
    end
    o.on("-V", "--version") {
      if ENV["OMQ_DEV"]
        require_relative "../../../../omq/lib/omq/version"
      else
        require "omq/version"
      end
      puts "omq-cli #{OMQ::CLI::VERSION} (omq #{OMQ::VERSION})"
      exit
    }
    o.on("-h")             { puts o
                             exit }
    o.on("--help")        { CLI.page "#{o}\n#{EXAMPLES}"
                             exit }
    o.on("--examples")    { CLI.page EXAMPLES
                             exit }

    o.separator "\nExit codes: 0 = success, 1 = error, 2 = timeout"
  end

  argv = split_parallel_cluster(argv)

  begin
    parser.parse!(argv)
  rescue OptionParser::ParseError => e
    abort e.message
  end

  type_name = argv.shift
  if type_name.nil?
    abort parser.to_s if opts[:scripts].empty?
    # bare script mode -- type_name stays nil
  elsif !SOCKET_TYPE_NAMES.include?(type_name.downcase)
    abort "Unknown socket type: #{type_name}. Known: #{SOCKET_TYPE_NAMES.join(', ')}"
  else
    opts[:type_name] = type_name.downcase
  end

  # Host shorthand (tcp://*:PORT, tcp://:PORT, tcp://localhost:PORT)
  # is normalized inside OMQ::Transport::TCP — see its
  # #normalize_bind_host / #normalize_connect_host / #loopback_host.

  opts
end

#parse_byte_size(str) ⇒ Integer

Parses a byte size string with an optional K/M/G suffix (binary, i.e. 1K = 1024 bytes).

Parameters:

  • str (String)

    e.g. “4096”, “4K”, “1M”, “2G”

Returns:

  • (Integer)

    size in bytes



558
559
560
561
562
563
564
565
566
567
# File 'lib/omq/cli/cli_parser.rb', line 558

def parse_byte_size(str)
  case str
  when /\A(\d+)[kK]\z/ then $1.to_i * 1024
  when /\A(\d+)[mM]\z/ then $1.to_i * 1024 * 1024
  when /\A(\d+)[gG]\z/ then $1.to_i * 1024 * 1024 * 1024
  when /\A\d+\z/       then str.to_i
  else
    abort "invalid byte size: #{str} (use e.g. 4096, 4K, 1M, 2G)"
  end
end

#parse_compress_spec(str) ⇒ Object

Parses a –compress SPEC string.

Accepted forms:

"zstd"         → [:zstd, nil]  (SocketSetup picks the default)
"zstd:3"       → [:zstd, 3]
"zstd:-1"      → [:zstd, -1]
"lz4"          → [:lz4, nil]
"-1" | "19"    → [:zstd, N]    (back-compat with 0.16.x --compress=LEVEL)

Aborts on anything else (including “lz4:N”, since the lz4+tcp transport has no compression-level knob).



540
541
542
543
544
545
546
547
548
549
# File 'lib/omq/cli/cli_parser.rb', line 540

def parse_compress_spec(str)
  case str
  when /\Azstd\z/            then [:zstd, nil]
  when /\Azstd:(-?\d+)\z/    then [:zstd, $1.to_i]
  when /\Alz4\z/             then [:lz4, nil]
  when /\A-?\d+\z/           then [:zstd, str.to_i]
  else
    abort "invalid --compress spec: #{str} (use zstd, zstd:LEVEL, or lz4)"
  end
end

#split_parallel_cluster(argv) ⇒ Object

Splits short-option clusters of the form ‘-P[letters]` so OptionParser sees `-P` followed by `-[letters]`. Lets `-P0zvv` mean `-P0 -z -v -v` (portable & combinable). Also rewrites bare `–timestamps` to `–timestamps=ms` so OptionParser doesn’t consume the next positional token as its argument.



299
300
301
302
303
304
305
306
307
308
# File 'lib/omq/cli/cli_parser.rb', line 299

def split_parallel_cluster(argv)
  argv.flat_map { |a|
    if a =~ /\A-P(\d*)([a-zA-Z].*)\z/
      n, rest = $1, $2
      n.empty? ? ["-P", "-#{rest}"] : ["-P#{n}", "-#{rest}"]
    else
      a
    end
  }.map { |a| a == "--timestamps" ? "--timestamps=ms" : a }
end

#validate!(opts) ⇒ void

This method returns an undefined value.

Validates option combinations, aborting on invalid combos.

Parameters:

  • opts (Hash)

    parsed options from #parse



574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
# File 'lib/omq/cli/cli_parser.rb', line 574

def validate!(opts)
  return if opts[:type_name].nil?  # bare script mode

  abort "-r- (stdin script) and -F- (stdin data) cannot both be used" if opts[:scripts]&.include?(:stdin) && opts[:file] == "-"

  type_name = opts[:type_name]

  if type_name == "pipe"
    has_in_out = opts[:in_endpoints].any? || opts[:out_endpoints].any?
    if has_in_out
      # Promote bare endpoints into the missing side:
      # `pipe -c SRC --out -c DST` → bare SRC becomes --in
      if opts[:in_endpoints].empty? && opts[:endpoints].any?
        opts[:in_endpoints] = opts[:endpoints]
        opts[:endpoints]    = []
      elsif opts[:out_endpoints].empty? && opts[:endpoints].any?
        opts[:out_endpoints] = opts[:endpoints]
        opts[:endpoints]     = []
      end
      abort "pipe --in requires at least one endpoint"             if opts[:in_endpoints].empty?
      abort "pipe --out requires at least one endpoint"            if opts[:out_endpoints].empty?
      abort "pipe: don't mix --in/--out with bare -b/-c endpoints" unless opts[:endpoints].empty?
    else
      abort "pipe requires exactly 2 endpoints (pull-side and push-side), or use --in/--out" if opts[:endpoints].size != 2
    end
  else
    abort "--in/--out are only valid for pipe" if opts[:in_endpoints].any? || opts[:out_endpoints].any? || opts[:in_compress] || opts[:out_compress]
    abort "At least one --connect or --bind is required" if opts[:connects].empty? && opts[:binds].empty?
  end
  abort "--data and --file are mutually exclusive"        if opts[:data] && opts[:file]
  abort "--subscribe is only valid for SUB"               if !opts[:subscribes].empty? && type_name != "sub"
  abort "--join is only valid for DISH"                   if !opts[:joins].empty? && type_name != "dish"
  abort "--group is only valid for RADIO"                 if opts[:group] && type_name != "radio"
  abort "--identity is only valid for DEALER/ROUTER"      if opts[:identity] && !%w[dealer router].include?(type_name)
  abort "--target is only valid for ROUTER/SERVER/PEER"   if opts[:target] && !%w[router server peer].include?(type_name)
  abort "--conflate is only valid for PUB/RADIO"          if opts[:conflate] && !%w[pub radio].include?(type_name)
  abort "--recv-eval is not valid for send-only sockets (use --send-eval / -E)" if opts[:recv_expr] && SEND_ONLY.include?(type_name)
  abort "--send-eval is not valid for recv-only sockets (use --recv-eval / -e)" if opts[:send_expr] && RECV_ONLY.include?(type_name)
  abort "--send-eval is not valid for REP (the reply is the result of --recv-eval / -e)" if opts[:send_expr] && type_name == "rep"
  abort "--send-eval and --target are mutually exclusive"  if opts[:send_expr] && opts[:target]

  if opts[:parallel]
    parallel_types = %w[pipe pull gather rep]
    abort "-P/--parallel is only valid for #{parallel_types.join(", ")}" unless parallel_types.include?(type_name)
    abort "-P/--parallel must be 1..16" unless (1..16).include?(opts[:parallel])
    if type_name == "pipe"
      all_eps = opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints]
    else
      all_eps = opts[:endpoints]
    end
    abort "-P/--parallel requires all endpoints to use --connect (not --bind)" if all_eps.any?(&:bind?)
  end

  (opts[:connects] + opts[:binds]).each do |url|
    abort "inproc not supported, use tcp:// or ipc://" if url.include?("inproc://")
  end

  all_urls = if type_name == "pipe"
               (opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints]).map(&:url)
             else
               opts[:connects] + opts[:binds]
             end
  dups = all_urls.tally.select { |_, n| n > 1 }.keys
  abort "duplicate endpoint: #{dups.first}" if dups.any?
end