Class: Brute::Loop::Step

Inherits:
Object
  • Object
show all
Defined in:
lib/brute/loop/step.rb

Overview

A first-class work object with identity, state, result/error capture, optional sub-queue, and cancellation.

Users subclass Step and override #perform(task). The framework calls #call(task) which owns the state machine — subclasses never touch state transitions directly.

State machine:

    ┌──> completed
    │
pending ──> running ──┤
    │                 │
    │                 ├──> failed
    │                 │
    └──> cancelled    └──> cancelled

Three terminal states. Two non-terminal. Once terminal, stays terminal.

Direct Known Subclasses

AgentTurn::Base, ToolCallStep

Constant Summary collapse

STATES =
%i[pending running completed failed cancelled].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id: nil, **attributes) ⇒ Step

Returns a new instance of Step.



35
36
37
38
39
40
41
42
43
44
# File 'lib/brute/loop/step.rb', line 35

def initialize(id: nil, **attributes)
  @id         = id || self.class.generate_id
  @attributes = attributes
  @state      = :pending
  @result     = nil
  @error      = nil
  @task       = nil
  @jobs       = nil
  @mutex      = Mutex.new
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



33
34
35
# File 'lib/brute/loop/step.rb', line 33

def id
  @id
end

Class Method Details

.generate_idObject



46
47
48
# File 'lib/brute/loop/step.rb', line 46

def self.generate_id
  "#{name}-#{Process.pid}-#{Thread.current.object_id}-#{SecureRandom.hex(4)}"
end

Instance Method Details

#call(task) ⇒ Object

Called by the queue’s worker. Subclasses override #perform instead.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/brute/loop/step.rb', line 51

def call(task)
  return unless transition_to_running(task)

  begin
    result = perform(task)
    @mutex.synchronize do
      @result = result
      @state  = :completed
      @task   = nil
    end

  rescue Async::Cancel
    # Cascade to sub-queue before we lose the reference:
    @jobs&.cancel
    @mutex.synchronize do
      @state = :cancelled
      @task  = nil
    end
    raise

  rescue => error
    # Continue-on-failure: record the error, do NOT re-raise.
    @mutex.synchronize do
      @error = error
      @state = :failed
      @task  = nil
    end
  end
end

#cancelObject



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/brute/loop/step.rb', line 113

def cancel
  task = @mutex.synchronize do
    case @state
    when :pending
      @state = :cancelled
      nil
    when :running
      @task
    else
      return false # already finished
    end
  end

  task&.cancel
  true
end

#errorObject



103
104
105
# File 'lib/brute/loop/step.rb', line 103

def error
  @mutex.synchronize { @error }
end

#jobs(type: Brute::Queue::SequentialQueue) ⇒ Object

Lazy accessor — creates the sub-queue parented to our running task. Only valid while the step is running (inside #perform).



88
89
90
91
92
93
# File 'lib/brute/loop/step.rb', line 88

def jobs(type: Brute::Queue::SequentialQueue)
  @mutex.synchronize do
    raise "Step not running; sub-queue has nothing to parent to" unless @task
    @jobs ||= type.new(parent: @task).start
  end
end

#perform(task) ⇒ Object

Subclasses override this.



82
83
84
# File 'lib/brute/loop/step.rb', line 82

def perform(task)
  raise "#{self.class}#perform not implemented"
end

#resultObject



99
100
101
# File 'lib/brute/loop/step.rb', line 99

def result
  @mutex.synchronize { @result }
end

#stateObject



95
96
97
# File 'lib/brute/loop/step.rb', line 95

def state
  @mutex.synchronize { @state }
end

#statusObject



107
108
109
110
111
# File 'lib/brute/loop/step.rb', line 107

def status
  @mutex.synchronize do
    { id: @id, state: @state, result: @result, error: @error }
  end
end