Class: ActiveHarness::Pipeline

Inherits:
Object
  • Object
show all
Includes:
Core::HookRunner
Defined in:
lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb,
lib/active_harness/pipeline/hooks.rb

Overview

Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.

Usage (subclass with DSL):

class SupportPipeline < ActiveHarness::Pipeline
  step :injection_guard do
    use InjectionGuardAgent
    stop_if ->(result) { result.processed["detected"] == true }
  end

  step :translate, TranslationAgent   # shorthand — no stop_if

  step :safety_tribunal do
    use SafetyTribunal
    stop_if ->(result) { result.processed["verdict"] == false }
  end

  on :before_step do |step_name, payload| ... end
  on :after_step  do |step_name, result|  ... end
  on :before_step, :translate do |payload| ... end
  on :after_step,  :translate do |result|  ... end
  on :stopped  do |step_name, result| ... end
  on :complete do |last_result|       ... end
end

pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output       # => final payload string (nil if stopped)
pipeline.stopped?     # => false
pipeline.step_results # => { translate: <Result>, ... }

Direct Known Subclasses

SupportPipeline

Defined Under Namespace

Classes: Step

Constant Summary collapse

# Events accepted by `on` when no step name is given (global hooks).
# Any other event makes `on` raise ArgumentError.
VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
# Events accepted by `on` when a step name is given (per-step hooks).
# Any other event makes `on` raise ArgumentError.
VALID_STEP_HOOKS =
%i[before_step after_step].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input:, context: {}, params: {}, memory: nil, token: nil, stream: nil) ⇒ Pipeline

Returns a new instance of Pipeline.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/active_harness/pipeline.rb', line 113

# Builds a pipeline instance in its "not yet run" state.
#
# @param input   the initial payload; also remembered as the original input
# @param context [Hash] stored as a shallow copy, so the caller's hash is
#   not mutated when the pipeline later writes into its context
# @param params  stored as given (not copied)
# @param memory  optional collaborator, stored as given
# @param token   optional value, stored as given
# @param stream  optional per-instance stream; combined with the class-level
#   streams from pipeline_config[:streams] via merge_stream
def initialize(input:, context: {}, params: {}, memory: nil, token: nil, stream: nil)
  # The payload starts out as the input and may be replaced step by step.
  @original_input = @payload = input
  @context = context.dup
  @params, @memory, @token = params, memory, token

  # Fall back to an empty hash when the class config has no :streams entry.
  class_level_streams = self.class.pipeline_config[:streams] || {}
  @stream = merge_stream(stream, class_level_streams)

  # Run-outcome state: nothing has executed yet.
  @step_results = {}
  @stopped = false
  @stopped_at = @stop_reason = @execution_time = @output = nil
end

Instance Attribute Details

#contextObject


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the pipeline's context object.
# The constructor stores a shallow copy (`dup`) of its `context:` argument
# here, and #call additionally writes each step's result into it under the
# step's name.
def context
  @context
end

#execution_timeObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns how long #call took, measured with the monotonic clock and rounded
# to three decimal places (seconds). Nil until #call has finished.
def execution_time
  @execution_time
end

#original_inputObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the input the pipeline was given — the `input:` constructor
# argument, or the value most recently assigned through #input=.
# Unlike the working payload, #call does not replace it as steps run.
def original_input
  @original_input
end

#outputObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the final payload, which #call assigns only when no step stopped
# the pipeline. Nil before #call has run.
def output
  @output
end

#paramsObject

Returns the value of attribute params.



106
107
108
# File 'lib/active_harness/pipeline.rb', line 106

# Returns the params object. The constructor stores its `params:` argument
# as given (no copy is made), defaulting to an empty hash.
def params
  @params
end

#step_resultsObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the hash of step results, keyed by step name.
# It starts empty and #call adds one entry per executed step.
def step_results
  @step_results
end

#stop_reasonObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the result of the step whose stop_if condition halted the pipeline
# (assigned by #call at the moment it stops). Nil when no step has stopped it.
def stop_reason
  @stop_reason
end

#stopped_atObject (readonly)


Instance API




98
99
100
# File 'lib/active_harness/pipeline.rb', line 98

# Returns the name of the step whose stop_if condition halted the pipeline
# (assigned by #call at the moment it stops). Nil when no step has stopped it.
def stopped_at
  @stopped_at
end

Class Method Details

.after(event, step_name = nil, &block) ⇒ Object



50
51
52
# File 'lib/active_harness/pipeline/hooks.rb', line 50

# Alias for `on` that prefixes the event name with "after_",
# e.g. `after :step` registers an :after_step hook.
#
# @param event     event suffix to prefix (e.g. :step)
# @param step_name optional step name; passed straight through to `on`
# @return whatever `on` returns
def after(event, step_name = nil, &block)
  hook_event = "after_#{event}".to_sym
  on(hook_event, step_name, &block)
end

.before(event, step_name = nil, &block) ⇒ Object

Rails-style aliases for on:

Global:

