Class: ActiveHarness::Pipeline

Inherits:
Object
  • Object
show all
Includes:
Core::HookRunner
Defined in:
lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb,
lib/active_harness/pipeline/hooks.rb

Overview

Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.

Usage (subclass with DSL):

class SupportPipeline < ActiveHarness::Pipeline
  step :injection_guard do
    use InjectionGuardAgent
    stop_if ->(result) { result.processed["detected"] == true }
  end

  step :translate, TranslationAgent   # shorthand — no stop_if

  step :safety_tribunal do
    use SafetyTribunal
    stop_if ->(result) { result.processed["verdict"] == false }
  end

  on :before_step do |step_name, payload| ... end
  on :after_step  do |step_name, result|  ... end
  on :before_step, :translate do |payload| ... end
  on :after_step,  :translate do |result|  ... end
  on :stopped  do |step_name, result| ... end
  on :complete do |last_result|       ... end
end

pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output       # => final payload string (nil if stopped)
pipeline.stopped?     # => false
pipeline.steps { |name, result| ... }  # iterate over completed steps

Direct Known Subclasses

SupportPipeline

Defined Under Namespace

Classes: Step

Constant Summary collapse

VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
VALID_STEP_HOOKS =
%i[before_step after_step].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input:, context: {}, params: {}, memory: nil, token: nil, stream: nil) ⇒ Pipeline

Returns a new instance of Pipeline.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/active_harness/pipeline.rb', line 112

def initialize(
  input:,
  context: {},
  params:  {},
  memory:  nil,
  token:   nil,
  stream:  nil
)
  @original_input = input
  @payload        = input
  @context        = context.dup
  @params         = params
  @memory         = memory
  @token          = token
  class_streams   = self.class.pipeline_config[:streams] || {}
  @stream         = merge_stream(stream, class_streams)
  @executors      = build_executors
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil
end

Instance Attribute Details

#contextObject


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def context
  @context
end

#execution_timeObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def execution_time
  @execution_time
end

#original_inputObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def original_input
  @original_input
end

#outputObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def output
  @output
end

#paramsObject

Returns the value of attribute params.



105
106
107
# File 'lib/active_harness/pipeline.rb', line 105

def params
  @params
end

#stop_reasonObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def stop_reason
  @stop_reason
end

#stopped_atObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

def stopped_at
  @stopped_at
end

Class Method Details

.after(event, step_name = nil, &block) ⇒ Object



50
51
52
# File 'lib/active_harness/pipeline/hooks.rb', line 50

def after(event, step_name = nil, &block)
  on(:"after_#{event}", step_name, &block)
end

.before(event, step_name = nil, &block) ⇒ Object

Rails-style aliases for on:

Global:

before :step                          do |name, payload| end  # → on :before_step
after  :step                          do |name, result|  end  # → on :after_step
callback :stopped                     do |name, result|  end  # → on :stopped
callback :complete                    do |result|        end  # → on :complete

Per-step:

after  :step, :translate              do |result| end
before :step, :translate              do |payload| end


46
47
48
# File 'lib/active_harness/pipeline/hooks.rb', line 46

def before(event, step_name = nil, &block)
  on(:"before_#{event}", step_name, &block)
end

.callback(event, &block) ⇒ Object



54
55
56
# File 'lib/active_harness/pipeline/hooks.rb', line 54

def callback(event, &block)
  on(event, &block)
end

.inherited(subclass) ⇒ Object

Each subclass gets its own isolated config.



58
59
60
61
62
63
# File 'lib/active_harness/pipeline.rb', line 58

def inherited(subclass)
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {}, streams: {} }
  )
end

.on(event, step_name = nil, &block) ⇒ Object

Register a global or per-step hook.

Global hooks fire on every step:

on :before_step do |step_name, payload| ... end
on :after_step  do |step_name, result|  ... end
on :stopped     do |step_name, result|  ... end
on :complete    do |last_result|         ... end

Per-step hooks fire only for the named step (no step_name passed):

on :before_step, :translate do |payload| ... end
on :after_step,  :translate do |result|  ... end


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/active_harness/pipeline/hooks.rb', line 18

def on(event, step_name = nil, &block)
  if step_name
    unless VALID_STEP_HOOKS.include?(event)
      raise ArgumentError,
        "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}"
    end
    pipeline_config[:step_hooks][step_name] ||= {}
    (pipeline_config[:step_hooks][step_name][event] ||= []) << block
  else
    unless VALID_HOOKS.include?(event)
      raise ArgumentError,
        "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}"
    end
    (pipeline_config[:hooks][event] ||= []) << block
  end
end

.on_agent_event(&block) ⇒ Object

Class-level event stream handlers — fired for every matching event from any agent or tribunal executed within this pipeline (including agents running inside tribunals). Multiple blocks can be registered; all fire.

The handler receives (event, *args) — already scoped to the source.

on_agent_event do |event, result|
  Rails.logger.info "[Agent #{event}] #{result.model}" if event == :after_call
end

on_tribunal_event do |event, verdict|
  Rails.logger.info "[Tribunal #{event}] verdict=#{verdict}" if event == :after_verdict
end

