Class: ActiveHarness::Pipeline
- Inherits:
-
Object
- Object
- ActiveHarness::Pipeline
- Includes:
- Core::HookRunner
- Defined in:
- lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb,
lib/active_harness/pipeline/hooks.rb
Overview
Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.
Usage (subclass with DSL):
class SupportPipeline < ActiveHarness::Pipeline
step :injection_guard do
use InjectionGuardAgent
stop_if ->(result) { result.parsed["detected"] == true }
end
step :translate, TranslationAgent # shorthand — no stop_if
step :safety_tribunal do
use SafetyTribunal
stop_if ->(result) { result.verdict == false }
end
on :before_step do |step_name, payload| ... end
on :after_step do |step_name, result| ... end
on :before_step, :translate do |payload| ... end
on :after_step, :translate do |result| ... end
on :stopped do |step_name, result| ... end
on :complete do |last_result| ... end
end
pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output # => final payload string (nil if stopped)
pipeline.stopped? # => false
pipeline.step_results # => { translate: <Result>, ... }
Direct Known Subclasses
Defined Under Namespace
Classes: Step
Constant Summary collapse
- VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
- VALID_STEP_HOOKS =
%i[before_step after_step].freeze
Instance Attribute Summary collapse
-
#context ⇒ Object
Working context hash: a shallow copy of the `context:` constructor argument, to which `#call` adds each step's result under the step's name.
-
#execution_time ⇒ Object
readonly
Duration of the last `#call` in seconds (monotonic clock, rounded to 3 decimals); nil before the first run.
-
#original_input ⇒ Object
readonly
The input given to the constructor, or the value last assigned through `#input=`.
-
#output ⇒ Object
readonly
Final payload after a run that completed every step; nil before `#call` or when the pipeline was stopped.
-
#params ⇒ Object
Returns the value of attribute params.
-
#step_results ⇒ Object
readonly
Hash mapping each executed step's name to the result it produced.
-
#stop_reason ⇒ Object
readonly
Result of the step whose `stop_if` halted the pipeline; nil when the pipeline was not stopped.
-
#stopped_at ⇒ Object
readonly
Name of the step that halted the pipeline; nil when the pipeline was not stopped.
Class Method Summary collapse
- .after(event, step_name = nil, &block) ⇒ Object
-
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for `on`.
- .callback(event, &block) ⇒ Object
-
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
-
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
-
.on_agent_event(&block) ⇒ Object
Class-level event stream handlers — fired for every matching event from any agent or tribunal executed within this pipeline (including agents running inside tribunals).
- .on_pipeline_event(&block) ⇒ Object
- .on_tribunal_event(&block) ⇒ Object
- .pipeline_config ⇒ Object
-
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Instance Method Summary collapse
-
#call ⇒ Object
Execute all steps sequentially.
-
#initialize(input:, context: {}, params: {}, memory: nil, streams: {}) ⇒ Pipeline
constructor
A new instance of Pipeline.
- #input=(value) ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(input:, context: {}, params: {}, memory: nil, streams: {}) ⇒ Pipeline
Returns a new instance of Pipeline.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/active_harness/pipeline.rb', line 114
#
# Builds a pipeline instance; nothing is executed until #call.
#
# @param input [Object] initial payload, also kept as #original_input
# @param context [Hash, nil] caller context; shallow-copied so the step
#   results that #call writes into it never touch the caller's hash
# @param params [Hash] stored as given and exposed through #params
# @param memory [Object, nil] optional memory object; when present #call
#   sends it `load` and `record`
# @param streams [Hash, nil] runtime stream handlers looked up under
#   :token, :agent, :tribunal and :pipeline
def initialize(input:, context: {}, params: {}, memory: nil, streams: {})
  # An explicit nil (e.g. forwarded from optional caller options) is treated
  # like the default empty hash instead of raising NoMethodError.
  streams ||= {}

  @original_input = input
  @payload        = input
  @context        = (context || {}).dup
  @params         = params
  @memory         = memory
  @token_stream   = streams[:token]

  # Handlers registered on the class are combined with the runtime ones by
  # merge_stream (helper defined outside this listing).
  class_streams = self.class.pipeline_config[:streams] || {}
  @agent_event_stream    = merge_stream(streams[:agent],    class_streams[:agent])
  @tribunal_event_stream = merge_stream(streams[:tribunal], class_streams[:tribunal])
  @pipeline_event_stream = merge_stream(streams[:pipeline], class_streams[:pipeline])

  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil
