Class: OMQ::CLI::ParallelWorker
- Inherits: Object
- Defined in: lib/omq/cli/parallel_worker.rb
Overview
Worker that runs inside a Ractor for parallel socket modes (-P). Each worker owns its own Async reactor and socket instance.
Supported socket types:
- pull, gather (recv-only)
- rep (recv-reply with echo/data/eval)
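The one-worker-per-Ractor arrangement described in the overview can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the library's implementation: `SketchWorker` and `run_parallel` are hypothetical names, no socket or Async reactor is involved, and the worker simply labels the messages it is handed.

```ruby
# Hypothetical stand-in for a worker; NOT the real OMQ::CLI::ParallelWorker.
# It only tags each input message with the worker's id.
class SketchWorker
  def initialize(id, inputs)
    @id     = id
    @inputs = inputs
  end

  # Mirrors the "callable worker" shape: all work happens in #call.
  def call
    @inputs.map { |msg| "worker #{@id}: #{msg}" }
  end
end

# Starts one Ractor per batch and collects each Ractor's return value.
# State is passed in as arguments because a Ractor block cannot capture
# local variables from the enclosing scope.
def run_parallel(batches)
  ractors = batches.each_with_index.map do |batch, id|
    Ractor.new(id, batch) do |worker_id, inputs|
      SketchWorker.new(worker_id, inputs).call
    end
  end

  # Newer Rubies expose Ractor#value; older ones use Ractor#take.
  ractors.map { |r| r.respond_to?(:value) ? r.value : r.take }
end

results = run_parallel([%w[a b], %w[c]])
results.each { |lines| puts lines }
```

Each Ractor builds its own worker object, so no instance state is shared between workers. That is the property the overview relies on when it says each worker owns its own reactor and socket.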
Instance Method Summary
- #call ⇒ Object
- #initialize(config, socket_sym, endpoints, output_port, log_port, error_port) ⇒ ParallelWorker (constructor)
  A new instance of ParallelWorker.
Constructor Details
#initialize(config, socket_sym, endpoints, output_port, log_port, error_port) ⇒ ParallelWorker
Returns a new instance of ParallelWorker.
    # File 'lib/omq/cli/parallel_worker.rb', line 13

    def initialize(config, socket_sym, endpoints, output_port, log_port, error_port)
      @config      = config
      @socket_sym  = socket_sym
      @endpoints   = endpoints
      @output_port = output_port
      @log_port    = log_port
      @error_port  = error_port
    end
Instance Method Details
#call ⇒ Object
    # File 'lib/omq/cli/parallel_worker.rb', line 23

    def call
      Async do
        setup_socket
        log_endpoints
        start_monitors
        wait_for_peer
        compile_expr
        run_loop
        run_end_block
      rescue OMQ::SocketDeadError => error
        # Socket was killed by a protocol violation on the peer side
        # (see Engine#signal_fatal_error). Surface the underlying
        # cause via the log stream and exit cleanly -- the Ractor
        # completes, consumer threads unblock.
        reason = error.cause&.message || error.message
        @log_port.send("omq: #{reason}")
      rescue => error
        @error_port.send("#{error.class}: #{error.message}")
      ensure
        @sock&.close
      end
    end
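The error routing in #call can be tried in isolation. The sketch below is an approximation that assumes only what the listing shows: a dead-socket error is reported on the log port using its underlying cause when one exists, any other error goes to the error port, and the socket is closed in every case. `SketchSocketDeadError`, `SketchPort`, `SketchSocket` and `run_worker` are hypothetical stand-ins; the real ports, socket and OMQ::SocketDeadError are not used.

```ruby
# Hypothetical stand-in for OMQ::SocketDeadError.
class SketchSocketDeadError < StandardError; end

# Hypothetical port: records messages instead of delivering them anywhere.
# It defines #send(message) to mirror the call shape used in the listing.
class SketchPort
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send(message)
    @messages << message
    self
  end
end

# Hypothetical socket that only remembers whether it was closed.
class SketchSocket
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Runs the block with the same rescue/ensure layout as #call.
def run_worker(sock, log_port, error_port)
  yield
rescue SketchSocketDeadError => error
  # Prefer the underlying cause's message; fall back to the error's own.
  reason = error.cause&.message || error.message
  log_port.send("omq: #{reason}")
rescue => error
  error_port.send("#{error.class}: #{error.message}")
ensure
  sock&.close
end

log  = SketchPort.new
err  = SketchPort.new
sock = SketchSocket.new

run_worker(sock, log, err) do
  begin
    raise IOError, "bad frame"
  rescue IOError
    # Raising inside a rescue clause sets #cause to the IOError.
    raise SketchSocketDeadError, "socket dead"
  end
end

puts log.messages.inspect
puts sock.closed
```

Because the dead-socket case writes to the log port rather than the error port, a consumer reading the two ports can tell a peer-side protocol violation apart from an unexpected failure in the worker.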