Class: NNQ::CLI::CliParser
- Inherits:
-
Object
- Object
- NNQ::CLI::CliParser
- Defined in:
- lib/nnq/cli/cli_parser.rb
Overview
Parses and validates command-line arguments for the nnq CLI.
Constant Summary collapse
- EXAMPLES =
<<~'TEXT' -- Request / Reply ------------------------------------------ +-----+ "hello" +-----+ | REQ |------------->| REP | | |<-------------| | +-----+ "HELLO" +-----+ # terminal 1: echo server nnq rep --bind tcp://:5555 --recv-eval '$_.upcase' # terminal 2: send a request echo "hello" | nnq req --connect tcp://localhost:5555 # or over IPC (unix socket, single machine) nnq rep --bind ipc:///tmp/echo.sock --echo & echo "hello" | nnq req --connect ipc:///tmp/echo.sock -- Publish / Subscribe -------------------------------------- +-----+ "weather.nyc 72F" +-----+ | PUB |--------------------->| SUB | --subscribe "weather." +-----+ +-----+ # terminal 1: subscriber (all topics by default) nnq sub --bind tcp://:5556 # terminal 2: publisher (needs --delay for subscription to propagate) echo "weather.nyc 72F" | nnq pub --connect tcp://localhost:5556 --delay 1 -- Periodic Publish ------------------------------------------- +-----+ "tick 1" +-----+ | PUB |--(every 1s)-->| SUB | +-----+ +-----+ # terminal 1: subscriber nnq sub --bind tcp://:5556 # terminal 2: publish a tick every second (wall-clock aligned) nnq pub --connect tcp://localhost:5556 --delay 1 \ --data "tick" --interval 1 # 5 ticks, then exit nnq pub --connect tcp://localhost:5556 --delay 1 \ --data "tick" --interval 1 --count 5 -- Pipeline ------------------------------------------------- +------+ +------+ | PUSH |----------->| PULL | +------+ +------+ # terminal 1: worker nnq pull --bind tcp://:5557 # terminal 2: send tasks echo "task 1" | nnq push --connect tcp://localhost:5557 # or over IPC (unix socket) nnq pull --bind ipc:///tmp/pipeline.sock & echo "task 1" | nnq push --connect ipc:///tmp/pipeline.sock -- Pipe (PULL -> eval -> PUSH) -------------------------------- +------+ +------+ +------+ | PUSH |--------->| pipe |--------->| PULL | +------+ +------+ +------+ # terminal 1: producer echo -e "hello\nworld" | nnq push --bind ipc://@work # terminal 2: worker -- uppercase each message nnq pipe -c ipc://@work -c ipc://@sink -e '$_.upcase' # terminal 3: collector nnq pull --bind ipc://@sink # 4 Ractor workers in a single process (-P) nnq pipe -c ipc://@work -c ipc://@sink -P4 -r./fib -e 'fib(Integer($_)).to_s' # exit when producer disconnects (--transient) nnq pipe -c ipc://@work -c ipc://@sink --transient -e '$_.upcase' # fan-in: multiple sources -> one sink nnq pipe --in -c ipc://@work1 -c ipc://@work2 \ --out -c ipc://@sink -e '$_.upcase' # fan-out: one source -> multiple sinks (round-robin) nnq pipe --in -b tcp://:5555 --out -c ipc://@sink1 -c ipc://@sink2 -e '$_' -- Formats -------------------------------------------------- # ascii (default) -- non-printable replaced with dots nnq pull --bind tcp://:5557 --ascii # quoted -- lossless, round-trippable (uses String#dump escaping) nnq pull --bind tcp://:5557 --quoted # JSON Lines -- single-element arrays (nnq is single-body) echo '["payload"]' | nnq push --connect tcp://localhost:5557 --jsonl nnq pull --bind tcp://:5557 --jsonl # raw -- emit the body verbatim (no framing, no newline) nnq pull --bind tcp://:5557 --raw -- Compression ---------------------------------------------- # both sides must use --compress nnq pull --bind tcp://:5557 --compress & echo "compressible data" | nnq push --connect tcp://localhost:5557 --compress -- Ruby Eval ------------------------------------------------ # filter incoming: only pass messages containing "error" nnq pull -b tcp://:5557 --recv-eval '$_.include?("error") ? $_ : nil' # transform incoming with gems nnq sub -c tcp://localhost:5556 -rjson -e 'JSON.parse($_)["temperature"]' # require a local file, use its methods nnq rep --bind tcp://:5555 --require ./transform.rb -e 'upcase($_)' # next skips, break stops -- regexps match against $_ nnq pull -b tcp://:5557 -e 'next if /^#/; break if /quit/; $_' # BEGIN/END blocks (like awk) -- accumulate and summarize nnq pull -b tcp://:5557 -e 'BEGIN{@sum = 0} @sum += Integer($_); nil END{puts @sum}' # transform outgoing messages echo hello | nnq push -c tcp://localhost:5557 --send-eval '$_.upcase' # REQ: transform request and reply independently echo hello | nnq req -c tcp://localhost:5555 -E '$_.upcase' -e '$_' -- Script Handlers (-r) ------------------------------------ # handler.rb -- register transforms from a file # db = PG.connect("dbname=app") # NNQ.incoming { |msg| db.exec(msg).values.flatten.first } # at_exit { db.close } nnq pull --bind tcp://:5557 -r./handler.rb # combine script handlers with inline eval nnq req -c tcp://localhost:5555 -r./handler.rb -E '$_.upcase' # NNQ.outgoing { |msg| ... } -- registered outgoing transform # NNQ.incoming { |msg| ... } -- registered incoming transform # CLI flags (-e/-E) override registered handlers TEXT
- DEFAULT_OPTS =
{ type_name: nil, endpoints: [], connects: [], binds: [], in_endpoints: [], out_endpoints: [], data: nil, file: nil, format: :ascii, subscribes: [], interval: nil, count: nil, delay: nil, timeout: nil, linger: 5, reconnect_ivl: nil, send_hwm: nil, sndbuf: nil, rcvbuf: nil, compress: false, compress_in: false, compress_out: false, send_expr: nil, recv_expr: nil, parallel: nil, transient: false, verbose: 0, quiet: false, echo: false, scripts: [], recv_maxsz: nil, }.freeze
Class Method Summary collapse
-
.loopback_bind_host ⇒ Object
Returns the loopback address for bind normalization.
-
.parse(argv) ⇒ Object
Parses
argvand returns a mutable options hash. -
.validate!(opts) ⇒ Object
Validates option combinations, aborting on bad combos.
-
.validate_gems!(config) ⇒ Object
Validates option combinations that depend on socket type.
Instance Method Summary collapse
-
#expand_endpoint(url) ⇒ Object
Expands shorthand ‘@name` to `ipc://@name` (Linux abstract namespace).
-
#parse(argv) ⇒ Hash
Parses
argvand returns a mutable options hash. -
#parse_byte_size(str) ⇒ Integer
Parses a byte size string with an optional K/M/G suffix (binary, i.e. 1K = 1024 bytes).
-
#validate!(opts) ⇒ void
Validates option combinations, aborting on invalid combos.
Class Method Details
.loopback_bind_host ⇒ Object
Returns the loopback address for bind normalization. Prefers IPv6 loopback ([::1]) when the host has at least one non-loopback, non-link-local IPv6 address, otherwise 127.0.0.1.
402 403 404 405 406 407 408 409 410 |
# File 'lib/nnq/cli/cli_parser.rb', line 402 def self.loopback_bind_host @loopback_bind_host ||= begin has_ipv6 = ::Socket.getifaddrs.any? { |ifa| addr = ifa.addr addr&.ipv6? && !addr.ipv6_loopback? && !addr.ipv6_linklocal? } has_ipv6 ? "[::1]" : "127.0.0.1" end end |
.parse(argv) ⇒ Object
Parses argv and returns a mutable options hash.
198 199 200 |
# File 'lib/nnq/cli/cli_parser.rb', line 198 def self.parse(argv) new.parse(argv) end |
.validate!(opts) ⇒ Object
Validates option combinations, aborting on bad combos.
205 206 207 |
# File 'lib/nnq/cli/cli_parser.rb', line 205 def self.validate!(opts) new.validate!(opts) end |
.validate_gems!(config) ⇒ Object
Validates option combinations that depend on socket type.
212 213 214 215 216 |
# File 'lib/nnq/cli/cli_parser.rb', line 212 def self.validate_gems!(config) if config.recv_only? && (config.data || config.file) abort "--data/--file not valid for #{config.type_name} (receive-only)" end end |
Instance Method Details
#expand_endpoint(url) ⇒ Object
Expands shorthand ‘@name` to `ipc://@name` (Linux abstract namespace). Only triggers when the value starts with `@` and has no `://` scheme.
480 481 482 |
# File 'lib/nnq/cli/cli_parser.rb', line 480 def (url) url.start_with?("@") && !url.include?("://") ? "ipc://#{url}" : url end |
#parse(argv) ⇒ Hash
Parses argv and returns a mutable options hash.
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/nnq/cli/cli_parser.rb', line 223 def parse(argv) opts = DEFAULT_OPTS.transform_values { |v| v.is_a?(Array) ? v.dup : v } pipe_side = nil # nil = legacy positional mode; :in/:out = modal parser = OptionParser.new do |o| o. = "Usage: nnq TYPE [options]\n\n" \ "Types: req, rep, pub, sub, push, pull, pair\n" \ "Virtual: pipe (PULL -> eval -> PUSH)\n\n" o.separator "Connection:" o.on("-c", "--connect URL", "Connect to endpoint (repeatable)") { |v| v = (v) ep = Endpoint.new(v, false) case pipe_side when :in opts[:in_endpoints] << ep when :out opts[:out_endpoints] << ep else opts[:endpoints] << ep opts[:connects] << v end } o.on("-b", "--bind URL", "Bind to endpoint (repeatable)") { |v| v = (v) ep = Endpoint.new(v, true) case pipe_side when :in opts[:in_endpoints] << ep when :out opts[:out_endpoints] << ep else opts[:endpoints] << ep opts[:binds] << v end } o.on("--in", "Pipe: subsequent -b/-c attach to input (PULL) side") { pipe_side = :in } o.on("--out", "Pipe: subsequent -b/-c attach to output (PUSH) side") { pipe_side = :out } o.separator "\nData source (REP: reply source):" o.on( "--echo", "Echo received messages back (REP)") { opts[:echo] = true } o.on("-D", "--data DATA", "Message data (literal string)") { |v| opts[:data] = v } o.on("-F", "--file FILE", "Read message from file (- = stdin)") { |v| opts[:file] = v } o.separator "\nFormat (input + output):" o.on("-A", "--ascii", "Safe ASCII, non-printable as dots (default)") { opts[:format] = :ascii } o.on("-Q", "--quoted", "C-style quoted with escapes") { opts[:format] = :quoted } o.on( "--raw", "Raw binary body, no framing, no newline") { opts[:format] = :raw } o.on("-J", "--jsonl", "JSON Lines (single-element array per line)") { opts[:format] = :jsonl } o.on( "--msgpack", "MessagePack (binary stream)") { require "msgpack"; opts[:format] = :msgpack } o.on("-M", "--marshal", "Ruby Marshal stream (binary)") { opts[:format] = :marshal } o.separator "\nSubscription:" o.on("-s", "--subscribe PREFIX", "Subscribe prefix (SUB, default all)") { |v| opts[:subscribes] << v } o.separator "\nTiming:" o.on("-i", "--interval SECS", Float, "Repeat interval") { |v| opts[:interval] = v } o.on("-n", "--count COUNT", Integer, "Max iterations (0=inf)") { |v| opts[:count] = v } o.on("-d", "--delay SECS", Float, "Delay before first send") { |v| opts[:delay] = v } o.on("-t", "--timeout SECS", Float, "Send/receive timeout") { |v| opts[:timeout] = v } o.on("-l", "--linger SECS", Float, "Drain time on close (default 5)") { |v| opts[:linger] = v } o.on("--reconnect-ivl IVL", "Reconnect interval: SECS or MIN..MAX (default 0.1)") { |v| opts[:reconnect_ivl] = if v.include?("..") lo, hi = v.split("..", 2) Float(lo)..Float(hi) else Float(v) end } o.on("--recv-maxsz SIZE", "Max inbound message size, e.g. 4096, 64K, 1M, 2G (default 1M, 0=unlimited; larger messages drop the connection)") { |v| opts[:recv_maxsz] = parse_byte_size(v) } o.on("--hwm N", Integer, "Send high water mark (default 100, 0=unbounded)") { |v| opts[:send_hwm] = v } o.on("--sndbuf N", "SO_SNDBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:sndbuf] = parse_byte_size(v) } o.on("--rcvbuf N", "SO_RCVBUF kernel buffer size (e.g. 4K, 1M)") { |v| opts[:rcvbuf] = parse_byte_size(v) } o.separator "\nCompression:" o.on("-z", "--compress", "LZ4 compression per message (modal with --in/--out)") do require "rlz4" case pipe_side when :in opts[:compress_in] = true when :out opts[:compress_out] = true else opts[:compress] = true end end o.separator "\nProcessing (-e = incoming, -E = outgoing):" o.on("-e", "--recv-eval EXPR", "Eval Ruby for each incoming message ($_ = body)") { |v| opts[:recv_expr] = v } o.on("-E", "--send-eval EXPR", "Eval Ruby for each outgoing message ($_ = body)") { |v| opts[:send_expr] = v } o.on("-r", "--require LIB", "Require lib/file in Async context; use '-' for stdin. Scripts can register NNQ.outgoing/incoming") { |v| require "nnq" unless defined?(NNQ::VERSION) opts[:scripts] << (v == "-" ? :stdin : (v.start_with?("./", "../") ? File.(v) : v)) } o.on("-P", "--parallel N", Integer, "Parallel Ractor workers (max 16)") { |v| opts[:parallel] = [v, 16].min } o.separator "\nOther:" o.on("-v", "--verbose", "Verbosity: -v endpoints, -vv events, -vvv messages, -vvvv timestamps") { opts[:verbose] += 1 } o.on("-q", "--quiet", "Suppress message output") { opts[:quiet] = true } o.on( "--transient", "Exit when all peers disconnect") { opts[:transient] = true } o.on("-V", "--version") { if ENV["NNQ_DEV"] require_relative "../../../../nnq/lib/nnq/version" else require "nnq/version" end puts "nnq-cli #{NNQ::CLI::VERSION} (nnq #{NNQ::VERSION})" exit } o.on("-h") { puts o exit } o.on("--help") { CLI.page "#{o}\n#{EXAMPLES}" exit } o.on("--examples") { CLI.page EXAMPLES exit } o.separator "\nExit codes: 0 = success, 1 = error, 2 = timeout, 3 = eval error" end begin parser.parse!(argv) rescue OptionParser::ParseError => e abort e. end type_name = argv.shift if type_name.nil? abort parser.to_s if opts[:scripts].empty? # bare script mode -- type_name stays nil elsif !SOCKET_TYPE_NAMES.include?(type_name.downcase) abort "Unknown socket type: #{type_name}. Known: #{SOCKET_TYPE_NAMES.join(', ')}" else opts[:type_name] = type_name.downcase end # Normalize shorthand hostnames to concrete addresses. # # Binds: tcp://:PORT → loopback (::1 if IPv6 available, else 127.0.0.1) # tcp://*:PORT → 0.0.0.0 (all interfaces, IPv4) # # Connects: tcp://:PORT → localhost (Happy Eyeballs) # tcp://*:PORT → localhost loopback = self.class.loopback_bind_host normalize_bind = ->(url) { url.sub(%r{\Atcp://\*:}, "tcp://0.0.0.0:").sub(%r{\Atcp://:}, "tcp://#{loopback}:") } normalize_connect = ->(url) { url.sub(%r{\Atcp://(\*|):}, "tcp://localhost:") } normalize_ep = ->(ep) { Endpoint.new(ep.bind? ? normalize_bind.call(ep.url) : normalize_connect.call(ep.url), ep.bind?) } opts[:binds].map!(&normalize_bind) opts[:connects].map!(&normalize_connect) opts[:endpoints].map!(&normalize_ep) opts[:in_endpoints].map!(&normalize_ep) opts[:out_endpoints].map!(&normalize_ep) opts end |
#parse_byte_size(str) ⇒ Integer
Parses a byte size string with an optional K/M/G suffix (binary, i.e. 1K = 1024 bytes).
387 388 389 390 391 392 393 394 395 396 |
# File 'lib/nnq/cli/cli_parser.rb', line 387 def parse_byte_size(str) case str when /\A(\d+)[kK]\z/ then $1.to_i * 1024 when /\A(\d+)[mM]\z/ then $1.to_i * 1024 * 1024 when /\A(\d+)[gG]\z/ then $1.to_i * 1024 * 1024 * 1024 when /\A\d+\z/ then str.to_i else abort "invalid byte size: #{str} (use e.g. 4096, 4K, 1M, 2G)" end end |
#validate!(opts) ⇒ void
This method returns an undefined value.
Validates option combinations, aborting on invalid combos.
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 |
# File 'lib/nnq/cli/cli_parser.rb', line 417 def validate!(opts) return if opts[:type_name].nil? # bare script mode abort "-r- (stdin script) and -F- (stdin data) cannot both be used" if opts[:scripts]&.include?(:stdin) && opts[:file] == "-" type_name = opts[:type_name] if type_name == "pipe" has_in_out = opts[:in_endpoints].any? || opts[:out_endpoints].any? if has_in_out # Promote bare endpoints into the missing side: # `pipe -c SRC --out -c DST` → bare SRC becomes --in if opts[:in_endpoints].empty? && opts[:endpoints].any? opts[:in_endpoints] = opts[:endpoints] opts[:endpoints] = [] elsif opts[:out_endpoints].empty? && opts[:endpoints].any? opts[:out_endpoints] = opts[:endpoints] opts[:endpoints] = [] end abort "pipe --in requires at least one endpoint" if opts[:in_endpoints].empty? abort "pipe --out requires at least one endpoint" if opts[:out_endpoints].empty? abort "pipe: don't mix --in/--out with bare -b/-c endpoints" unless opts[:endpoints].empty? else abort "pipe requires exactly 2 endpoints (pull-side and push-side), or use --in/--out" if opts[:endpoints].size != 2 end else abort "--in/--out are only valid for pipe" if opts[:in_endpoints].any? || opts[:out_endpoints].any? abort "At least one --connect or --bind is required" if opts[:connects].empty? && opts[:binds].empty? end abort "--data and --file are mutually exclusive" if opts[:data] && opts[:file] abort "--subscribe is only valid for SUB" if !opts[:subscribes].empty? && type_name != "sub" abort "--recv-eval is not valid for send-only sockets (use --send-eval / -E)" if opts[:recv_expr] && SEND_ONLY.include?(type_name) abort "--send-eval is not valid for recv-only sockets (use --recv-eval / -e)" if opts[:send_expr] && RECV_ONLY.include?(type_name) abort "--send-eval is not valid for REP (the reply is the result of --recv-eval / -e)" if opts[:send_expr] && type_name == "rep" if opts[:parallel] parallel_types = %w[pipe] abort "-P/--parallel is only valid for #{parallel_types.join(", ")}" unless parallel_types.include?(type_name) abort "-P/--parallel must be 1..16" unless (1..16).include?(opts[:parallel]) all_eps = if type_name == "pipe" opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints] else opts[:endpoints] end abort "-P/--parallel requires all endpoints to use --connect (not --bind)" if all_eps.any?(&:bind?) end (opts[:connects] + opts[:binds]).each do |url| abort "inproc not supported, use tcp:// or ipc://" if url.include?("inproc://") end all_urls = if type_name == "pipe" (opts[:in_endpoints] + opts[:out_endpoints] + opts[:endpoints]).map(&:url) else opts[:connects] + opts[:binds] end dups = all_urls.tally.select { |_, n| n > 1 }.keys abort "duplicate endpoint: #{dups.first}" if dups.any? end |