Class: OMQ::CLI::CliParser

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/cli/cli_parser.rb

Overview

Parses and validates command-line arguments for the omq CLI.

Constant Summary collapse

EXAMPLES =
<<~'TEXT'
  -- Request / Reply ------------------------------------------

    +-----+   "hello"    +-----+
    | REQ |------------->| REP |
    |     |<-------------|     |
    +-----+   "HELLO"    +-----+

    # terminal 1: echo server
    omq rep --bind tcp://:5555 --recv-eval 'it.map(&:upcase)'

    # terminal 2: send a request
    echo "hello" | omq req --connect tcp://localhost:5555

    # or over IPC (unix socket, single machine)
    omq rep --bind ipc:///tmp/echo.sock --echo &
    echo "hello" | omq req --connect ipc:///tmp/echo.sock

  -- Publish / Subscribe --------------------------------------

    +-----+  "weather.nyc 72F"  +-----+
    | PUB |-------------------->| SUB | --subscribe "weather."
    +-----+                     +-----+

    # terminal 1: subscriber (all topics by default)
    omq sub --bind tcp://:5556

    # terminal 2: publisher (needs --delay for subscription to propagate)
    echo "weather.nyc 72F" | omq pub --connect tcp://localhost:5556 --delay 1

  -- Periodic Publish -------------------------------------------

    +-----+   "tick 1"    +-----+
    | PUB |--(every 1s)-->| SUB |
    +-----+               +-----+

    # terminal 1: subscriber
    omq sub --bind tcp://:5556

    # terminal 2: publish a tick every second (wall-clock aligned)
    omq pub --connect tcp://localhost:5556 --delay 1 --data "tick" --interval 1

    # 5 ticks, then exit
    omq pub --connect tcp://localhost:5556 -d1 -D "tick" -i0.5 --count 5

  -- Pipeline -------------------------------------------------

    +------+           +------+
    | PUSH |---------->| PULL |
    +------+           +------+

    # terminal 1: worker
    omq pull --bind tcp://:5557

    # terminal 2: send tasks
    echo "task 1" | omq push --connect tcp://localhost:5557

    # or over IPC (unix socket)
    omq pull --bind ipc:///tmp/pipeline.sock &
    echo "task 1" | omq push --connect ipc:///tmp/pipeline.sock

  -- Pipe (PULL -> eval -> PUSH) --------------------------------

    +------+         +------+         +------+
    | PUSH |-------->| pipe |-------->| PULL |
    +------+         +------+         +------+

    # @work / @sink below are Linux abstract-namespace unix
    # sockets (ipc://@name) -- no path on disk, cleaned up
    # automatically. Linux only. Use ipc:///tmp/work etc.
    # on macOS/BSD.

    # terminal 1: producer
    echo -e "hello\nworld" | omq push -b@work

    # terminal 2: worker (uppercase each message)
    omq pipe -c@work -c@sink -e 'it.map(&:upcase)'
    # terminal 3: collector
    omq pull -b@sink

    # 4 Ractor workers in a single process (-P)
    omq pipe -c@work -c@sink -P4 -r./fib -e 'fib(it.first.to_i).to_s'

    # exit when producer disconnects (--transient)
    omq pipe -c@work -c@sink --transient -e 'it.map(&:upcase)'

    # fan-in (multiple sources -> one sink)
    omq pipe --in -c@work1 -c@work2 --out -c@sink -e 'it.map(&:upcase)'

    # fan-out (one source -> multiple sinks, round-robin)
    omq pipe --in -b tcp://:5555 --out -c@sink1 -c@sink2 -e 'it'

  -- CLIENT / SERVER (draft) ----------------------------------

    +--------+   "hello"   +--------+
    | CLIENT |------------>| SERVER | --recv-eval 'it.map(&:upcase)'
    |        |<------------|        |
    +--------+   "HELLO"   +--------+

    # terminal 1: upcasing server
    omq server --bind tcp://:5555 --recv-eval 'it.map(&:upcase)'

    # terminal 2: client
    echo "hello" | omq client --connect tcp://localhost:5555

  -- Formats --------------------------------------------------

    # ascii (default) -- non-printable replaced with dots
    omq pull --bind tcp://:5557 --ascii

    # quoted -- lossless, round-trippable (uses String#dump escaping)
    omq pull --bind tcp://:5557 --quoted

    # JSON Lines -- structured, multipart as arrays
    echo '["key","value"]' | omq push --connect tcp://localhost:5557 --jsonl
    omq pull --bind tcp://:5557 --jsonl

    # multipart via tabs
    printf "routing-key\tpayload" | omq push --connect tcp://localhost:5557

  -- Marshal (arbitrary Ruby objects) -------------------------

    # -M: each message is one Marshal-dumped Ruby object.
    # Inside -e/-E, `it` is the raw object (not an Array).

    # send a bare string; receiver transforms it to { string => encoding } hash
    omq push -b tcp://:5557 -ME '"foo" * 3'
    omq pull -c tcp://:5557 -Me '{it => it.encoding}'
    # output: {"foofoofoo" => #<Encoding:UTF-8>}

    # -vvv traces render the app object, not wire bytes
    omq push -b tcp://:5557 -ME '{now: Time.now, pid: Process.pid}' -vvv
    # >> (marshal) {now: 2026-04-13 ..., pid: 12345}

  -- Compression ----------------------------------------------

    # ZMTP-Zstd is negotiated during the handshake.
    # Recv sockets advertise it passively by default.
    # Use -z on the sender to compress outgoing frames.
    omq pull --bind tcp://:5557 &
    echo "compressible data" | omq push --connect tcp://localhost:5557 -z

  -- CURVE Encryption -----------------------------------------

    # server (prints OMQ_SERVER_KEY=...)
    omq rep --bind tcp://:5555 --echo --curve-server

    # client (paste the server's key)
    echo "secret" | omq req --connect tcp://localhost:5555 \
      --curve-server-key '<key from server>'

  -- ROUTER / DEALER ------------------------------------------

    +--------+          +--------+
    | DEALER |--------->| ROUTER |
    | id=w1  |          |        |
    +--------+          +--------+

    # terminal 1: router shows identity + message
    omq router --bind tcp://:5555

    # terminal 2: dealer with identity
    echo "hello" | omq dealer --connect tcp://localhost:5555 --identity worker-1

  -- Ruby Eval ------------------------------------------------

    # filter incoming: only pass messages containing "error"
    omq pull -b tcp://:5557 --recv-eval 'it.first.include?("error") ? it : nil'

    # transform incoming with gems
    omq sub -c tcp://localhost:5556 -rjson -e 'JSON.parse(it.first)["temperature"]'

    # require a local file, use its methods
    omq rep --bind tcp://:5555 --require ./transform.rb -e 'upcase_all(it)'

    # next skips, break stops
    omq pull -b tcp://:5557 -e 'next if it.first =~ /^#/; break if it.first =~ /quit/; it'

    # BEGIN/END blocks (like awk) -- accumulate and summarize
    omq pull -b tcp://:5557 -e 'BEGIN{@sum = 0} @sum += it.first.to_i; nil END{puts @sum}'

    # transform outgoing messages
    echo hello | omq push -c tcp://localhost:5557 --send-eval 'it.map(&:upcase)'

    # REQ: transform request and reply independently
    echo hello | omq req -c tcp://localhost:5555 -E 'it.map(&:upcase)' -e 'it.first'

    # block parameter: single param receives parts array
    omq pull -b tcp://:5557 -e '|msg| msg.map(&:upcase)'

    # destructure multipart messages with parens
    omq pull -b tcp://:5557 -e '|(key, value)| "#{key}=#{value}"'

  -- Script Handlers (-r) ------------------------------------

    # handler.rb -- register transforms from a file
    #   db = PG.connect("dbname=app")
    #   OMQ.incoming { |first_part, _| db.exec(first_part).values.flatten }
    #   at_exit { db.close }
    omq pull --bind tcp://:5557 -r./handler.rb

    # combine script handlers with inline eval
    omq req -c tcp://localhost:5555 -r./handler.rb -E 'it.map(&:upcase)'

    # OMQ.outgoing { |msg| ... }   -- registered outgoing transform
    # OMQ.incoming { |msg| ... }   -- registered incoming transform
    # CLI flags (-e/-E) override registered handlers
