Module: NNQ::CLI
- Defined in:
- lib/nnq/cli.rb,
lib/nnq/cli/bus.rb,
lib/nnq/cli/pair.rb,
lib/nnq/cli/pipe.rb,
lib/nnq/cli/term.rb,
lib/nnq/cli/config.rb,
lib/nnq/cli/pub_sub.rb,
lib/nnq/cli/req_rep.rb,
lib/nnq/cli/version.rb,
lib/nnq/cli/formatter.rb,
lib/nnq/cli/push_pull.rb,
lib/nnq/cli/cli_parser.rb,
lib/nnq/cli/base_runner.rb,
lib/nnq/cli/pipe_worker.rb,
lib/nnq/cli/socket_setup.rb,
lib/nnq/cli/ractor_helpers.rb,
lib/nnq/cli/transient_monitor.rb,
lib/nnq/cli/surveyor_respondent.rb,
lib/nnq/cli/expression_evaluator.rb
Overview
Command-line interface for NNQ socket operations.
Defined Under Namespace
Modules: RactorHelpers, SocketSetup, Term Classes: BaseRunner, BusRunner, CliParser, Config, Endpoint, ExpressionEvaluator, Formatter, PairRunner, PipeRunner, PipeWorker, PubRunner, PullRunner, PushRunner, RepRunner, ReqRunner, RespondentRunner, SubRunner, SurveyorRunner, TransientMonitor
Constant Summary collapse
- SOCKET_TYPE_NAMES =
%w[req rep pub sub push pull pair bus surveyor respondent pipe].freeze
- RUNNER_MAP =
{ "push" => [PushRunner, :PUSH0], "pull" => [PullRunner, :PULL0], "pub" => [PubRunner, :PUB0], "sub" => [SubRunner, :SUB0], "req" => [ReqRunner, :REQ0], "rep" => [RepRunner, :REP0], "pair" => [PairRunner, :PAIR0], "bus" => [BusRunner, :BUS0], "surveyor" => [SurveyorRunner, :SURVEYOR0], "respondent" => [RespondentRunner, :RESPONDENT0], "pipe" => [PipeRunner, nil], }.freeze
- SEND_ONLY =
Socket type names that only send messages.
%w[pub push].freeze
- RECV_ONLY =
Socket type names that only receive messages.
%w[sub pull].freeze
- VERSION =
"0.3.0"
Class Method Summary collapse
-
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
-
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
-
.run(argv = ARGV) ⇒ void
Main entry point.
-
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
Class Method Details
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
192 193 194 195 196 197 198 199 |
# File 'lib/nnq/cli.rb', line 192 def build_config(argv) opts = CliParser.parse(argv) CliParser.validate!(opts) opts[:stdin_is_tty] = $stdin.tty? Ractor.make_shareable(Config.new(**opts)) end |
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/nnq/cli.rb', line 82 def page(text) if $stdout.tty? if ENV["PAGER"] pager = ENV["PAGER"] else ENV["LESS"] ||= "-FR" pager = "less" end IO.popen(pager, "w") { |io| io.puts text } else puts text end rescue Errno::ENOENT puts text rescue Errno::EPIPE # user quit pager early end |
.run(argv = ARGV) ⇒ void
This method returns an undefined value.
Main entry point.
105 106 107 |
# File 'lib/nnq/cli.rb', line 105 def run(argv = ARGV) run_socket(argv) end |
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/nnq/cli.rb', line 113 def run_socket(argv) config = build_config(argv) require "nnq" require "async" require "json" require "console" CliParser.validate_gems!(config) trap("INT") { Process.exit!(0) } trap("TERM") { Process.exit!(0) } Console.logger = Console::Logger.new(Console::Output::Null.new) unless config.verbose >= 1 debug_ep = nil if ENV["NNQ_DEBUG_URI"] begin require "async/debug" debug_ep = Async::HTTP::Endpoint.parse(ENV["NNQ_DEBUG_URI"]) if debug_ep.scheme == "https" require "localhost" debug_ep = Async::HTTP::Endpoint.parse(ENV["NNQ_DEBUG_URI"], ssl_context: Localhost::Authority.fetch.server_context) end rescue LoadError abort "NNQ_DEBUG_URI requires the async-debug gem: gem install async-debug" end end if config.type_name.nil? Process.setproctitle("nnq script") Object.include(NNQ) unless Object.include?(NNQ) Async annotation: 'nnq' do Async::Debug.serve(endpoint: debug_ep) if debug_ep config.scripts.each { |s| load_script(s) } rescue => e $stderr.puts "nnq: #{e.}" exit 1 end return end runner_class, socket_sym = RUNNER_MAP.fetch(config.type_name) Async annotation: "nnq #{config.type_name}" do |task| Async::Debug.serve(endpoint: debug_ep) if debug_ep config.scripts.each { |s| load_script(s) } runner = if socket_sym runner_class.new(config, NNQ.const_get(socket_sym)) else runner_class.new(config) end runner.call(task) rescue NNQ::Zstd::ProtocolError => e $stderr.puts "nnq: zstd protocol error: #{e.}" exit 1 rescue IO::TimeoutError, Async::TimeoutError $stderr.puts "nnq: timeout" unless config.quiet exit 2 rescue ::Socket::ResolutionError => e $stderr.puts "nnq: #{e.}" exit 1 end end |