Class: ActiveHarness::Pipeline
- Inherits: Object
- Inheritance chain:
  - Object
  - ActiveHarness::Pipeline
- Defined in:
- lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb
Overview
Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.
Usage (subclass with DSL):
class SupportPipeline < ActiveHarness::Pipeline
step :injection_guard do
use InjectionGuardAgent
stop_if ->(result) { result.parsed["detected"] == true }
end
step :translate, TranslationAgent # shorthand — no stop_if
step :safety_tribunal do
use SafetyTribunal
stop_if ->(result) { result.verdict == false }
end
on :before_step do |step_name, payload| ... end
on :after_step do |step_name, result| ... end
on :before_step, :translate do |payload| ... end
on :after_step, :translate do |result| ... end
on :stopped do |step_name, result| ... end
on :complete do |last_result| ... end
end
pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output # => final payload string (nil if stopped)
pipeline.stopped? # => false
pipeline.step_results # => { translate: <Result>, ... }
Direct Known Subclasses
Defined Under Namespace
Classes: Step
Constant Summary collapse
- VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
- VALID_STEP_HOOKS =
%i[before_step after_step].freeze
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Instance API: working copy of the context hash given to the constructor; each step's result is also stored in it under the step's name.
-
#execution_time ⇒ Object
readonly
Instance API: duration of the last #call in seconds, measured with a monotonic clock and rounded to three decimals (nil until #call has finished).
-
#original_input ⇒ Object
readonly
Instance API: the input exactly as it was passed to the constructor.
-
#output ⇒ Object
readonly
Instance API: final payload after a completed run; nil before #call and when the pipeline was stopped.
-
#step_results ⇒ Object
readonly
Instance API: hash mapping each executed step's name to its result.
-
#stop_reason ⇒ Object
readonly
Instance API: the result of the step that stopped the pipeline (nil if it was not stopped).
-
#stopped_at ⇒ Object
readonly
Instance API: name of the step that stopped the pipeline (nil if it was not stopped).
Class Method Summary collapse
- .after(event, step_name = nil, &block) ⇒ Object
-
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for on.
- .callback(event, &block) ⇒ Object
-
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
-
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
- .pipeline_config ⇒ Object
-
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Instance Method Summary collapse
-
#call ⇒ Object
Execute all steps sequentially.
-
#initialize(input:, context: {}, memory: nil) ⇒ Pipeline
constructor
A new instance of Pipeline.
- #stopped? ⇒ Boolean
Constructor Details
#initialize(input:, context: {}, memory: nil) ⇒ Pipeline
Returns a new instance of Pipeline.
126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/active_harness/pipeline.rb', line 126

# Builds a pipeline run around a single input. Nothing is executed until #call.
#
# @param input [Object] initial payload; also kept unchanged as original_input
# @param context [Hash, nil] caller-supplied context. A shallow copy is stored, so
#   the caller's hash does not gain the per-step entries that #call adds.
#   nil is treated as an empty hash.
# @param memory [Object, nil] optional collaborator; when present, #call invokes
#   +load+ on it before the first step and +record+ after a completed run
def initialize(input:, context: {}, memory: nil)
  @original_input = input
  @payload        = input
  # Normalize nil to {}: #call writes every step result into @context, so a nil
  # context would otherwise only fail after the first step had already run.
  @context        = (context || {}).dup
  @memory         = memory
  @step_results   = {}
  @stopped        = false
  @stopped_at     = nil
  @stop_reason    = nil
  @execution_time = nil
  @output         = nil
end
Instance Attribute Details
#context ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def context @context end |
#execution_time ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def execution_time @execution_time end |
#original_input ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def original_input @original_input end |
#output ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def output @output end |
#step_results ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def step_results @step_results end |
#stop_reason ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def stop_reason @stop_reason end |
#stopped_at ⇒ Object (readonly)
Instance API
123 124 125 |
# File 'lib/active_harness/pipeline.rb', line 123 def stopped_at @stopped_at end |
Class Method Details
.after(event, step_name = nil, &block) ⇒ Object
99 100 101 |
# File 'lib/active_harness/pipeline.rb', line 99

# Rails-style alias for +on+: prefixes the event with "after_" and delegates.
#
#   after :step do |name, result| end            # same as on :after_step
#   after :step, :translate do |result| end      # per-step form
#
# @param event [Symbol, String] event suffix, e.g. :step
# @param step_name [Symbol, nil] restricts the hook to one step when given
# @return [Object] whatever +on+ returns
def after(event, step_name = nil, &block)
  hook_name = "after_#{event}".to_sym
  on(hook_name, step_name, &block)
end
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for on:
Global:
before :step do |name, payload| end # → on :before_step
after :step do |name, result| end # → on :after_step
callback :stopped do |name, result| end # → on :stopped
callback :complete do |result| end # → on :complete
Per-step:
after :step, :translate do |result| end
before :step, :translate do |payload| end
95 96 97 |
# File 'lib/active_harness/pipeline.rb', line 95

# Rails-style alias for +on+: prefixes the event with "before_" and delegates.
#
#   before :step do |name, payload| end          # same as on :before_step
#   before :step, :translate do |payload| end    # per-step form
#
# @param event [Symbol, String] event suffix, e.g. :step
# @param step_name [Symbol, nil] restricts the hook to one step when given
# @return [Object] whatever +on+ returns
def before(event, step_name = nil, &block)
  hook_name = "before_#{event}".to_sym
  on(hook_name, step_name, &block)
end
.callback(event, &block) ⇒ Object
103 104 105 |
# File 'lib/active_harness/pipeline.rb', line 103