on_pipeline_event do |event, step_name, _data|
  Rails.logger.info "[Pipeline #{event}] step=#{step_name}"
end


82
83
84
# File 'lib/active_harness/pipeline.rb', line 82

def on_agent_event(&block)
  (pipeline_config[:streams][:agent] ||= []) << block
end

.on_pipeline_event(&block) ⇒ Object



90
91
92
# File 'lib/active_harness/pipeline.rb', line 90

def on_pipeline_event(&block)
  (pipeline_config[:streams][:pipeline] ||= []) << block
end

.on_tribunal_event(&block) ⇒ Object



86
87
88
# File 'lib/active_harness/pipeline.rb', line 86

def on_tribunal_event(&block)
  (pipeline_config[:streams][:tribunal] ||= []) << block
end

.pipeline_configObject



53
54
55
# File 'lib/active_harness/pipeline.rb', line 53

def pipeline_config
  @pipeline_config ||= { steps: [], hooks: {}, step_hooks: {}, streams: {} }
end

.step(name, executor = nil, &block) ⇒ Object

Define a step in the pipeline.

Shorthand (agent only, no stop_if):

step :translate, TranslationAgent

Full block form:

step :injection_guard do
  use InjectionGuardAgent
  stop_if ->(result) { result.processed["detected"] == true }
end


49
50
51
# File 'lib/active_harness/pipeline.rb', line 49

def step(name, executor = nil, &block)
  pipeline_config[:steps] << Pipeline::Step.new(name, executor, &block)
end

Instance Method Details

#call(input = nil, token: nil, stream: nil) ⇒ Object

Execute all steps sequentially. Returns self for chaining. Accepts optional input, token, stream to match the Agent/Tribunal call interface.



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/active_harness/pipeline.rb', line 191

def call(input = nil, token: nil, stream: nil)
  if input
    @original_input = input
    @payload        = input
  end
  @token  = token  if token
  @stream = stream if stream

  config = self.class.pipeline_config
  t0     = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @memory&.load

  config[:steps].each do |step|
    fire(:before_step, step.name, @payload, config)
    fire_step(:before_step, step.name, @payload, config)

    result = execute_step(step)

    @step_results[step.name] = result
    @context[step.name]      = result
    @payload                 = step.extract_payload(result) if step.transform?

    fire(:after_step, step.name, result, config)
    fire_step(:after_step, step.name, result, config)

    if step.stop_if && step.stop_if.call(result)
      @stopped     = true
      @stopped_at  = step.name
      @stop_reason = result
      run_hooks(config[:hooks], :stopped, step.name, result)
      @stream&.call(:pipeline, :stopped, step.name, result)
      break
    end
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0).round(3)
  @output         = @payload unless @stopped

  unless @stopped
    @memory&.record(
      request:  @original_input,
      response: @output,
      pipeline: self.class.name
    )

    last_result = @step_results[@step_results.keys.last]
    run_hooks(config[:hooks], :complete, last_result)
    @stream&.call(:pipeline, :complete, last_result)
  end

  self
end

#executorsObject

Returns a hash of pre-created executor instances keyed by step name. Available immediately after new — before call — so instances can be configured (model lists, params, etc.) before execution begins.

pipeline = SupportPipeline.new(input: "...")
pipeline.executors[:translate].models.prepend(provider: :openai, model: "gpt-4.1")
pipeline.executors[:guard].params = { strict: true }
pipeline.call


145
146
147
# File 'lib/active_harness/pipeline.rb', line 145

def executors
  @executors
end

#input=(value) ⇒ Object



107
108
109
110
# File 'lib/active_harness/pipeline.rb', line 107

def input=(value)
  @original_input = value
  @payload        = value
end

#resultObject

Wraps pipeline outcome into a Result so a pipeline can be used as a step inside another pipeline, matching the same interface as Agent and Tribunal.

output — final payload (nil when stopped) processed — { “stopped” => bool, “stopped_at” => step_name_string_or_nil }



180
181
182
183
184
185
186
187
# File 'lib/active_harness/pipeline.rb', line 180

def result
  Result.new(
    input:          @original_input,
    output:         @output,
    processed:      { "stopped" => @stopped, "stopped_at" => @stopped_at&.to_s },
    execution_time: @execution_time
  )
end

#stepsObject

Iterates over completed steps as (name, executor, result) tuples. Steps that did not run (pipeline stopped before reaching them) are skipped. Returns an Enumerator when called without a block.

pipeline.steps { |name, executor, result| }

name     — step name symbol          (:translate, :guard, …)
executor — the instance that ran     (TranslationAgent instance, …)
result   — Result struct             (output, processed, usage, model, …)

pipeline.steps.map { |name, executor, result| [name, result.output] }
pipeline.steps.to_a


165
166
167
168
169
170
171
172
173
# File 'lib/active_harness/pipeline.rb', line 165

def steps
  return enum_for(:steps) unless block_given?
  self.class.pipeline_config[:steps].each do |step|
    result = @step_results[step.name]
    next unless result
    yield step.name, @executors[step.name], result
  end
  self
end

#stopped?Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/active_harness/pipeline.rb', line 149

def stopped?
  @stopped
end