Module: NNQ::CLI
- Defined in:
- lib/nnq/cli.rb,
lib/nnq/cli/bus.rb,
lib/nnq/cli/pair.rb,
lib/nnq/cli/pipe.rb,
lib/nnq/cli/term.rb,
lib/nnq/cli/config.rb,
lib/nnq/cli/pub_sub.rb,
lib/nnq/cli/req_rep.rb,
lib/nnq/cli/version.rb,
lib/nnq/cli/formatter.rb,
lib/nnq/cli/push_pull.rb,
lib/nnq/cli/cli_parser.rb,
lib/nnq/cli/base_runner.rb,
lib/nnq/cli/pipe_worker.rb,
lib/nnq/cli/socket_setup.rb,
lib/nnq/cli/ractor_helpers.rb,
lib/nnq/cli/transient_monitor.rb,
lib/nnq/cli/surveyor_respondent.rb,
lib/nnq/cli/expression_evaluator.rb
Overview
Command-line interface for NNQ socket operations.
Defined Under Namespace
Modules: RactorHelpers, SocketSetup, Term
Classes: BaseRunner, BusRunner, CliParser, Config, DecompressError, Endpoint, ExpressionEvaluator, Formatter, PairRunner, PipeRunner, PipeWorker, PubRunner, PullRunner, PushRunner, RepRunner, ReqRunner, RespondentRunner, SubRunner, SurveyorRunner, TransientMonitor
Constant Summary collapse
- SOCKET_TYPE_NAMES =
%w[req rep pub sub push pull pair bus surveyor respondent pipe].freeze
- RUNNER_MAP =
{ "push" => [PushRunner, :PUSH0], "pull" => [PullRunner, :PULL0], "pub" => [PubRunner, :PUB0], "sub" => [SubRunner, :SUB0], "req" => [ReqRunner, :REQ0], "rep" => [RepRunner, :REP0], "pair" => [PairRunner, :PAIR0], "bus" => [BusRunner, :BUS0], "surveyor" => [SurveyorRunner, :SURVEYOR0], "respondent" => [RespondentRunner, :RESPONDENT0], "pipe" => [PipeRunner, nil], }.freeze
- SEND_ONLY =
Socket type names that only send messages.
%w[pub push].freeze
- RECV_ONLY =
Socket type names that only receive messages.
%w[sub pull].freeze
- VERSION =
"0.2.0"
Class Method Summary collapse
-
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
-
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
-
.run(argv = ARGV) ⇒ void
Main entry point.
-
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
Class Method Details
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
181 182 183 184 185 186 187 188 |
# File 'lib/nnq/cli.rb', line 181

# Builds a frozen Config from command-line arguments.
#
# The arguments are parsed and validated by CliParser, the TTY state of
# $stdin is recorded under +:stdin_is_tty+, and the resulting Config is
# passed through Ractor.make_shareable before being returned.
#
# @param argv [Array<String>] command-line arguments to parse
# @return [Config] the shareable configuration object
def build_config(argv)
  # Validate the parsed options first; the TTY probe only happens afterwards.
  options = CliParser.parse(argv).tap { |parsed| CliParser.validate!(parsed) }
  options[:stdin_is_tty] = $stdin.tty?

  Ractor.make_shareable(Config.new(**options))
end
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/nnq/cli.rb', line 71

# Displays text through the system pager, or prints it directly when
# stdout is not a terminal.
#
# The pager command is taken from ENV["PAGER"]; when that is unset, "less"
# is used and ENV["LESS"] is defaulted to "-FR" if it has no value yet.
#
# @param text [String] the text to show
# @return [nil]
def page(text)
  return puts(text) unless $stdout.tty?

  pager = ENV["PAGER"]
  unless pager
    ENV["LESS"] ||= "-FR"
    pager = "less"
  end

  IO.popen(pager, "w") { |pipe| pipe.puts text }