end
Instance Attribute Details
#context ⇒ Object
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Hash] the working context: a copy of the constructor's `context:`
#   argument, to which #call adds each step's result keyed by step name
def context
  @context
end
#execution_time ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Float, nil] seconds taken by the last #call (monotonic clock,
#   rounded to 3 decimals); nil until #call has run
def execution_time
  @execution_time
end
#original_input ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Object] the input passed to the constructor, or the value last
#   assigned through #input=
def original_input
  @original_input
end
#output ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Object, nil] the final payload once #call has run every step
#   without being stopped; nil otherwise
def output
  @output
end
#params ⇒ Object
Returns the value of attribute params.
107 108 109 |
# File 'lib/active_harness/pipeline.rb', line 107
#
# @return [Hash] the `params:` value given to the constructor (stored as
#   given, not copied)
def params
  @params
end
#step_results ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Hash] results recorded by #call, keyed by step name
def step_results
  @step_results
end
#stop_reason ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Object, nil] the result of the step whose `stop_if` halted the
#   pipeline; nil when the pipeline was not stopped
def stop_reason
  @stop_reason
end
#stopped_at ⇒ Object (readonly)
Instance API
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99
#
# @return [Symbol, nil] name of the step that halted the pipeline; nil when
#   the pipeline was not stopped
def stopped_at
  @stopped_at
end
Class Method Details
.after(event, step_name = nil, &block) ⇒ Object
50 51 52 |
# File 'lib/active_harness/pipeline/hooks.rb', line 50
#
# Rails-style alias for `on`: `after :step` registers an :after_step hook.
#
# @param event [Symbol, String] event suffix; it is prefixed with "after_"
# @param step_name [Symbol, nil] restrict the hook to one step when given
# @return [Object] whatever `on` returns
def after(event, step_name = nil, &block)
  on(:"after_#{event}", step_name, &block)
end
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for on:
Global:
before :step do |name, payload| end # → on :before_step
after :step do |name, result| end # → on :after_step
callback :stopped do |name, result| end # → on :stopped
callback :complete do |result| end # → on :complete
Per-step:
after :step, :translate do |result| end
before :step, :translate do |payload| end
46 47 48 |
# File 'lib/active_harness/pipeline/hooks.rb', line 46
#
# Rails-style alias for `on`: `before :step` registers a :before_step hook.
#
# @param event [Symbol, String] event suffix; it is prefixed with "before_"
# @param step_name [Symbol, nil] restrict the hook to one step when given
# @return [Object] whatever `on` returns
def before(event, step_name = nil, &block)
  on(:"before_#{event}", step_name, &block)
end
.callback(event, &block) ⇒ Object
54 55 56 |
# File 'lib/active_harness/pipeline/hooks.rb', line 54
#
# Alias for a global `on` hook, e.g. `callback :stopped` or
# `callback :complete`. The event name is passed through unchanged and no
# step name is supplied.
#
# @param event [Symbol] hook event
# @return [Object] whatever `on` returns
def callback(event, &block)
  on(event, &block)
end
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
58 59 60 61 62 63 |
# File 'lib/active_harness/pipeline.rb', line 58
#
# Inheritance hook: gives every new subclass its own empty configuration, so
# steps, hooks and streams declared on one pipeline class are not shared with
# another. Nothing is copied from the parent class.
#
# @param subclass [Class] the class being created
# @return [Hash] the fresh configuration stored on the subclass
def inherited(subclass)
  # Keep the hook chain intact: other `inherited` implementations in the
  # ancestor chain still need to see the new subclass.
  super
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {}, streams: {} }
  )