before :step                          do |name, payload| end  # → on :before_step
after  :step                          do |name, result|  end  # → on :after_step
callback :stopped                     do |name, result|  end  # → on :stopped
callback :complete                    do |result|        end  # → on :complete

Per-step:

after  :step, :translate              do |result| end
before :step, :translate              do |payload| end


46
47
48
# File 'lib/active_harness/pipeline/hooks.rb', line 46

# Alias for `on` that prefixes the event name with "before_",
# e.g. `before :step` registers a :before_step hook.
#
# @param event     event suffix to prefix (e.g. :step)
# @param step_name optional step name; passed straight through to `on`
# @return whatever `on` returns
def before(event, step_name = nil, &block)
  hook_event = "before_#{event}".to_sym
  on(hook_event, step_name, &block)
end

.callback(event, &block) ⇒ Object



54
55
56
# File 'lib/active_harness/pipeline/hooks.rb', line 54

# Alias for `on` restricted to global hooks: the event name is passed through
# unchanged and no step name is supplied, e.g. `callback :stopped`.
#
# @param event event name handed to `on` as-is
# @return whatever `on` returns
def callback(event, &block)
  # nil is `on`'s own default for step_name, i.e. "global hook".
  on(event, nil, &block)
end

.inherited(subclass) ⇒ Object

Each subclass gets its own isolated config.



58
59
60
61
62
63
# File 'lib/active_harness/pipeline.rb', line 58

# Inheritance hook: gives every subclass its own empty pipeline config, so
# steps, hooks and stream handlers registered on one class are not shared
# with (or inherited by) its subclasses.
#
# Calls `super` first so that any other `inherited` hook further up the
# ancestry chain still runs instead of being silently swallowed by this
# override.
#
# @param subclass [Class] the class that was just created
def inherited(subclass)
  super
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {}, streams: {} }
  )
end

.on(event, step_name = nil, &block) ⇒ Object

Register a global or per-step hook.

Global hooks fire on every step:

on :before_step do |step_name, payload| ... end
on :after_step  do |step_name, result|  ... end
on :stopped     do |step_name, result|  ... end
on :complete    do |last_result|         ... end

Per-step hooks fire only for the named step; their block does not receive the step name as an argument:

on :before_step, :translate do |payload| ... end
on :after_step,  :translate do |result|  ... end


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/active_harness/pipeline/hooks.rb', line 18

# Registers a hook block in the class's pipeline config.
#
# Without a step name the block is appended to the global hooks for `event`;
# with a step name it is appended to that step's hooks for `event`.
#
# @param event     hook event; must be in VALID_HOOKS (global) or
#   VALID_STEP_HOOKS (per-step)
# @param step_name optional step name selecting per-step registration
# @return [Array] the list of blocks now registered for that event
# @raise [ArgumentError] when the event is not valid for the chosen scope
def on(event, step_name = nil, &block)
  unless step_name
    unless VALID_HOOKS.include?(event)
      raise ArgumentError, "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}"
    end

    global_hooks = pipeline_config[:hooks]
    return (global_hooks[event] ||= []) << block
  end

  unless VALID_STEP_HOOKS.include?(event)
    raise ArgumentError, "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}"
  end

  # Lazily create the per-step table, then the per-event list inside it.
  hooks_for_step = (pipeline_config[:step_hooks][step_name] ||= {})
  (hooks_for_step[event] ||= []) << block
end

.on_agent_event(&block) ⇒ Object

Class-level event stream handlers — fired for every matching event from any agent or tribunal executed within this pipeline (including agents running inside tribunals). Multiple blocks can be registered; all fire.

The handler receives (event, *args) — already scoped to the source.

on_agent_event do |event, result|
  Rails.logger.info "[Agent #{event}] #{result.model}" if event == :after_call
end

on_tribunal_event do |event, verdict|
  Rails.logger.info "[Tribunal #{event}] verdict=#{verdict}" if event == :after_verdict
end

on_pipeline_event do |event, step_name, _data|
  Rails.logger.info "[Pipeline #{event}] step=#{step_name}"
end


82
83
84
# File 'lib/active_harness/pipeline.rb', line 82

# Registers a class-level handler block under the :agent stream.
# Blocks accumulate: each call appends to the existing list.
#
# @return [Array] the list of :agent handlers after the append
def on_agent_event(&block)
  streams  = pipeline_config[:streams]
  handlers = (streams[:agent] ||= [])
  handlers.push(block)
end

.on_pipeline_event(&block) ⇒ Object



90
91
92
# File 'lib/active_harness/pipeline.rb', line 90

# Registers a class-level handler block under the :pipeline stream.
# Blocks accumulate: each call appends to the existing list.
#
# @return [Array] the list of :pipeline handlers after the append
def on_pipeline_event(&block)
  streams  = pipeline_config[:streams]
  handlers = (streams[:pipeline] ||= [])
  handlers.push(block)
end

.on_tribunal_event(&block) ⇒ Object



86
87
88
# File 'lib/active_harness/pipeline.rb', line 86

# Registers a class-level handler block under the :tribunal stream.
# Blocks accumulate: each call appends to the existing list.
#
# @return [Array] the list of :tribunal handlers after the append
def on_tribunal_event(&block)
  streams  = pipeline_config[:streams]
  handlers = (streams[:tribunal] ||= [])
  handlers.push(block)
