Class: ActiveHarness::Pipeline

Inherits:
Object
  • Object
show all
Includes:
Core::HookRunner
Defined in:
lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb,
lib/active_harness/pipeline/hooks.rb

Overview

Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.

Usage (subclass with DSL):

class SupportPipeline < ActiveHarness::Pipeline
  step :injection_guard do
    use InjectionGuardAgent
    stop_if ->(result) { result.parsed["detected"] == true }
  end

  step :translate, TranslationAgent   # shorthand — no stop_if

  step :safety_tribunal do
    use SafetyTribunal
    stop_if ->(result) { result.verdict == false }
  end

  on :before_step do |step_name, payload| ... end
  on :after_step  do |step_name, result|  ... end
  on :before_step, :translate do |payload| ... end
  on :after_step,  :translate do |result|  ... end
  on :stopped  do |step_name, result| ... end
  on :complete do |last_result|       ... end
end

pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output       # => final payload string (nil if stopped)
pipeline.stopped?     # => false
pipeline.step_results # => { translate: <Result>, ... }

Direct Known Subclasses

SupportPipeline

Defined Under Namespace

Classes: Step

Constant Summary collapse

VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
VALID_STEP_HOOKS =
%i[before_step after_step].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(input:, context: {}, params: {}, memory: nil, streams: {}) ⇒ Pipeline

Returns a new instance of Pipeline.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/active_harness/pipeline.rb', line 114

# Sets up a pipeline run for +input+.
#
# @param input   [Object] starting payload; also remembered as the original input
# @param context [Hash] shallow-copied via #dup before being stored
# @param params  [Hash] stored as given
# @param memory  [Object, nil] stored as given
# @param streams [Hash] per-call stream handlers, read from the :token, :agent,
#   :tribunal and :pipeline keys
def initialize(input:, context: {}, params: {}, memory: nil, streams: {})
  # Caller-supplied values; the working payload starts out as the input itself.
  @payload = @original_input = input
  @context = context.dup
  @params  = params
  @memory  = memory

  # Stream handlers. The per-call agent/tribunal/pipeline handlers are combined
  # with the ones registered on the class through merge_stream.
  @token_stream = streams[:token]
  declared      = self.class.pipeline_config[:streams] || {}
  @agent_event_stream, @tribunal_event_stream, @pipeline_event_stream =
    %i[agent tribunal pipeline].map { |kind| merge_stream(streams[kind], declared[kind]) }

  # Run state, filled in by #call.
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil
end

Instance Attribute Details

#context ⇒ Object


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the pipeline's context hash. #initialize stores a #dup of the
# context it was given, and #call adds each step's result under the step name.
def context
  @context
end

#execution_time ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the duration measured by #call: elapsed monotonic-clock seconds,
# rounded to 3 decimal places. nil until #call has assigned it.
def execution_time
  @execution_time
end

#original_input ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the input the pipeline was given, as set by #initialize or #input=.
def original_input
  @original_input
end

#output ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the final payload. It starts as nil; #call assigns it only when no
# step stopped the run.
def output
  @output
end

#params ⇒ Object

Returns the value of attribute params.



107
108
109
# File 'lib/active_harness/pipeline.rb', line 107

# Reader for the params value; #initialize stores the params argument as given.
def params
  @params
end

#step_results ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the Hash of step results; #call stores each step's result in it
# under the step's name.
def step_results
  @step_results
end

#stop_reason ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the result of the step whose stop_if condition halted the run.
# nil after #initialize; assigned by #call when a step stops the pipeline.
def stop_reason
  @stop_reason
end

#stopped_at ⇒ Object (readonly)


Instance API




99
100
101
# File 'lib/active_harness/pipeline.rb', line 99

# Reader for the name of the step that halted the run.
# nil after #initialize; assigned by #call when a step stops the pipeline.
def stopped_at
  @stopped_at
end

Class Method Details

.after(event, step_name = nil, &block) ⇒ Object



50
51
52
# File 'lib/active_harness/pipeline/hooks.rb', line 50

# Rails-style shorthand: registers +block+ through .on under the hook name
# "after_<event>".
#
# @param event     [Symbol, String] suffix of the hook name, e.g. :step
# @param step_name [Object, nil] forwarded to .on unchanged
# @return [Object] whatever .on returns
def after(event, step_name = nil, &block)
  hook = "after_#{event}".to_sym
  on(hook, step_name, &block)
end

.before(event, step_name = nil, &block) ⇒ Object

Rails-style aliases for on:

Global:

before :step                          do |name, payload| end  # → on :before_step
after  :step                          do |name, result|  end  # → on :after_step
callback :stopped                     do |name, result|  end  # → on :stopped
callback :complete                    do |result|        end  # → on :complete

Per-step:

after  :step, :translate              do |result| end
before :step, :translate              do |payload| end


