Class: ActiveHarness::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb

Overview

Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.

Usage (subclass with DSL):

class SupportPipeline < ActiveHarness::Pipeline
  step :injection_guard do
    use InjectionGuardAgent
    stop_if ->(result) { result.parsed["detected"] == true }
  end

  step :translate, TranslationAgent   # shorthand — no stop_if

  step :safety_tribunal do
    use SafetyTribunal
    stop_if ->(result) { result.verdict == false }
  end

  on :before_step do |step_name, payload| ... end
  on :after_step  do |step_name, result|  ... end
  on :before_step, :translate do |payload| ... end
  on :after_step,  :translate do |result|  ... end
  on :stopped  do |step_name, result| ... end
  on :complete do |last_result|       ... end
end

pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output       # => final payload string (nil if stopped)
pipeline.stopped?     # => false
pipeline.step_results # => { translate: <Result>, ... }

Direct Known Subclasses

TestSupportPipeline

Defined Under Namespace

Classes: Step

Constant Summary collapse

VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
VALID_STEP_HOOKS =
%i[before_step after_step].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input:, context: {}, memory: nil) ⇒ Pipeline

Returns a new instance of Pipeline.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/active_harness/pipeline.rb', line 126

def initialize(input:, context: {}, memory: nil)
  @original_input = input
  @payload        = input
  @context        = context.dup
  @memory         = memory
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil
end

Instance Attribute Details

#contextObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def context
  @context
end

#execution_timeObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def execution_time
  @execution_time
end

#original_inputObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def original_input
  @original_input
end

#outputObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def output
  @output
end

#step_resultsObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def step_results
  @step_results
end

#stop_reasonObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def stop_reason
  @stop_reason
end

#stopped_atObject (readonly)


Instance API




123
124
125
# File 'lib/active_harness/pipeline.rb', line 123

def stopped_at
  @stopped_at
end

Class Method Details

.after(event, step_name = nil, &block) ⇒ Object



99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

def after(event, step_name = nil, &block)
  on(:"after_#{event}", step_name, &block)
end

.before(event, step_name = nil, &block) ⇒ Object

Rails-style aliases for on:

Global:

before :step                          do |name, payload| end  # → on :before_step
after  :step                          do |name, result|  end  # → on :after_step
callback :stopped                     do |name, result|  end  # → on :stopped
callback :complete                    do |result|        end  # → on :complete

Per-step:

after  :step, :translate              do |result| end
before :step, :translate              do |payload| end


95
96
97
# File 'lib/active_harness/pipeline.rb', line 95

def before(event, step_name = nil, &block)
  on(:"before_#{event}", step_name, &block)
end

.callback(event, &block) ⇒ Object



103
104
105
# File 'lib/active_harness/pipeline.rb', line 103

def callback(event, &block)
  on(event, &block)
end

.inherited(subclass) ⇒ Object

Each subclass gets its own isolated config.



112
113
114
115
116
117
# File 'lib/active_harness/pipeline.rb', line 112

def inherited(subclass)
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {} }
  )
end

.on(event, step_name = nil, &block) ⇒ Object

Register a global or per-step hook.

Global hooks fire on every step:

on :before_step do |step_name, payload| ... end
on :after_step  do |step_name, result|  ... end
on :stopped     do |step_name, result|  ... end
on :complete    do |last_result|         ... end

Per-step hooks fire only for the named step (no step_name passed):

on :before_step, :translate do |payload| ... end
on :after_step,  :translate do |result|  ... end


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/active_harness/pipeline.rb', line 67

def on(event, step_name = nil, &block)
  if step_name
    unless VALID_STEP_HOOKS.include?(event)
      raise ArgumentError,
        "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}"
    end
    pipeline_config[:step_hooks][step_name] ||= {}
    pipeline_config[:step_hooks][step_name][event] = block
  else
    unless VALID_HOOKS.include?(event)
      raise ArgumentError,
        "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}"
    end
    pipeline_config[:hooks][event] = block
  end
end

.pipeline_configObject



107
108
109
# File 'lib/active_harness/pipeline.rb', line 107

def pipeline_config
  @pipeline_config ||= { steps: [], hooks: {}, step_hooks: {} }
end

.step(name, agent_class = nil, &block) ⇒ Object

Define a step in the pipeline.

Shorthand (agent only, no stop_if):

step :translate, TranslationAgent

Full block form:

step :injection_guard do
  use InjectionGuardAgent
  stop_if ->(result) { result.parsed["detected"] == true }
end


52
53
54
# File 'lib/active_harness/pipeline.rb', line 52

def step(name, agent_class = nil, &block)
  pipeline_config[:steps] << Pipeline::Step.new(name, agent_class, &block)
end

Instance Method Details

#callObject

Execute all steps sequentially. Returns self for chaining.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/active_harness/pipeline.rb', line 144

def call
  config = self.class.pipeline_config
  t0     = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @memory&.load

  config[:steps].each do |step|
    fire_global(:before_step, step.name, @payload, config)
    fire_step(:before_step, step.name, @payload, config)

    result = execute_step(step)

    @step_results[step.name] = result
    @context[step.name]      = result
    @payload                 = result.output if step.transform?

    fire_global(:after_step, step.name, result, config)
    fire_step(:after_step, step.name, result, config)

    if step.stop_if && step.stop_if.call(result)
      @stopped     = true
      @stopped_at  = step.name
      @stop_reason = result
      config[:hooks][:stopped]&.call(step.name, result)
      break
    end
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0).round(3)
  @output         = @payload unless @stopped

  unless @stopped
    @memory&.record(
      request:  @original_input,
      response: @output,
      pipeline: self.class.name
    )

    last_result = @step_results[@step_results.keys.last]
    config[:hooks][:complete]&.call(last_result)
  end

  self
end

#stopped?Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/active_harness/pipeline.rb', line 139

def stopped?
  @stopped
end