end

.pipeline_configObject



53
54
55
# File 'lib/active_harness/pipeline.rb', line 53

# Returns this class's pipeline configuration, creating an empty one on
# first access. The hash has four sections: :steps (Array), :hooks,
# :step_hooks and :streams (Hashes).
#
# @return [Hash] the same hash object on every call
def pipeline_config
  return @pipeline_config if @pipeline_config

  @pipeline_config = { steps: [], hooks: {}, step_hooks: {}, streams: {} }
end

.step(name, agent_class = nil, &block) ⇒ Object

Define a step in the pipeline.

Shorthand (agent only, no stop_if):

step :translate, TranslationAgent

Full block form:

step :injection_guard do
  use InjectionGuardAgent
  stop_if ->(result) { result.processed["detected"] == true }
end


49
50
51
# File 'lib/active_harness/pipeline.rb', line 49

# Appends a new Pipeline::Step to this class's configured steps.
# The name, the optional agent class and the optional block are all handed
# to Pipeline::Step.new unchanged.
#
# @return [Array] the list of configured steps after the append
def step(name, agent_class = nil, &block)
  configured_steps = pipeline_config[:steps]
  configured_steps.push(Pipeline::Step.new(name, agent_class, &block))
end

Instance Method Details

#callObject

Execute all steps sequentially. Returns self for chaining.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/active_harness/pipeline.rb', line 156

# Runs every configured step in order, threading the payload through them.
#
# For each step: fires the global and per-step :before_step hooks, executes
# the step, records its result in the step results and in the context under
# the step's name, replaces the payload when the step is a transform, then
# fires the :after_step hooks. When the step's stop_if callable returns a
# truthy value for the result, the run is marked as stopped, the :stopped
# hooks and stream event fire, and the remaining steps are skipped.
#
# When no step stops the run, the final payload becomes the output, the
# exchange is recorded in memory (if a memory object was given), and the
# :complete hooks and stream event fire with the last step's result.
#
# Run-outcome state (step results, stopped flag, stopped_at, stop_reason,
# output, execution time) is reset on entry, so re-running the same instance
# — e.g. after assigning a new input — never reports a stale outcome left
# over from an earlier run.
#
# @return [self] the pipeline itself, for chaining
def call
  config     = self.class.pipeline_config
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Start from a clean outcome; without this a previous stopped run would
  # keep @stopped true and suppress the output and :complete hooks below.
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @output         = nil
  @execution_time = nil

  @memory&.load

  config[:steps].each do |step|
    fire(:before_step, step.name, @payload, config)
    fire_step(:before_step, step.name, @payload, config)

    result = execute_step(step)

    @step_results[step.name] = result
    @context[step.name]      = result
    @payload                 = step.extract_payload(result) if step.transform?

    fire(:after_step, step.name, result, config)
    fire_step(:after_step, step.name, result, config)

    # Steps without a stop_if never halt the pipeline.
    next unless step.stop_if&.call(result)

    @stopped     = true
    @stopped_at  = step.name
    @stop_reason = result
    run_hooks(config[:hooks], :stopped, step.name, result)
    @stream&.call(:pipeline, :stopped, step.name, result)
    break
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at).round(3)
  return self if @stopped

  @output = @payload
  @memory&.record(
    request:  @original_input,
    response: @output,
    pipeline: self.class.name
  )

  # Nil for a pipeline with no steps.
  last_result = @step_results.values.last
  run_hooks(config[:hooks], :complete, last_result)
  @stream&.call(:pipeline, :complete, last_result)

  self
end

#input=(value) ⇒ Object



108
109
110
111
# File 'lib/active_harness/pipeline.rb', line 108

# Replaces the pipeline's input: the value becomes both the remembered
# original input and the current working payload.
#
# @param value the new input
def input=(value)
  @original_input = @payload = value
end

#resultObject

Wraps pipeline outcome into a Result so a pipeline can be used as a step inside another pipeline, matching the same interface as Agent and Tribunal.

output — final payload (nil when stopped)
processed — { "stopped" => bool, "stopped_at" => step_name_string_or_nil }



146
147
148
149
150
151
152
153
# File 'lib/active_harness/pipeline.rb', line 146

# Packages the pipeline's outcome as a Result.
#
# The Result receives the original input, the output, the execution time,
# and a `processed` hash with two string keys: "stopped" (the stopped flag)
# and "stopped_at" (the stopping step's name as a String, or nil when no
# step stopped the pipeline).
#
# @return [Result]
def result
  halted_step = @stopped_at&.to_s
  outcome     = { "stopped" => @stopped, "stopped_at" => halted_step }

  Result.new(
    input:          @original_input,
    output:         @output,
    processed:      outcome,
    execution_time: @execution_time
  )
end

#stopped?Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/active_harness/pipeline.rb', line 137

# Reports whether a step's stop_if condition halted the pipeline.
# The flag starts out false and #call sets it to true when it stops.
def stopped?
  @stopped
end