# Registers a global hook under the event name exactly as given.
#
#   callback :stopped do |name, result| end      # same as on :stopped
#   callback :complete do |result| end           # same as on :complete
#
# @param event [Symbol] hook name, passed to +on+ unchanged
# @return [Object] whatever +on+ returns
def callback(event, &block)
  # No step name: always the global form of +on+.
  on(event, nil, &block)
end
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
112 113 114 115 116 117 |
# File 'lib/active_harness/pipeline.rb', line 112

# Class-level inheritance hook: gives every subclass its own empty
# configuration, so steps and hooks declared on one pipeline class are not
# visible on another.
#
# @param subclass [Class] the class that is being created
# @return [Hash] the fresh configuration assigned to the subclass
def inherited(subclass)
  # Keep the hook chain intact so other +inherited+ implementations further up
  # the ancestor chain still run.
  super
  subclass.instance_variable_set(
    :@pipeline_config,
    { steps: [], hooks: {}, step_hooks: {} }
  )
end
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
Global hooks (:before_step and :after_step fire for every step; :stopped and :complete fire at most once per run):
on :before_step do |step_name, payload| ... end
on :after_step do |step_name, result| ... end
on :stopped do |step_name, result| ... end
on :complete do |last_result| ... end
Per-step hooks fire only for the named step (the block does not receive step_name):
on :before_step, :translate do |payload| ... end
on :after_step, :translate do |result| ... end
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/active_harness/pipeline.rb', line 67

# Registers a hook block, either globally or for one named step. A later
# registration for the same event (and step) replaces the earlier one.
#
# @param event [Symbol] one of VALID_HOOKS (global) or VALID_STEP_HOOKS (per-step)
# @param step_name [Symbol, nil] when given, the hook is stored for that step only
# @return [Proc, nil] the block that was stored
# @raise [ArgumentError] if the event is not valid for the chosen form
def on(event, step_name = nil, &block)
  # Global form: no step name supplied.
  unless step_name
    unless VALID_HOOKS.include?(event)
      raise ArgumentError, "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}"
    end

    return pipeline_config[:hooks][event] = block
  end

  # Per-step form: only the step-scoped events are accepted.
  unless VALID_STEP_HOOKS.include?(event)
    raise ArgumentError, "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}"
  end

  hooks_for_step = (pipeline_config[:step_hooks][step_name] ||= {})
  hooks_for_step[event] = block
end
.pipeline_config ⇒ Object
107 108 109 |
# File 'lib/active_harness/pipeline.rb', line 107

# Per-class configuration store, created on first access.
#
# @return [Hash] with the keys :steps (Array), :hooks (Hash) and :step_hooks (Hash)
def pipeline_config
  @pipeline_config ||= {
    steps: [],
    hooks: {},
    step_hooks: {}
  }
end
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Shorthand (agent only, no stop_if):
step :translate, TranslationAgent
Full block form:
step :injection_guard do
use InjectionGuardAgent
stop_if ->(result) { result.parsed["detected"] == true }
end
52 53 54 |
# File 'lib/active_harness/pipeline.rb', line 52

# Appends a step to this class's pipeline. Steps run in declaration order.
#
#   step :translate, TranslationAgent            # shorthand
#   step :injection_guard do                     # block form
#     use InjectionGuardAgent
#     stop_if ->(result) { result.parsed["detected"] == true }
#   end
#
# @param name [Symbol] step name
# @param agent_class [Class, nil] agent for the shorthand form
# @return [Array] the list of steps declared so far
def step(name, agent_class = nil, &block)
  declared_steps = pipeline_config[:steps]
  declared_steps.push(Pipeline::Step.new(name, agent_class, &block))
end
Instance Method Details
#call ⇒ Object
Execute all steps sequentially. Returns self for chaining.
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/active_harness/pipeline.rb', line 144

# Runs every configured step in order.
#
# For each step: fires the global and per-step :before_step hooks, executes the
# step, stores the result in step_results and in context under the step name,
# replaces the payload with result.output when step.transform? is true, then
# fires the :after_step hooks. If the step's stop_if returns a truthy value the
# run is marked as stopped, the :stopped hook fires and the remaining steps are
# skipped. A run that is not stopped sets output, records to memory (when one
# was given) and fires the :complete hook with the last step result.
#
# @return [Pipeline] self, for chaining
def call
  settings   = self.class.pipeline_config
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @memory&.load

  settings[:steps].each do |step|
    fire_global(:before_step, step.name, @payload, settings)
    fire_step(:before_step, step.name, @payload, settings)

    result = execute_step(step)
    @step_results[step.name] = result
    @context[step.name]      = result
    @payload = result.output if step.transform?

    fire_global(:after_step, step.name, result, settings)
    fire_step(:after_step, step.name, result, settings)

    # Carry on with the next step unless this one asks to halt.
    next unless step.stop_if && step.stop_if.call(result)

    @stopped     = true
    @stopped_at  = step.name
    @stop_reason = result
    settings[:hooks][:stopped]&.call(step.name, result)
    break
  end

  @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at).round(3)
  # A stopped run leaves output untouched and skips memory and :complete.
  return self if @stopped

  @output = @payload
  @memory&.record(
    request:  @original_input,
    response: @output,
    pipeline: self.class.name
  )
  settings[:hooks][:complete]&.call(@step_results.values.last)

  self
end
#stopped? ⇒ Boolean
139 140 141 |
# File 'lib/active_harness/pipeline.rb', line 139

# Reports whether a step's stop_if condition halted the run.
#
# @return [Boolean] false on a new instance; true once #call has been stopped by a step
def stopped?
  @stopped
end