Class: OMQ::CLI::PipeRunner

Inherits:
Object
Defined in:
lib/omq/cli/pipe.rb

Overview

Runner for the virtual “pipe” socket type (PULL -> eval -> PUSH). Supports sequential and parallel (Ractor-based) processing modes.
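
As a rough illustration of the PULL -> eval -> PUSH flow in sequential mode, the sketch below uses plain Ruby Queue objects in place of real sockets. The run_pipe_sketch helper and its block-based transform are hypothetical and not part of OMQ's API; the nil end-of-input marker is also an assumption made only for this example.

```ruby
# Hypothetical stand-in for the pipe flow. Queue objects play the role of
# the PULL and PUSH sockets; none of these names come from OMQ itself.
def run_pipe_sketch(source, sink)
  # Assumption for this sketch only: a nil message marks end of input.
  while (msg = source.pop)
    sink << yield(msg) # "eval" step: transform each pulled message
  end
  sink
end

source = Queue.new
%w[a b c].each { |m| source << m }
source << nil

sink = run_pipe_sketch(source, Queue.new) { |m| m.upcase }

# Drain the sink into an array so the result is easy to inspect.
results = []
results << sink.pop until sink.empty?
```

Each message is pulled, transformed, and pushed in order, which is the behavior the sequential mode name suggests. A parallel variant would distribute the transform step across workers instead of running it inline.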

Constructor Details

#initialize(config) ⇒ PipeRunner

Returns a new instance of PipeRunner.

Parameters:

  • config (Config)

    frozen CLI configuration



# File 'lib/omq/cli/pipe.rb', line 13

def initialize(config)
  @config = config
  @fmt    = Formatter.new(config.format)
end

Instance Attribute Details

#config ⇒ Config (readonly)

Returns frozen CLI configuration.

Returns:

  • (Config)

    frozen CLI configuration



# File 'lib/omq/cli/pipe.rb', line 9

def config
  @config
end

Instance Method Details

#call(task) ⇒ void

This method returns an undefined value.

Runs the pipe in sequential or parallel mode based on config.

Parameters:

  • task (Async::Task)

    the parent async task



# File 'lib/omq/cli/pipe.rb', line 23

def call(task)
  if config.parallel
    run_parallel(task)
  else
    run_sequential(task)
  end
end
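
The dispatch in #call can be exercised in isolation. The following is a minimal sketch, assuming a stand-in config with only a parallel field. SketchConfig and SketchRunner are hypothetical names, and the run_parallel and run_sequential bodies are placeholders, since the real implementations are not shown here.

```ruby
# Hypothetical stand-ins; the real Config and PipeRunner live in OMQ::CLI.
SketchConfig = Struct.new(:parallel, keyword_init: true)

class SketchRunner
  attr_reader :config

  def initialize(config)
    @config = config
  end

  # Mirrors the branch shown above: pick a mode from config.parallel.
  def call(task)
    if config.parallel
      run_parallel(task)
    else
      run_sequential(task)
    end
  end

  private

  # Placeholder bodies: they only report which mode was chosen.
  def run_parallel(_task)
    :parallel
  end

  def run_sequential(_task)
    :sequential
  end
end

# The config is frozen to match the "frozen CLI configuration" contract.
sequential = SketchRunner.new(SketchConfig.new(parallel: false).freeze).call(nil)
parallel   = SketchRunner.new(SketchConfig.new(parallel: true).freeze).call(nil)
```

In real use the argument is an Async::Task; nil is passed here only because the placeholder methods ignore it.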