Class: OMQ::CLI::ParallelWorker

Inherits:
Object
Defined in:
lib/omq/cli/parallel_worker.rb

Overview

Worker that runs inside a Ractor for parallel socket modes (-P). Each worker owns its own Async reactor and socket instance.

Supported socket types:

- pull, gather  (recv-only)
- rep           (recv-reply with echo/data/eval)

Instance Method Summary

Constructor Details

#initialize(config, socket_sym, endpoints, output_port, log_port, error_port) ⇒ ParallelWorker

Returns a new instance of ParallelWorker.



# File 'lib/omq/cli/parallel_worker.rb', line 13

def initialize(config, socket_sym, endpoints, output_port, log_port, error_port)
  @config      = config
  @socket_sym  = socket_sym
  @endpoints   = endpoints
  @output_port = output_port
  @log_port    = log_port
  @error_port  = error_port
end

Instance Method Details

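#call ⇒ Object

Runs the worker inside an Async block, calling setup_socket, log_endpoints, start_monitors, wait_for_peer, compile_expr, run_loop, and run_end_block in that order. An OMQ::SocketDeadError is reported on the log port, any other StandardError is reported on the error port, and the socket (if one was created) is closed on exit.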



# File 'lib/omq/cli/parallel_worker.rb', line 23

def call
  Async do
    setup_socket
    log_endpoints
    start_monitors
    wait_for_peer
    compile_expr
    run_loop
    run_end_block
  rescue OMQ::SocketDeadError => error
    # Socket was killed by a protocol violation on the peer side
    # (see Engine#signal_fatal_error). Surface the underlying
    # cause via the log stream and exit cleanly -- the Ractor
    # completes, consumer threads unblock.
    reason = error.cause&.message || error.message
    @log_port.send("omq: #{reason}")
  rescue => error
    @error_port.send("#{error.class}: #{error.message}")
  ensure
    @sock&.close
  end
end
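
The error routing in #call can be tried in isolation. The following is a minimal sketch, not the library's code: SocketDeadError, RecordingPort, and run_guarded are hypothetical stand-ins introduced here so the example runs without the omq or async gems, and the Async block, the socket, and the ensure clause are left out. It shows that a dead-socket error is logged with its cause's message when a cause exists, while any other StandardError goes to the error port prefixed with its class name.

```ruby
# Hypothetical stand-in for OMQ::SocketDeadError (assumption: the real
# class is a StandardError subclass; this sketch does not load omq).
class SocketDeadError < StandardError; end

# Hypothetical port stand-in: records messages in an array instead of
# passing them across a Ractor boundary.
class RecordingPort
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Mirrors the `port.send(message)` call shape used in #call.
  def send(message)
    @messages << message
    self
  end
end

# Same rescue layout as #call, minus the Async block and the socket.
def run_guarded(log_port, error_port)
  yield
rescue SocketDeadError => error
  # Prefer the underlying cause; fall back to the error's own message.
  reason = error.cause&.message || error.message
  log_port.send("omq: #{reason}")
rescue => error
  error_port.send("#{error.class}: #{error.message}")
end

log_port   = RecordingPort.new
error_port = RecordingPort.new

# Raising inside a rescue clause sets the new exception's cause
# to the exception being handled.
run_guarded(log_port, error_port) do
  begin
    raise IOError, "unexpected frame"
  rescue IOError
    raise SocketDeadError, "socket dead"
  end
end

# Any other StandardError is routed to the error port.
run_guarded(log_port, error_port) { raise ArgumentError, "bad endpoint" }

p log_port.messages    # → ["omq: unexpected frame"]
p error_port.messages  # → ["ArgumentError: bad endpoint"]
```

Keeping the two ports separate lets a consumer treat a peer-side protocol violation as a logged, clean shutdown while still surfacing unexpected failures as errors.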