TEXT
DEFAULT_OPTS =
{
  type_name:        nil,
  endpoints:        [],
  connects:         [],
  binds:            [],
  in_endpoints:     [],
  out_endpoints:    [],
  data:             nil,
  file:             nil,
  format:           :ascii,
  subscribes:       [],
  joins:            [],
  group:            nil,
  identity:         nil,
  target:           nil,
  interval:         nil,
  count:            nil,
  delay:            nil,
  timeout:          nil,
  linger:           5,
  reconnect_ivl:    nil,
  heartbeat_ivl:    nil,
  send_hwm:         nil,
  recv_hwm:         nil,
  sndbuf:           nil,
  rcvbuf:           nil,
  conflate:         false,
  compress:         false,
  compress_level:   nil,
  send_expr:        nil,
  recv_expr:        nil,
  parallel:         nil,
  transient:        false,
  verbose:          0,
  timestamps:       nil,
  quiet:            false,
  echo:             false,
  scripts:          [],
  recv_maxsz:       nil,
  curve_server:     false,
  curve_server_key: nil,
  crypto:     nil,
  ffi:              false,
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.parse(argv) ⇒ Object

Parses argv and returns a mutable options hash.



269
270
271
# File 'lib/omq/cli/cli_parser.rb', line 269

def self.parse(argv)
  new.parse(argv)
end

.validate!(opts) ⇒ Object

Validates option combinations, aborting on bad combos.



295
296
297
# File 'lib/omq/cli/cli_parser.rb', line 295

def self.validate!(opts)
  new.validate!(opts)
end

.validate_gems!(config) ⇒ Object

Validates option combinations that depend on socket type.



302
303
304
305
306
# File 'lib/omq/cli/cli_parser.rb', line 302

def self.validate_gems!(config)
  if config.recv_only? && (config.data || config.file)
    abort "--data/--file not valid for #{config.type_name} (receive-only)"
  end
end

Instance Method Details

#expand_endpoint(url) ⇒ Object

Expands shorthand ‘@name` to `ipc://@name` (Linux abstract namespace). Only triggers when the value starts with `@` and has no `://` scheme.



594
595
596
# File 'lib/omq/cli/cli_parser.rb', line 594

def expand_endpoint(url)
  url.start_with?("@") && !url.include?("://") ? "ipc://#{url}" : url
end

#parse(argv) ⇒ Hash

Parses argv and returns a mutable options hash.

Parameters:

  • argv (Array<String>)

    command-line arguments (mutated in place)

Returns:

  • (Hash)

    parsed options



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/omq/cli/cli_parser.rb', line 313

def parse(argv)
  opts      = DEFAULT_OPTS.transform_values { |v| v.is_a?(Array) ? v.dup : v }
  pipe_side = nil  # nil = legacy positional mode; :in/:out = modal

  parser = OptionParser.new do |o|
    o.banner = "Usage: omq TYPE [options]\n\n" \
               "Types:    req, rep, pub, sub, push, pull, pair, dealer, router\n" \
               "Draft:    client, server, radio, dish, scatter, gather, channel, peer\n" \
               "Virtual:  pipe (PULL -> eval -> PUSH)\n\n"

    o.separator "Connection:"
    o.on("-c", "--connect URL", "Connect to endpoint (repeatable)") { |v|
      v = expand_endpoint(v)
      ep = Endpoint.new(v, false)
      case pipe_side
      when :in
        opts[:in_endpoints] << ep
      when :out
        opts[:out_endpoints] << ep
      else
        opts[:endpoints] << ep
        opts[:connects]  << v
      end
    }
    o.on("-b", "--bind URL", "Bind to endpoint (repeatable)") { |v|
      v = expand_endpoint(v)
      ep = Endpoint.new(v, true)
      case pipe_side
      when :in
        opts[:in_endpoints] << ep
      when :out
        opts[:out_endpoints] << ep
      else
        opts[:endpoints] << ep
        opts[:binds]     << v
      end
    }
    o.on("--in",  "Pipe: subsequent -b/-c attach to input (PULL) side")  { pipe_side = :in }
    o.on("--out", "Pipe: subsequent -b/-c attach to output (PUSH) side") { pipe_side = :out }

    o.separator "\nData source (REP: reply source):"
    o.on(      "--echo",        "Echo received messages back (REP)")   { opts[:echo] = true }
    o.on("-D", "--data DATA",   "Message data (literal string)")      { |v| opts[:data] = v }
    o.on("-F", "--file FILE",   "Read message from file (- = stdin)") { |v| opts[:file] = v }

    o.separator "\nFormat (input + output):"
    o.on("-A", "--ascii",   "Tab-separated frames, safe ASCII (default)") { opts[:format] = :ascii }
    o.on("-Q", "--quoted",  "C-style quoted with escapes")                { opts[:format] = :quoted }
    o.on(      "--raw",     "Raw binary, no framing")                     { opts[:format] = :raw }
    o.on("-J", "--jsonl",   "JSON Lines (array of strings per line)")     { opts[:format] = :jsonl }
    o.on(      "--msgpack",  "MessagePack arrays (binary stream)")         { require "msgpack"; opts[:format] = :msgpack }
    o.on("-M", "--marshal", "Ruby Marshal stream (one arbitrary object per message)") { opts[:format] = :marshal }

    o.separator "\nSubscription/groups:"
    o.on("-s", "--subscribe PREFIX", "Subscribe prefix (SUB, default all)")     { |v| opts[:subscribes] << v }
    o.on("-j", "--join GROUP",       "Join group (repeatable, DISH only)")      { |v| opts[:joins] << v }
    o.on("-g", "--group GROUP",      "Publish group (RADIO only)")              { |v| opts[:group] = v }

    o.separator "\nIdentity/routing:"
    o.on("--identity ID", "Set socket identity (DEALER/ROUTER)")                     { |v| opts[:identity] = v }
    o.on("--target ID",   "Target peer (ROUTER/SERVER/PEER, 0x prefix for binary)")  { |v| opts[:target] = v }

    o.separator "\nTiming:"
    o.on("-i", "--interval SECS", Float,   "Repeat interval")                   { |v| opts[:interval] = v }
    o.on("-n", "--count COUNT",   Integer,  "Max iterations (0=inf)")            { |v| opts[:count] = v }
    o.on("-d", "--delay SECS",    Float,   "Delay before first send")            { |v| opts[:delay] = v }
    o.on("-t", "--timeout SECS",  Float,   "Send/receive timeout")               { |v| opts[:timeout] = v }
    o.on("-l", "--linger SECS",   Float,   "Drain time on close (default 5)")   { |v| opts[:linger] = v }
    o.on("--reconnect-ivl IVL", "Reconnect interval: SECS or MIN..MAX (default 0.1)") { |v|
      opts[:reconnect_ivl] = if v.include?("..")
                               lo, hi = v.split("..", 2)
                               Float(lo)..Float(hi)
                             else
                               Float(v)
                             end
    }
    o.on("--heartbeat-ivl SECS", Float, "ZMTP heartbeat interval (detects dead peers)") { |v| opts[:heartbeat_ivl] = v }
    o.on("--recv-maxsz SIZE", "Max inbound message size, e.g. 4096, 64K, 1M, 2G (default 1M, 0=unlimited; larger messages drop the connection)") { |v| opts[:recv_maxsz] = parse_byte_size(v) }
    o.on("--hwm N", Integer, "High water mark (default 64, 0=unbounded; modal with --in/--out)") do |v|
      case pipe_side
      when :in
        opts[:recv_hwm] = v
      when :out
        opts[:send_hwm] = v
      else
        opts[:send_hwm] = v
        opts[:recv_hwm] = v
      end
    end
    o.on("--sndbuf N", "SO_SNDBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:sndbuf] = parse_byte_size(v) }
    o.on("--rcvbuf N", "SO_RCVBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:rcvbuf] = parse_byte_size(v) }

    o.separator "\nDelivery:"
    o.on("--conflate", "Keep only last message per subscriber (PUB/RADIO)") { opts[:conflate] = true }

    o.separator "\nCompression:"
    o.on("-z", "Zstd compression (level -3, fast)") do
      opts[:compress] = true
      opts[:compress_level] = -3
    end
    o.on("-Z", "Zstd compression (level 3, better ratio)") do
      opts[:compress] = true
      opts[:compress_level] = 3
    end
    o.on("--compress=LEVEL", Integer, "Zstd compression with custom level (e.g. 19, -1)") do |v|
      opts[:compress] = true
      opts[:compress_level] = v
    end

    o.separator "\nProcessing (-e = incoming, -E = outgoing):"
    o.on("-e", "--recv-eval EXPR", "Eval Ruby for each incoming message (it = parts, or |a, b|)") { |v| opts[:recv_expr] = v }
    o.on("-E", "--send-eval EXPR", "Eval Ruby for each outgoing message (it = parts, or |a, b|)") { |v| opts[:send_expr] = v }
    o.on("-r", "--require LIB",  "Require lib/file in Async context; use '-' for stdin. Scripts can register OMQ.outgoing/incoming") { |v|
      require "omq" unless defined?(OMQ::VERSION)
      opts[:scripts] << (v == "-" ? :stdin : (v.start_with?("./", "../") ? File.expand_path(v) : v))
    }
    o.on("-P", "--parallel [N]", Integer, "Parallel Ractor workers (0 = nproc, max 16)") { |v|
      n = v.nil? || v.zero? ? Etc.nprocessors : v
      opts[:parallel] = [n, 16].min
    }

    o.separator "\nCURVE encryption (requires system libsodium):"
    o.on("--curve-server",         "Enable CURVE as server (generates keypair)") { opts[:curve_server] = true }
    o.on("--curve-server-key KEY", "Enable CURVE as client (server's Z85 public key)") { |v| opts[:curve_server_key] = v }
    o.on("--crypto BACKEND", "Crypto backend: rbnacl (default) or nuckle (pure Ruby, DANGEROUS)") { |v| opts[:crypto] = v }
    o.separator "  Install libsodium: apt install libsodium-dev / brew install libsodium"
    o.separator "  Env vars: OMQ_SERVER_KEY (client), OMQ_SERVER_PUBLIC + OMQ_SERVER_SECRET (server)"
    o.separator "            OMQ_CRYPTO (backend: rbnacl or nuckle)"

    o.separator "\nOther:"
    o.on("-v", "--verbose",   "Verbosity: -v endpoints, -vv events, -vvv messages") { opts[:verbose] += 1 }
    o.on(      "--timestamps PRECISION", %w[s ms us], "Prefix log lines with UTC timestamp (s/ms/us, default ms)") { |v|
      opts[:timestamps] = v.to_sym
    }
    o.on("-q", "--quiet",     "Suppress message output")           { opts[:quiet] = true }
    o.on(      "--transient", "Exit when all peers disconnect")    { opts[:transient] = true }
    o.on(      "--ffi",       "Use libzmq FFI backend (requires omq-ffi gem + system libzmq 4.x)") do
      begin
        require "omq/ffi"
      rescue LoadError => e
        abort "omq: --ffi requires the omq-ffi gem and system libzmq 4.x (#{e.message})"
      end
      opts[:ffi] = true
    end
    o.on("-V", "--version") {
      if ENV["OMQ_DEV"]
        require_relative "../../../../omq/lib/omq/version"
      else
        require "omq/version"
      end
      puts "omq-cli #{OMQ::CLI::VERSION} (omq #{OMQ::VERSION})"
      exit
    }
    o.on("-h")             { puts o
                             exit }
    o.on("--help")        { CLI.page "#{o}\n#{EXAMPLES}"
                             exit }
    o.on("--examples")    { CLI.page EXAMPLES
                             exit }

    o.separator "\nExit codes: 0 = success, 1 = error, 2 = timeout"
  end

  argv = split_parallel_cluster(argv)

  begin
    parser.parse!(argv)
  rescue OptionParser::ParseError => e
    abort e.message
  end

  type_name = argv.shift
  if type_name.nil?
    abort parser.to_s if opts[:scripts].empty?
    # bare script mode -- type_name stays nil
  elsif !SOCKET_TYPE_NAMES.include?(type_name.downcase)
    abort "Unknown socket type: #{type_name}. Known: #{SOCKET_TYPE_NAMES.join(', ')}"
  else
    opts[:type_name] = type_name.downcase
  end

  # Host shorthand (tcp://*:PORT, tcp://:PORT, tcp://localhost:PORT)
  # is normalized inside OMQ::Transport::TCP — see its
  # #normalize_bind_host / #normalize_connect_host / #loopback_host.

  opts
end

#parse_byte_size(str) ⇒ Integer

Parses a byte size string with an optional K/M/G suffix (binary, i.e. 1K = 1024 bytes).

Parameters:

  • str (String)

    e.g. “4096”, “4K”, “1M”, “2G”

Returns:

  • (Integer)

    size in bytes



508
509
510
511
512
513
514
515
516
517
# File 'lib/omq/cli/cli_parser.rb', line 508

def parse_byte_size(str)
  case str
  when /\A(\d+)[kK]\z/ then $1.to_i * 1024
  when /\A(\d+)[mM]\z/ then $1.to_i * 1024 * 1024
  when /\A(\d+)[gG]\z/ then $1.to_i * 1024 * 1024 * 1024
  when /\A\d+\z/       then str.to_i
  else
    abort "invalid byte size: #{str} (use e.g. 4096, 4K, 1M, 2G)"
  end
end

#split_parallel_cluster(argv) ⇒ Object

Splits short-option clusters of the form ‘-P[letters]` so OptionParser sees `-P` followed by `-[letters]`. Lets `-P0zvv` mean `-P0 -z -v -v` (portable & combinable). Also rewrites bare `–timestamps` to `–timestamps=ms` so OptionParser doesn’t consume the next positional token as its argument.



281
282
283
284
285
286
287
288
289
290
# File 'lib/omq/cli/cli_parser.rb', line 281

def split_parallel_cluster(argv)
  argv.flat_map { |a|
    if a =~ /\A-P(\d*)([a-zA-Z].*)\z/
      n, rest = $1, $2
      n.empty? ? ["-P", "-#{rest}"] : ["-P#{n}", "-#{rest}"]
    else
      a
    end
  }.map { |a| a == "--timestamps" ? "--timestamps=ms" : a }
end

#validate!(opts) ⇒ void

This method returns an undefined value.

Validates option combinations, aborting on invalid combos.

Parameters:

  • opts (Hash)

    parsed options from #parse



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
# File 'lib/omq/cli/cli_parser.rb', line 524

def validate!(opts)
  return if opts[:type_name].nil?  # bare script mode

  abort "-r- (stdin script) and -F- (stdin data) cannot both be used" if opts[:scripts]&.include?(:stdin) && opts[:file] == "-"

  type_name = opts[:type_name]

  if type_name == "pipe"
    has_in_out = opts[:in_endpoints].any? || opts[:out_endpoints].any?
    if has_in_out
      # Promote bare endpoints into the missing side:
      # `pipe -c SRC --out -c DST` → bare SRC becomes --in
      if opts[:in_endpoints].empty? && opts[:endpoints].any?
        opts[:in_endpoints] = opts[:endpoints]
        opts[:endpoints]    = []
      elsif opts[:out_endpoints].empty? && opts[:endpoints].any?
        opts[:out_endpoints] = opts[:endpoints]
        opts[:endpoints]     = []
      end
      abort "pipe --in requires at least one endpoint"             if opts[:in_endpoints].empty?
      abort "pipe --out requires at least one endpoint"            if opts[:out_endpoints].empty?
      abort "pipe: don't mix --in/--out with bare -b/-c endpoints" unless opts[:endpoints].empty?
    else
      abort "pipe requires exactly 2 endpoints (pull-side and push-side), or use --in/--out" if opts[:endpoints].size != 2
    end
  else
    abort "--in/--out are only valid for pipe" if opts[:in_endpoints].any? || opts[:out_endpoints].any?
    abort "At least one --connect or --bind is required" if opts[:connects].empty? && opts[:binds].empty?
  end
  abort "--data and --file are mutually exclusive"        if opts[:data] && opts[:file]
  abort "--subscribe is only valid for SUB"               if !opts[:subscribes].empty? && type_name != "sub"
  abort "--join is only valid for DISH"                   if !opts[:joins].empty? && type_name != "dish"
  abort "--group is only valid for RADIO"                 if opts[:group] && type_name != "radio"
  abort "--identity is only valid for DEALER/ROUTER"      if opts[:identity] && !%w[dealer router].include?(type_name)
  abort "--target is only valid for ROUTER/SERVER/PEER"   if opts[:target] && !%w[router server peer].include?(type_name)
  abort "--conflate is only valid for PUB/RADIO"          if opts[:conflate] && !%w[pub radio].include?(type_name)
  abort "--recv-eval is not valid for send-only sockets (use --send-eval / -E)" if opts[:recv_expr] && SEND_ONLY.include?(type_name)
  abort "--send-eval is not valid for recv-only sockets (use --recv-eval / -e)" if opts[:send_expr] && RECV_ONLY.include?(type_name)
  abort "--send-eval is not valid for REP (the reply is the result of --recv-eval / -e)" if opts[:send_expr] && type_name == "rep"
  abort "--send-eval and --target are mutually exclusive"  if opts[:send_expr] && opts[:target]

  if opts[:parallel]
    parallel_types = %w[pipe pull gather rep]
    abort "-P/--parallel is only valid for #{parallel_types.join(", ")}" unless parallel_types.include?(type_name)
    abort "-P/--parallel must be 1..16" unless (1..16).include?(opts[:parallel])
    if type_name == "pipe"
      all_eps = opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints]
    else
      all_eps = opts[:endpoints]
    end
    abort "-P/--parallel requires all endpoints to use --connect (not --bind)" if all_eps.any?(&:bind?)
  end

  (opts[:connects] + opts[:binds]).each do |url|
    abort "inproc not supported, use tcp:// or ipc://" if url.include?("inproc://")
  end

  all_urls = if type_name == "pipe"
               (opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints]).map(&:url)
             else
               opts[:connects] + opts[:binds]
             end
  dups = all_urls.tally.select { |_, n| n > 1 }.keys
  abort "duplicate endpoint: #{dups.first}" if dups.any?
end