46
47
48
# File 'lib/active_harness/pipeline/hooks.rb', line 46

# Rails-style shorthand: registers +block+ through .on under the hook name
# "before_<event>".
#
# @param event     [Symbol, String] suffix of the hook name, e.g. :step
# @param step_name [Object, nil] forwarded to .on unchanged
# @return [Object] whatever .on returns
def before(event, step_name = nil, &block)
  hook = "before_#{event}".to_sym
  on(hook, step_name, &block)
end

.callback(event, &block) ⇒ Object



54
55
56
# File 'lib/active_harness/pipeline/hooks.rb', line 54

# Alias-style entry point for hooks: forwards +event+ and the block to .on
# without a step name, so the hook is registered as a global one.
#
# @param event [Symbol] hook name, passed to .on as is
# @return [Object] whatever .on returns
def callback(event, &block)
  on(event, &block)
end

.inherited(subclass) ⇒ Object

Each subclass gets its own isolated config.



58
59
60
61
62
63
# File 'lib/active_harness/pipeline.rb', line 58

# Each subclass gets its own isolated config.
#
# Ruby invokes this hook when a class inherits from the receiver. A fresh,
# empty config hash is stored on the subclass, so nothing is shared with (or
# copied from) the parent class.
#
# @param subclass [Class] the class being created
# @return [Hash] the config hash stored on +subclass+
def inherited(subclass)
  # Keep the hook chain intact: any other `inherited` implementation further up
  # the ancestry (e.g. one added by an extended module) must still be notified.
  super
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {}, streams: {} }
  )
end

.on(event, step_name = nil, &block) ⇒ Object

Register a global or per-step hook.

Global hooks are not tied to a step name; the :before_step and :after_step ones fire on every step:

on :before_step do |step_name, payload| ... end
on :after_step  do |step_name, result|  ... end
on :stopped     do |step_name, result|  ... end
on :complete    do |last_result|         ... end

Per-step hooks fire only for the named step; the block is not passed the step name:

on :before_step, :translate do |payload| ... end
on :after_step,  :translate do |result|  ... end


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/active_harness/pipeline/hooks.rb', line 18

# Registers +block+ as a hook for +event+.
#
# Without +step_name+ the hook is global and +event+ must be one of
# VALID_HOOKS. With +step_name+ the hook is stored for that step only and
# +event+ must be one of VALID_STEP_HOOKS.
#
# @param event     [Symbol] hook name
# @param step_name [Object, nil] step to scope the hook to; falsy means global
# @return [Array] the list of blocks now registered for that hook
# @raise [ArgumentError] when +event+ is not valid for the chosen scope
def on(event, step_name = nil, &block)
  unless step_name
    unless VALID_HOOKS.include?(event)
      known = VALID_HOOKS.join(", ")
      raise ArgumentError, "Unknown Pipeline hook :#{event}. Valid: #{known}"
    end

    global = pipeline_config[:hooks]
    global[event] ||= []
    return global[event] << block
  end

  unless VALID_STEP_HOOKS.include?(event)
    allowed = VALID_STEP_HOOKS.join(", ")
    raise ArgumentError, "Per-step hooks support: #{allowed}. Got :#{event}"
  end

  # Hooks for a single step live in their own event => blocks hash.
  for_step = (pipeline_config[:step_hooks][step_name] ||= {})
  for_step[event] ||= []
  for_step[event] << block
end

.on_agent_event(&block) ⇒ Object

Class-level event stream handlers — fired for every matching event from any agent or tribunal executed within this pipeline (including agents running inside tribunals). Multiple blocks can be registered; all fire.

The handler receives the same (event, *args) arguments that a lambda passed at runtime via streams: { agent: lambda } would receive.

on_agent_event do |event, result|
  Rails.logger.info "[Agent #{event}] #{result.model}" if event == :after_call
end

on_tribunal_event do |event, verdict|
  Rails.logger.info "[Tribunal #{event}] verdict=#{verdict}" if event == :after_verdict
end

on_pipeline_event do |event, step_name, _data|
  Rails.logger.info "[Pipeline #{event}] step=#{step_name}"
end


83
84
85
# File 'lib/active_harness/pipeline.rb', line 83

# Adds +block+ to the class-level list of agent event handlers kept under
# pipeline_config[:streams][:agent], creating the list on first use.
#
# @return [Array] the handlers registered so far, in registration order
def on_agent_event(&block)
  handlers = pipeline_config[:streams]
  handlers[:agent] ||= []
  handlers[:agent] << block
end

.on_pipeline_event(&block) ⇒ Object



91
92
93
# File 'lib/active_harness/pipeline.rb', line 91