rescue Errno::ENOENT
  # The pager command could not be found: fall back to plain output.
  puts text
rescue Errno::EPIPE
  # user quit pager early
end
.run(argv = ARGV) ⇒ void
This method returns an undefined value.
Main entry point.
94 95 96 |
# File 'lib/nnq/cli.rb', line 94

# Main entry point; hands the argument list straight to run_socket.
#
# @param argv [Array<String>] command-line arguments (defaults to ARGV)
# @return [void]
def run(argv = ARGV) = run_socket(argv)
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/nnq/cli.rb', line 102

# Parses CLI arguments, validates options, and runs the main event loop
# inside an Async reactor.
#
# When +config.type_name+ is nil, only the configured scripts are loaded
# (script mode). Otherwise the runner class registered in RUNNER_MAP for
# that socket type is instantiated and called with the reactor task.
#
# Errors are reported on stderr with an "nnq: " prefix. Script-mode errors,
# DecompressError and Socket::ResolutionError call +exit 1+; timeouts call
# +exit 2+ (their message is suppressed when +config.quiet+ is set).
#
# @param argv [Array<String>] command-line arguments
# @return [Object, nil] nil in script mode, otherwise the value of the
#   +Async+ call
# @raise [KeyError] if +config.type_name+ is not a key of RUNNER_MAP
def run_socket(argv)
  config = build_config(argv)

  # Heavy dependencies are loaded only after the arguments were accepted.
  require "nnq"
  require "async"
  require "json"
  require "console"

  CliParser.validate_gems!(config)

  # Terminate immediately with status 0 on INT/TERM.
  trap("INT") { Process.exit!(0) }
  trap("TERM") { Process.exit!(0) }

  # Route Console output to a null sink unless verbosity is at least 1.
  Console.logger = Console::Logger.new(Console::Output::Null.new) unless config.verbose >= 1

  debug_ep = nil

  if ENV["NNQ_DEBUG_URI"]
    begin
      require "async/debug"
      debug_ep = Async::HTTP::Endpoint.parse(ENV["NNQ_DEBUG_URI"])
      if debug_ep.scheme == "https"
        # For https the endpoint is re-parsed with a server SSL context
        # obtained from Localhost::Authority.
        require "localhost"
        debug_ep = Async::HTTP::Endpoint.parse(
          ENV["NNQ_DEBUG_URI"],
          ssl_context: Localhost::Authority.fetch.server_context
        )
      end
    rescue LoadError
      # A LoadError from either require above ends up here.
      abort "NNQ_DEBUG_URI requires the async-debug gem: gem install async-debug"
    end
  end

  # Script mode: no socket type given, so only the scripts are loaded.
  if config.type_name.nil?
    Process.setproctitle("nnq script")
    Object.include(NNQ) unless Object.include?(NNQ)

    Async annotation: 'nnq' do
      Async::Debug.serve(endpoint: debug_ep) if debug_ep
      config.scripts.each { |s| load_script(s) }
    rescue => e
      $stderr.puts "nnq: #{e.message}"
      exit 1
    end

    return
  end

  runner_class, socket_sym = RUNNER_MAP.fetch(config.type_name)

  Async annotation: "nnq #{config.type_name}" do |task|
    Async::Debug.serve(endpoint: debug_ep) if debug_ep
    config.scripts.each { |s| load_script(s) }

    # Entries whose socket symbol is nil (e.g. "pipe") take only the config.
    runner =
      if socket_sym
        runner_class.new(config, NNQ.const_get(socket_sym))
      else
        runner_class.new(config)
      end

    runner.call(task)
  rescue DecompressError => e
    $stderr.puts "nnq: #{e.message}"
    exit 1
  rescue IO::TimeoutError, Async::TimeoutError
    $stderr.puts "nnq: timeout" unless config.quiet
    exit 2
  rescue ::Socket::ResolutionError => e
    $stderr.puts "nnq: #{e.message}"
    exit 1
  end
end