end
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
Global hooks fire on every step:
on :before_step do |step_name, payload| ... end
on :after_step do |step_name, result| ... end
on :stopped do |step_name, result| ... end
on :complete do |last_result| ... end
Per-step hooks fire only for the named step (no step_name passed):
on :before_step, :translate do |payload| ... end
on :after_step, :translate do |result| ... end
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/active_harness/pipeline/hooks.rb', line 18
#
# Registers a hook block in the class configuration.
#
# Without a step name the block is stored as a global hook under
# pipeline_config[:hooks][event]; with a step name it is stored under
# pipeline_config[:step_hooks][step_name][event].
#
# @param event [Symbol] one of VALID_HOOKS (global) or VALID_STEP_HOOKS
#   (per-step)
# @param step_name [Symbol, nil] step to attach the hook to, or nil for a
#   global hook
# @return [Array<Proc>] the hook list the block was appended to
# @raise [ArgumentError] if the event is not valid for the chosen scope, or
#   if no block is given
def on(event, step_name = nil, &block)
  if step_name
    unless VALID_STEP_HOOKS.include?(event)
      raise ArgumentError,
            "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}"
    end
  else
    unless VALID_HOOKS.include?(event)
      raise ArgumentError,
            "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}"
    end
  end

  # Fail at declaration time instead of storing nil in the hook list.
  raise ArgumentError, "Pipeline hook :#{event} requires a block" unless block

  registry =
    if step_name
      per_step = (pipeline_config[:step_hooks][step_name] ||= {})
      per_step[event] ||= []
    else
      pipeline_config[:hooks][event] ||= []
    end
  registry << block
end
.on_agent_event(&block) ⇒ Object
Class-level event stream handlers — fired for every matching event from any agent or tribunal executed within this pipeline (including agents running inside tribunals). Multiple blocks can be registered; all fire.
The handler receives the same (event, *args) signature that the runtime streams: { agent: lambda } would receive.
on_agent_event do |event, result|
Rails.logger.info "[Agent #{event}] #{result.model}" if event == :after_call
end
on_tribunal_event do |event, verdict|
Rails.logger.info "[Tribunal #{event}] verdict=#{verdict}" if event == :after_verdict
end
on_pipeline_event do |event, step_name, _data|
Rails.logger.info "[Pipeline #{event}] step=#{step_name}"
end
83 84 85 |
# File 'lib/active_harness/pipeline.rb', line 83
#
# Registers a class-level handler for agent events. Handlers accumulate in
# pipeline_config[:streams][:agent]; several may be registered.
#
# @return [Array<Proc>] the handlers registered so far
# @raise [ArgumentError] if no block is given
def on_agent_event(&block)
  # Reject a missing block rather than storing nil among the handlers.
  raise ArgumentError, "on_agent_event requires a block" unless block

  (pipeline_config[:streams][:agent] ||= []) << block
end
.on_pipeline_event(&block) ⇒ Object
91 92 93 |
# File 'lib/active_harness/pipeline.rb', line 91
#
# Registers a class-level handler for pipeline events. Handlers accumulate
# in pipeline_config[:streams][:pipeline]; several may be registered.
#
# @return [Array<Proc>] the handlers registered so far
# @raise [ArgumentError] if no block is given
def on_pipeline_event(&block)
  # Reject a missing block rather than storing nil among the handlers.
  raise ArgumentError, "on_pipeline_event requires a block" unless block

  (pipeline_config[:streams][:pipeline] ||= []) << block
end
.on_tribunal_event(&block) ⇒ Object
87 88 89 |
# File 'lib/active_harness/pipeline.rb', line 87
#
# Registers a class-level handler for tribunal events. Handlers accumulate
# in pipeline_config[:streams][:tribunal]; several may be registered.
#
# @return [Array<Proc>] the handlers registered so far
# @raise [ArgumentError] if no block is given
def on_tribunal_event(&block)
  # Reject a missing block rather than storing nil among the handlers.
  raise ArgumentError, "on_tribunal_event requires a block" unless block

  (pipeline_config[:streams][:tribunal] ||= []) << block
