Class: OMQ::CLI::BaseRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/cli/base_runner.rb

Overview

Template runner base class for all socket-type CLI runners. Subclasses override #run_loop to implement socket-specific behaviour.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, socket_class) ⇒ BaseRunner

Returns a new instance of BaseRunner.

Parameters:

  • config (Config)

    frozen CLI configuration

  • socket_class (Class)

    OMQ socket class to instantiate (e.g. OMQ::PUSH)



15
16
17
18
19
# File 'lib/omq/cli/base_runner.rb', line 15

def initialize(config, socket_class)
  @config = config
  @klass  = socket_class
  @fmt    = Formatter.new(config.format)
end

Instance Attribute Details

#configConfig, Object (readonly)

Returns:

  • (Config)

    frozen CLI configuration

  • (Object)

    the OMQ socket instance



10
11
12
# File 'lib/omq/cli/base_runner.rb', line 10

def config
  @config
end

#sockConfig, Object (readonly)

Returns:

  • (Config)

    frozen CLI configuration

  • (Object)

    the OMQ socket instance



10
11
12
# File 'lib/omq/cli/base_runner.rb', line 10

def sock
  @sock
end

Instance Method Details

#call(task) ⇒ void

This method returns an undefined value.

Runs the full lifecycle: socket setup, peer wait, BEGIN/END blocks, and the main loop.

Parameters:

  • task (Async::Task)

    the parent async task



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/omq/cli/base_runner.rb', line 26

def call(task)
  set_process_title
  setup_socket
  start_event_monitor
  maybe_start_transient_monitor(task)
  sleep(config.delay) if config.delay && config.recv_only?
  wait_for_peer if needs_peer_wait?
  run_begin_blocks
  run_loop(task)
  run_end_blocks
rescue OMQ::SocketDeadError => error
  reason = error.cause&.message || error.message
  $stderr.write("omq: #{reason}\n")
  exit 1
ensure
  @sock&.close
end