Class: CMDx::Pipeline

Inherits:
Object
Defined in:
lib/cmdx/pipeline.rb

Overview

Runs a Workflow’s declared task groups. Each group selects a strategy (`:sequential` by default, or `:parallel`). A group failure halts the pipeline by echoing the failed result’s signal through `throw!`, which bubbles up through Runtime as the workflow’s own failure.

Groups may opt into batch semantics with `continue_on_failure: true`. In that case every task in the group runs to completion, and all failures are aggregated into the workflow’s `errors` (keyed as `"TaskClass.input"` for input/validation errors and `"TaskClass.<status>"` for bare `fail!` reasons). The pipeline then halts on the first failure in declaration order.
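The difference between the default behaviour and batch semantics can be illustrated with a minimal plain-Ruby sketch. It does not use CMDx: `Outcome` and `run_group` are hypothetical stand-ins, and the early stop in the default case is an assumption about how a sequential group behaves. The aggregation of messages into `errors` is left out.

```ruby
# Hypothetical stand-in for a task outcome; not the CMDx result class.
Outcome = Struct.new(:name, :failed) do
  def failed?
    failed
  end
end

# Runs the callables in declaration order and returns [halt, ran]:
# `halt` is the first failed outcome (or nil), `ran` lists the executed names.
def run_group(tasks, continue_on_failure: false)
  ran = []
  failures = []

  tasks.each do |task|
    outcome = task.call
    ran << outcome.name
    next unless outcome.failed?

    failures << outcome
    break unless continue_on_failure # assumed default: stop at the first failure
  end

  [failures.first, ran]
end

tasks = [
  -> { Outcome.new("A", false) },
  -> { Outcome.new("B", true) },
  -> { Outcome.new("C", true) }
]

halt, ran = run_group(tasks)                             # C never runs
halt, ran = run_group(tasks, continue_on_failure: true)  # A, B and C all run
```

In both calls the halting outcome is `B`, the first failure in declaration order; only the set of tasks that actually ran differs.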

Constructor Details

#initialize(workflow) ⇒ Pipeline

Returns a new instance of Pipeline.

Parameters:

  • workflow (Task)

    workflow instance whose class includes Workflow

# File 'lib/cmdx/pipeline.rb', line 30

def initialize(workflow)
  @workflow = workflow
  @executed = []
end

Class Method Details

.execute(workflow) ⇒ void

This method returns an undefined value.

Parameters:

  • workflow (Task)

    workflow instance whose class includes Workflow

# File 'lib/cmdx/pipeline.rb', line 23

def execute(workflow)
  new(workflow).execute
end

Instance Method Details

#execute ⇒ void

This method returns an undefined value.

Iterates every group in the workflow’s pipeline, respecting `:if`/`:unless` and the `:strategy` key. Any group that produces a failed result halts execution by throwing through the workflow.

On halt, every previously executed task instance whose result is `success?` is sent `#rollback` (when defined) in reverse execution order, providing saga-style compensation. Each compensated result has its `:rolled_back` option flipped to `true`. Skipped tasks are excluded; the failing task itself is rolled back by Runtime and is not re-invoked here. Exceptions raised inside a compensator propagate; handling them is the developer’s responsibility.
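The compensation order described above can be sketched in plain Ruby. This is an illustration under assumptions, not the library's implementation: `FakeTask`, `mark_rolled_back!`, and `rollback_executed` are hypothetical names, and the sketch assumes the rolled-back flag is set only on tasks that actually define `#rollback`.

```ruby
# Hypothetical task stand-in; the real CMDx task and result APIs may differ.
class FakeTask
  attr_reader :name, :status, :rolled_back

  def initialize(name, status, log, compensable: true)
    @name = name
    @status = status
    @rolled_back = false
    # Only compensable instances get a #rollback method.
    define_singleton_method(:rollback) { log << name } if compensable
  end

  def success?
    status == :success
  end

  def mark_rolled_back!
    @rolled_back = true
  end
end

# Walks the executed tasks newest-first and compensates the successful ones.
def rollback_executed(executed)
  executed.reverse_each do |task|
    next unless task.success?               # skipped tasks are excluded
    next unless task.respond_to?(:rollback) # only "when defined"

    task.rollback                           # exceptions propagate to the caller
    task.mark_rolled_back!
  end
end

log = []
executed = [
  FakeTask.new("charge", :success, log),
  FakeTask.new("notify", :skipped, log),
  FakeTask.new("audit", :success, log, compensable: false),
  FakeTask.new("reserve", :success, log)
]
rollback_executed(executed)
# log now holds "reserve" followed by "charge"
```

Because no `rescue` wraps `task.rollback`, a compensator that raises stops the loop and leaves earlier tasks uncompensated, which is why the caller has to handle such errors.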

Raises:

  • (ArgumentError)

    for an unknown strategy

  • (StandardError)

    anything raised by a task’s `#rollback`

# File 'lib/cmdx/pipeline.rb', line 50

def execute
  @workflow.class.pipeline.each do |group|
    next unless Util.satisfied?(group.options[:if], group.options[:unless], @workflow)

    halt =
      case strategy = group.options[:strategy]
      when :sequential, NilClass
        run_sequential(group)
      when :parallel
        run_parallel(group)
      else
        raise ArgumentError, "invalid strategy: #{strategy.inspect}"
      end

    next unless halt

    rollback_executed!
    @workflow.send(:throw!, halt)
  end
end
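
The `when :sequential, NilClass` branch works because `case` compares with `===`, and `NilClass === nil` is true, so a group without a `:strategy` option falls back to sequential execution. A minimal standalone sketch of that dispatch follows; `pick_runner` is a hypothetical helper that returns a symbol instead of running anything.

```ruby
# Hypothetical helper mirroring the dispatch shape of Pipeline#execute.
def pick_runner(strategy)
  case strategy
  when :sequential, NilClass # NilClass === nil, so a missing key lands here
    :run_sequential
  when :parallel
    :run_parallel
  else
    raise ArgumentError, "invalid strategy: #{strategy.inspect}"
  end
end

pick_runner(nil)       # sequential by default
pick_runner(:parallel)
```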