Class: OMQ::CLI::PipeWorker

Inherits:
Object
Defined in:
lib/omq/cli/pipe_worker.rb

Overview

Worker that runs inside a Ractor for pipe -P parallel mode. Each worker owns its own Async reactor, PULL socket, and PUSH socket.
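The ownership model described above can be illustrated with plain Ractors. This is only a rough sketch and does not use OMQ or Async: the array `buffer` stands in for the per-worker reactor and sockets, and the `respond_to?(:value)` check is an assumption made so the snippet runs on both older (`Ractor#take`) and newer (`Ractor#value`) Ruby versions.

```ruby
# Illustration only: each Ractor builds and owns its own state,
# the way each PipeWorker owns its own reactor and sockets.
workers = 3.times.map do |i|
  Ractor.new(i) do |id|
    buffer = []                      # private to this Ractor
    buffer << "worker #{id} ready"
    buffer.first                     # block result is handed back to the caller
  end
end

# Collect each worker's result; the accessor name depends on the Ruby version.
results = workers.map { |r| r.respond_to?(:value) ? r.value : r.take }
```

Nothing is shared between the workers here; the only data that crosses a Ractor boundary is the integer passed in and the string returned.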

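Instance Method Summary

  • #call ⇒ Object
  • #initialize(config, in_eps, out_eps, log_port, error_port = nil) ⇒ PipeWorker (constructor)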

Constructor Details

#initialize(config, in_eps, out_eps, log_port, error_port = nil) ⇒ PipeWorker

Returns a new instance of PipeWorker.



# File 'lib/omq/cli/pipe_worker.rb', line 9

def initialize(config, in_eps, out_eps, log_port, error_port = nil)
  @config     = config
  @in_eps     = in_eps
  @out_eps    = out_eps
  @log_port   = log_port
  @error_port = error_port
end

Instance Method Details

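#call ⇒ Object

Runs the worker inside an Async reactor: sets up the sockets, logs endpoints when verbosity is at least 1, starts monitors, waits for peers if a timeout is configured, compiles the expression, runs the message loop, and then runs the end block. An OMQ::SocketDeadError is reported to the log port, and both sockets are closed on exit in every case.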



# File 'lib/omq/cli/pipe_worker.rb', line 18

def call
  Async do
    setup_sockets
    log_endpoints if @config.verbose >= 1
    start_monitors
    wait_for_peers_with_timeout if @config.timeout
    compile_expr
    run_message_loop
    run_end_block
  rescue OMQ::SocketDeadError => error
    reason = error.cause&.message || error.message
    @log_port.send("omq: #{reason}")
  ensure
    @pull&.close
    @push&.close
  end
end
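The rescue clause in #call prefers the message of the underlying cause and falls back to the error's own message. A minimal sketch of that pattern follows. `SocketDeadError`, `FakePort`, and `report` are hypothetical stand-ins written for this example and are not part of OMQ.

```ruby
# Hypothetical stand-ins; not the OMQ classes.
class SocketDeadError < StandardError; end

class FakePort
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Mirrors the single-argument `send` called on @log_port in #call.
  def send(msg)
    @messages << msg
  end
end

# Same rescue logic as #call, extracted into a helper for illustration.
def report(port)
  yield
rescue SocketDeadError => error
  reason = error.cause&.message || error.message
  port.send("omq: #{reason}")
end

port = FakePort.new

# Raised while handling another exception, so Ruby sets `cause` automatically.
report(port) do
  begin
    raise IOError, "connection reset"
  rescue IOError
    raise SocketDeadError, "socket dead"
  end
end

# Raised directly: `cause` is nil, so the error's own message is used.
report(port) { raise SocketDeadError, "socket dead" }

port.messages  # => ["omq: connection reset", "omq: socket dead"]
```

The same safe-navigation operator appears in the ensure clause: `@pull&.close` skips the call when the variable is still nil, which presumably covers the case where setup failed before a socket was assigned.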