# Adds +block+ to the class-level list of pipeline event handlers kept under
# pipeline_config[:streams][:pipeline], creating the list on first use.
#
# @return [Array] the handlers registered so far, in registration order
def on_pipeline_event(&block)
  handlers = pipeline_config[:streams]
  handlers[:pipeline] ||= []
  handlers[:pipeline] << block
end

.on_tribunal_event(&block) ⇒ Object



87
88
89
# File 'lib/active_harness/pipeline.rb', line 87

# Adds +block+ to the class-level list of tribunal event handlers kept under
# pipeline_config[:streams][:tribunal], creating the list on first use.
#
# @return [Array] the handlers registered so far, in registration order
def on_tribunal_event(&block)
  handlers = pipeline_config[:streams]
  handlers[:tribunal] ||= []
  handlers[:tribunal] << block
end

.pipeline_config ⇒ Object



53
54
55
# File 'lib/active_harness/pipeline.rb', line 53

# Returns the receiver's config hash, creating an empty one the first time it
# is asked for. The same hash object is handed back on every later call.
#
# @return [Hash] config with :steps, :hooks, :step_hooks and :streams entries
def pipeline_config
  return @pipeline_config if @pipeline_config

  @pipeline_config = { steps: [], hooks: {}, step_hooks: {}, streams: {} }
end

.step(name, agent_class = nil, &block) ⇒ Object

Define a step in the pipeline.

Shorthand (agent only, no stop_if):

step :translate, TranslationAgent

Full block form:

step :injection_guard do
  use InjectionGuardAgent
  stop_if ->(result) { result.parsed["detected"] == true }
end


49
50
51
# File 'lib/active_harness/pipeline.rb', line 49

# Appends a step to this pipeline's configuration. The arguments and block are
# handed to Pipeline::Step.new as they are.
#
# @param name        [Object] step name
# @param agent_class [Object, nil] optional second argument for Pipeline::Step.new
# @return [Array] the configured steps, including the new one
def step(name, agent_class = nil, &block)
  steps = pipeline_config[:steps]
  steps.push(Pipeline::Step.new(name, agent_class, &block))
end

Instance Method Details

#call ⇒ Object

Execute all steps sequentially. Returns self for chaining.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/active_harness/pipeline.rb', line 144

# Execute all steps sequentially. Returns self for chaining.
#
# For each configured step, in order: the :before_step hooks are fired (fire,
# then fire_step), the step is executed, its result is stored in the step
# results and in the context under the step's name, the payload is replaced by
# the result's output when the step reports transform?, and the :after_step
# hooks are fired. If the step has a stop_if callable and it returns a truthy
# value for the result, the run halts: the :stopped hooks and the pipeline
# event stream are invoked and no further step runs.
#
# When no step halts the run, the final payload becomes the output, the
# exchange is recorded in memory (if one was given) and the :complete hooks
# and the pipeline event stream are invoked with the last step's result.
#
# Per-run state (step results, stopped flag, stopped_at, stop_reason, output,
# execution time) is cleared on entry, so an instance that is given a new input
# via #input= and called again does not report the previous run's outcome.
#
# @return [self]
def call
  config     = self.class.pipeline_config
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Start every run from a clean slate; otherwise a stopped flag or an output
  # left over from an earlier call would leak into this one.
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil

  @memory&.load

  config[:steps].each do |step|
    fire(:before_step, step.name, @payload, config)
    fire_step(:before_step, step.name, @payload, config)

    result = execute_step(step)

    @step_results[step.name] = result
    @context[step.name]      = result
    @payload                 = result.output if step.transform?

    fire(:after_step, step.name, result, config)
    fire_step(:after_step, step.name, result, config)

    stop_check = step.stop_if
    next unless stop_check && stop_check.call(result)

    # The step asked to halt: remember where and why, notify, and skip the rest.
    @stopped     = true
    @stopped_at  = step.name
    @stop_reason = result
    run_hooks(config[:hooks], :stopped, step.name, result)
    @pipeline_event_stream&.call(:stopped, step.name, result)
    break
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at).round(3)
  return self if @stopped

  @output = @payload
  @memory&.record(
    request:  @original_input,
    response: @output,
    pipeline: self.class.name
  )

  # nil when the pipeline has no steps at all.
  last_result = @step_results.values.last
  run_hooks(config[:hooks], :complete, last_result)
  @pipeline_event_stream&.call(:complete, last_result)

  self
end

#input=(value) ⇒ Object



109
110
111
112
# File 'lib/active_harness/pipeline.rb', line 109

# Replaces the pipeline's input: both the remembered original input and the
# working payload are set to +value+.
#
# @param value [Object] the new input
def input=(value)
  @payload = @original_input = value
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/active_harness/pipeline.rb', line 139

# Whether a step's stop_if condition halted the run. Returns @stopped as is:
# false after #initialize, true once #call has stopped on a step.
def stopped?
  @stopped
end