Class: ActiveHarness::Pipeline
- Inherits: Object
- Inheritance chain: Object → ActiveHarness::Pipeline
- Defined in:
- lib/active_harness/pipeline.rb,
lib/active_harness/pipeline/step.rb,
lib/active_harness/pipeline/hooks.rb
Overview
Sequential pipeline that chains agents and tribunals. Each step receives the current payload and can transform it or stop the pipeline.
Usage (subclass with DSL):
class SupportPipeline < ActiveHarness::Pipeline
  step :injection_guard do
    use InjectionGuardAgent
    stop_if ->(result) { result.parsed["detected"] == true }
  end
  step :translate, TranslationAgent # shorthand — no stop_if
  step :safety_tribunal do
    use SafetyTribunal
    stop_if ->(result) { result.verdict == false }
  end
  on :before_step do |step_name, payload| ... end
  on :after_step do |step_name, result| ... end
  on :before_step, :translate do |payload| ... end
  on :after_step, :translate do |result| ... end
  on :stopped do |step_name, result| ... end
  on :complete do |last_result| ... end
end
pipeline = SupportPipeline.new(input: "...", context: { user_id: 1 })
pipeline.call
pipeline.output # => final payload string (nil if stopped)
pipeline.stopped? # => false
pipeline.step_results # => { translate: <Result>, ... }
Direct Known Subclasses
Defined Under Namespace
Classes: Step
Constant Summary collapse
- VALID_HOOKS =
%i[before_step after_step stopped complete].freeze
- VALID_STEP_HOOKS =
%i[before_step after_step].freeze
Instance Attribute Summary collapse
-
#context ⇒ Object
Context hash: a shallow copy of the context: argument, to which #call adds each step's result under the step's name.
-
#execution_time ⇒ Object
readonly
Elapsed time of the last #call in seconds, rounded to three decimals (nil until #call has run).
-
#original_input ⇒ Object
readonly
The input the pipeline was constructed with, or the value last assigned through #input=.
-
#output ⇒ Object
readonly
Final payload once every step has run; nil before #call and when the pipeline was stopped.
-
#step_results ⇒ Object
readonly
Hash mapping each executed step's name to the result that step returned.
-
#stop_reason ⇒ Object
readonly
Result of the step whose stop_if halted the pipeline (nil if the pipeline was not stopped).
-
#stopped_at ⇒ Object
readonly
Name of the step whose stop_if halted the pipeline (nil if the pipeline was not stopped).
Class Method Summary collapse
- .after(event, step_name = nil, &block) ⇒ Object
-
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for on.
- .callback(event, &block) ⇒ Object
-
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
-
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
- .pipeline_config ⇒ Object
-
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Instance Method Summary collapse
-
#call ⇒ Object
Execute all steps sequentially.
-
#initialize(input:, context: {}, memory: nil, streams: {}) ⇒ Pipeline
constructor
A new instance of Pipeline.
- #input=(value) ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(input:, context: {}, memory: nil, streams: {}) ⇒ Pipeline
Returns a new instance of Pipeline.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/active_harness/pipeline.rb', line 78 def initialize(input:, context: {}, memory: nil, streams: {}) @original_input = input @payload = input @context = context.dup @memory = memory @token_stream = streams[:token] @agent_event_stream = streams[:agent] @tribunal_event_stream = streams[:tribunal] @pipeline_event_stream = streams[:pipeline] @step_results = {} @stopped = false @stopped_at = nil @stop_reason = nil @execution_time = nil @output = nil end |
Instance Attribute Details
#context ⇒ Object
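Context hash: a shallow copy of the context: argument, to which #call adds each step's result under the step's name.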
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def context @context end |
#execution_time ⇒ Object (readonly)
Elapsed time of the last #call in seconds, rounded to three decimals (nil until #call has run).
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def execution_time @execution_time end |
#original_input ⇒ Object (readonly)
The input the pipeline was constructed with, or the value last assigned through #input=.
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def original_input @original_input end |
#output ⇒ Object (readonly)
Final payload once every step has run; nil before #call and when the pipeline was stopped.
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def output @output end |
#step_results ⇒ Object (readonly)
Hash mapping each executed step's name to the result that step returned.
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def step_results @step_results end |
#stop_reason ⇒ Object (readonly)
Result of the step whose stop_if halted the pipeline (nil if the pipeline was not stopped).
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def stop_reason @stop_reason end |
#stopped_at ⇒ Object (readonly)
Name of the step whose stop_if halted the pipeline (nil if the pipeline was not stopped).
69 70 71 |
# File 'lib/active_harness/pipeline.rb', line 69 def stopped_at @stopped_at end |
Class Method Details
.after(event, step_name = nil, &block) ⇒ Object
50 51 52 |
# File 'lib/active_harness/pipeline/hooks.rb', line 50 def after(event, step_name = nil, &block) on(:"after_#{event}", step_name, &block) end |
.before(event, step_name = nil, &block) ⇒ Object
Rails-style aliases for on:
Global:
before :step do |name, payload| end # → on :before_step
after :step do |name, result| end # → on :after_step
callback :stopped do |name, result| end # → on :stopped
callback :complete do |result| end # → on :complete
Per-step:
after :step, :translate do |result| end
before :step, :translate do |payload| end
46 47 48 |
# File 'lib/active_harness/pipeline/hooks.rb', line 46 def before(event, step_name = nil, &block) on(:"before_#{event}", step_name, &block) end |
.callback(event, &block) ⇒ Object
54 55 56 |
# File 'lib/active_harness/pipeline/hooks.rb', line 54 def callback(event, &block) on(event, &block) end |
.inherited(subclass) ⇒ Object
Each subclass gets its own isolated config.
58 59 60 61 62 63 |
# File 'lib/active_harness/pipeline.rb', line 58 def inherited(subclass) subclass.instance_variable_set( :@pipeline_config, { steps: [], hooks: {}, step_hooks: {} } ) end |
.on(event, step_name = nil, &block) ⇒ Object
Register a global or per-step hook.
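Global hooks apply to the whole pipeline (:before_step and :after_step fire for every step; :stopped fires when a step's stop_if halts the run; :complete fires after the last step when the run was not stopped):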
on :before_step do |step_name, payload| ... end
on :after_step do |step_name, result| ... end
on :stopped do |step_name, result| ... end
on :complete do |last_result| ... end
Per-step hooks fire only for the named step (the block does not receive step_name):
on :before_step, :translate do |payload| ... end
on :after_step, :translate do |result| ... end
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/active_harness/pipeline/hooks.rb', line 18 def on(event, step_name = nil, &block) if step_name unless VALID_STEP_HOOKS.include?(event) raise ArgumentError, "Per-step hooks support: #{VALID_STEP_HOOKS.join(", ")}. Got :#{event}" end pipeline_config[:step_hooks][step_name] ||= {} pipeline_config[:step_hooks][step_name][event] = block else unless VALID_HOOKS.include?(event) raise ArgumentError, "Unknown Pipeline hook :#{event}. Valid: #{VALID_HOOKS.join(", ")}" end pipeline_config[:hooks][event] = block end end |
.pipeline_config ⇒ Object
53 54 55 |
# File 'lib/active_harness/pipeline.rb', line 53 def pipeline_config @pipeline_config ||= { steps: [], hooks: {}, step_hooks: {} } end |
.step(name, agent_class = nil, &block) ⇒ Object
Define a step in the pipeline.
Shorthand (agent only, no stop_if):
step :translate, TranslationAgent
Full block form:
step :injection_guard do
use InjectionGuardAgent
stop_if ->(result) { result.parsed["detected"] == true }
end
49 50 51 |
# File 'lib/active_harness/pipeline.rb', line 49 def step(name, agent_class = nil, &block) pipeline_config[:steps] << Pipeline::Step.new(name, agent_class, &block) end |
Instance Method Details
#call ⇒ Object
Execute all steps sequentially. Returns self for chaining.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/active_harness/pipeline.rb', line 100 def call config = self.class.pipeline_config t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) @memory&.load config[:steps].each do |step| fire(:before_step, step.name, @payload, config) fire_step(:before_step, step.name, @payload, config) result = execute_step(step) @step_results[step.name] = result @context[step.name] = result @payload = result.output if step.transform? fire(:after_step, step.name, result, config) fire_step(:after_step, step.name, result, config) if step.stop_if && step.stop_if.call(result) @stopped = true @stopped_at = step.name @stop_reason = result blk = config[:hooks][:stopped] instance_exec(step.name, result, &blk) if blk @pipeline_event_stream&.call(:stopped, step.name, result) break end end @execution_time = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0).round(3) @output = @payload unless @stopped unless @stopped @memory&.record( request: @original_input, response: @output, pipeline: self.class.name ) last_result = @step_results[@step_results.keys.last] blk = config[:hooks][:complete] instance_exec(last_result, &blk) if blk @pipeline_event_stream&.call(:complete, last_result) end self end |
#input=(value) ⇒ Object
73 74 75 76 |
# File 'lib/active_harness/pipeline.rb', line 73 def input=(value) @original_input = value @payload = value end |
#stopped? ⇒ Boolean
95 96 97 |
# File 'lib/active_harness/pipeline.rb', line 95 def stopped? @stopped end |