Module: OMQ::CLI
- Defined in:
- lib/omq/cli.rb,
lib/omq/cli/pair.rb,
lib/omq/cli/pipe.rb,
lib/omq/cli/term.rb,
lib/omq/cli/config.rb,
lib/omq/cli/pub_sub.rb,
lib/omq/cli/req_rep.rb,
lib/omq/cli/version.rb,
lib/omq/cli/formatter.rb,
lib/omq/cli/push_pull.rb,
lib/omq/cli/cli_parser.rb,
lib/omq/cli/radio_dish.rb,
lib/omq/cli/base_runner.rb,
lib/omq/cli/pipe_worker.rb,
lib/omq/cli/socket_setup.rb,
lib/omq/cli/client_server.rb,
lib/omq/cli/router_dealer.rb,
lib/omq/cli/ractor_helpers.rb,
lib/omq/cli/routing_helper.rb,
lib/omq/cli/scatter_gather.rb,
lib/omq/cli/parallel_worker.rb,
lib/omq/cli/transient_monitor.rb,
lib/omq/cli/expression_evaluator.rb
Overview
Command-line interface for OMQ socket operations.
Defined Under Namespace
Modules: RactorHelpers, RoutingHelper, SocketSetup, Term Classes: BaseRunner, CliParser, Config, DishRunner, Endpoint, ExpressionEvaluator, Formatter, GatherRunner, PairRunner, ParallelWorker, PipeRunner, PipeWorker, PubRunner, PullRunner, PushRunner, RadioRunner, RepRunner, ReqRunner, RouterRunner, ScatterRunner, ServerRunner, SubRunner, TransientMonitor
Constant Summary collapse
- SOCKET_TYPE_NAMES =
%w[ req rep pub sub push pull pair dealer router client server radio dish scatter gather channel peer pipe ].freeze
- RUNNER_MAP =
{ "push" => [PushRunner, :PUSH], "pull" => [PullRunner, :PULL], "pub" => [PubRunner, :PUB], "sub" => [SubRunner, :SUB], "req" => [ReqRunner, :REQ], "rep" => [RepRunner, :REP], "dealer" => [PairRunner, :DEALER], "router" => [RouterRunner, :ROUTER], "pair" => [PairRunner, :PAIR], "client" => [ReqRunner, :CLIENT], "server" => [ServerRunner, :SERVER], "radio" => [RadioRunner, :RADIO], "dish" => [DishRunner, :DISH], "scatter" => [ScatterRunner, :SCATTER], "gather" => [GatherRunner, :GATHER], "channel" => [PairRunner, :CHANNEL], "peer" => [ServerRunner, :PEER], "pipe" => [PipeRunner, nil], }.freeze
- SEND_ONLY =
Socket type names that only send messages.
%w[pub push scatter radio].freeze
- RECV_ONLY =
Socket type names that only receive messages.
%w[sub pull gather dish].freeze
- VERSION =
"0.15.1"
Class Method Summary collapse
-
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
-
.load_curve_crypto(name, verbose: false) ⇒ Module
Loads the named NaCl-compatible crypto backend.
-
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
-
.run(argv = ARGV) ⇒ void
Main entry point: dispatches to keygen or socket runner.
-
.run_keygen(argv) ⇒ Object
Generates a persistent CURVE keypair and prints it as Z85-encoded env vars.
-
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
Class Method Details
.build_config(argv) ⇒ Object
Builds a frozen Config from command-line arguments.
285 286 287 288 289 290 291 292 |
# File 'lib/omq/cli.rb', line 285 def build_config(argv) opts = CliParser.parse(argv) CliParser.validate!(opts) opts[:stdin_is_tty] = $stdin.tty? Ractor.make_shareable(Config.new(**opts)) end |
.load_curve_crypto(name, verbose: false) ⇒ Module
Loads the named NaCl-compatible crypto backend.
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/omq/cli.rb', line 168 def load_curve_crypto(name, verbose: false) crypto = case name&.downcase when "rbnacl" require "rbnacl" RbNaCl when "nuckle" require "nuckle" Nuckle when nil begin require "rbnacl" RbNaCl rescue LoadError abort "CURVE requires libsodium. Install it:\n" \ " apt install libsodium-dev # Debian/Ubuntu\n" \ " brew install libsodium # macOS\n" \ "Or use nuckle (pure Ruby, DANGEROUS -- not audited):\n" \ " --crypto nuckle" end else abort "Unknown CURVE crypto backend: #{name}. Use 'rbnacl' or 'nuckle'." end $stderr.puts "omq: CURVE crypto backend: #{crypto.name}" if verbose crypto rescue LoadError abort "Could not load #{name} gem: gem install #{name}" end |
.page(text) ⇒ Object
Displays text through the system pager, or prints directly when stdout is not a terminal.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/omq/cli.rb', line 94 def page(text) if $stdout.tty? if ENV["PAGER"] pager = ENV["PAGER"] else ENV["LESS"] ||= "-FR" pager = "less" end IO.popen(pager, "w") { |io| io.puts text } else puts text end rescue Errno::ENOENT puts text rescue Errno::EPIPE, Interrupt # user quit pager early (q) or hit ^C while paging end |
.run(argv = ARGV) ⇒ void
This method returns an undefined value.
Main entry point: dispatches to keygen or socket runner.
117 118 119 120 121 122 123 124 125 |
# File 'lib/omq/cli.rb', line 117 def run(argv = ARGV) case argv.first when "keygen" argv.shift run_keygen(argv) else run_socket(argv) end end |
.run_keygen(argv) ⇒ Object
Generates a persistent CURVE keypair and prints it as Z85-encoded env vars.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/omq/cli.rb', line 131 def run_keygen(argv) crypto_name = nil verbose = false while (arg = argv.shift) case arg when "--crypto" crypto_name = argv.shift when "-v", "--verbose" verbose = true when "-h", "--help" puts "Usage: omq keygen [--crypto rbnacl|nuckle] [-v]\n\n" \ "Generates a CURVE keypair for persistent server identity.\n" \ "Output: Z85-encoded env vars for use with --curve-server." exit else abort "omq keygen: unknown option: #{arg}" end end crypto_name ||= ENV["OMQ_CRYPTO"] crypto = load_curve_crypto(crypto_name, verbose: verbose) require "protocol/zmtp/mechanism/curve" require "protocol/zmtp/z85" key = crypto::PrivateKey.generate puts "OMQ_SERVER_PUBLIC='#{Protocol::ZMTP::Z85.encode(key.public_key.to_s)}'" puts "OMQ_SERVER_SECRET='#{Protocol::ZMTP::Z85.encode(key.to_s)}'" end |
.run_socket(argv) ⇒ Object
Parses CLI arguments, validates options, and runs the main event loop inside an Async reactor.
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/omq/cli.rb', line 200 def run_socket(argv) config = build_config(argv) require "omq" require "omq/rfc/clientserver" require "omq/rfc/radiodish" require "omq/rfc/scattergather" require "omq/rfc/channel" require "omq/rfc/p2p" require "omq/rfc/zstd" require "async" require "json" require "console" CliParser.validate_gems!(config) trap("INT") { Process.exit!(0) } trap("TERM") { Process.exit!(0) } Console.logger = Console::Logger.new(Console::Output::Null.new) unless config.verbose >= 1 debug_ep = nil if ENV["OMQ_DEBUG_URI"] begin require "async/debug" debug_ep = Async::HTTP::Endpoint.parse(ENV["OMQ_DEBUG_URI"]) if debug_ep.scheme == "https" require "localhost" debug_ep = Async::HTTP::Endpoint.parse(ENV["OMQ_DEBUG_URI"], ssl_context: Localhost::Authority.fetch.server_context) end rescue LoadError abort "OMQ_DEBUG_URI requires the async-debug gem: gem install async-debug" end end if config.type_name.nil? Process.setproctitle("omq script") Object.include(OMQ) unless Object.include?(OMQ) Async annotation: 'omq' do Async::Debug.serve(endpoint: debug_ep) if debug_ep config.scripts.each { |s| load_script(s) } rescue => e $stderr.puts "omq: #{e.}" exit 1 end return end runner_class, socket_sym = RUNNER_MAP.fetch(config.type_name) Async annotation: "omq #{config.type_name}" do |task| Async::Debug.serve(endpoint: debug_ep) if debug_ep config.scripts.each { |s| load_script(s) } runner = if socket_sym runner_class.new(config, OMQ.const_get(socket_sym)) else runner_class.new(config) end runner.call(task) rescue IO::TimeoutError, Async::TimeoutError $stderr.puts "omq: timeout" unless config.quiet exit 2 rescue OMQ::SocketDeadError => e $stderr.puts "omq: #{e.cause.class}: #{e.cause.}" exit 1 rescue ::Socket::ResolutionError => e $stderr.puts "omq: #{e.}" exit 1 end end |