end
.pipeline_config ⇒ Object
53 54 55 |
# File 'lib/active_harness/pipeline.rb', line 53
#
# Lazily created configuration for this class, memoised in the
# @pipeline_config class-level instance variable.
#
# @return [Hash] with the keys :steps (Array), :hooks, :step_hooks and
#   :streams (Hashes)
def pipeline_config
  @pipeline_config ||= { steps: [], hooks: {}, step_hooks: {}, streams: {} }
end
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Shorthand (agent only, no stop_if):
step :translate, TranslationAgent
Full block form:
step :injection_guard do
use InjectionGuardAgent
stop_if ->(result) { result.parsed["detected"] == true }
end
49 50 51 |
# File 'lib/active_harness/pipeline.rb', line 49
#
# Declares a pipeline step by appending a Pipeline::Step, built from the
# given name, optional agent class and optional block, to
# pipeline_config[:steps]. Steps are kept in declaration order.
#
# @param name [Symbol] step name
# @param agent_class [Class, nil] agent for the shorthand form
# @return [Array] the list of steps declared so far
def step(name, agent_class = nil, &block)
  pipeline_config[:steps] << Pipeline::Step.new(name, agent_class, &block)
end
Instance Method Details
#call ⇒ Object
Execute all steps sequentially. Returns self for chaining.
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/active_harness/pipeline.rb', line 144
#
# Runs every configured step in declaration order.
#
# For each step the before hooks fire, the step is executed, its result is
# recorded in #step_results and #context, and the payload is replaced by
# `result.output` when the step is a transforming one. After the after
# hooks, a truthy `stop_if` verdict halts the run: #stopped_at and
# #stop_reason are set and the :stopped hooks/stream fire. A run that is not
# stopped sets #output, records to memory and fires the :complete
# hooks/stream.
#
# `fire`, `fire_step`, `execute_step` and `run_hooks` are helpers defined
# outside this listing.
#
# @return [self] for chaining
def call
  config = self.class.pipeline_config
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Start from a clean slate so that re-running an instance (for example
  # after #input=) cannot report a stop, an output or step results that
  # belong to an earlier run.
  @step_results = {}
  @stopped      = false
  @stopped_at   = nil
  @stop_reason  = nil
  @output       = nil

  @memory&.load

  config[:steps].each do |step|
    fire(:before_step, step.name, @payload, config)
    fire_step(:before_step, step.name, @payload, config)

    result = execute_step(step)
    @step_results[step.name] = result
    @context[step.name] = result
    @payload = result.output if step.transform?

    fire(:after_step, step.name, result, config)
    fire_step(:after_step, step.name, result, config)

    next unless step.stop_if&.call(result)

    @stopped     = true
    @stopped_at  = step.name
    @stop_reason = result
    run_hooks(config[:hooks], :stopped, step.name, result)
    @pipeline_event_stream&.call(:stopped, step.name, result)
    break
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at).round(3)
  return self if @stopped

  @output = @payload
  @memory&.record(
    request:  @original_input,
    response: @output,
    pipeline: self.class.name
  )

  last_result = @step_results.values.last
  run_hooks(config[:hooks], :complete, last_result)
  @pipeline_event_stream&.call(:complete, last_result)

  self
end
#input=(value) ⇒ Object
109 110 111 112 |
# File 'lib/active_harness/pipeline.rb', line 109
#
# Replaces the pipeline input: both #original_input and the working payload
# handed to the first step are set to the new value.
#
# @param value [Object] new input
def input=(value)
  @original_input = value
  @payload = value
end
#stopped? ⇒ Boolean
139 140 141 |
# File 'lib/active_harness/pipeline.rb', line 139
#
# @return [Boolean] true when a step's `stop_if` halted the last #call
def stopped?
  @stopped
end