Class: Legate::Agents::LoopAgent

Inherits:
Legate::Agent show all
Defined in:
lib/legate/agents/loop_agent.rb

Overview

LoopAgent executes a set of sub-agents repeatedly until a condition is met or maximum iterations are reached.

Constant Summary collapse

DEFAULT_MAX_ITERATIONS =
100
DEFAULT_TIMEOUT_SECONDS =

1 hour

3600
MAX_STORED_ITERATIONS =
50

Constants inherited from Legate::Agent

Legate::Agent::DEFAULT_MODEL

Instance Attribute Summary

Attributes inherited from Legate::Agent

#after_agent_callback, #after_model_callback, #after_tool_callback, #auth_credential_assignments, #auth_credential_names, #auth_scheme_assignments, #auth_url_mappings, #before_agent_callback, #before_model_callback, #before_tool_callback, #definition, #description, #fallback_mode, #instruction, #logger, #model_name, #name, #parent_agent, #planner, #session_service, #state, #sub_agents, #tool_registry

Instance Method Summary collapse

Methods inherited from Legate::Agent

#add_tool, #apply_pending_state, #ask, #available_tools_metadata, define, #find_agent, #find_sub_agent, #find_tool, #find_tool_class, #initialize, #record_error_event, #register_tool_class, #root_agent, #running?, #start, #stop, #tools, #transfer_to

Constructor Details

This class inherits a constructor from Legate::Agent

Instance Method Details

#run_task(session_id:, user_input:, session_service:) ⇒ Legate::Event

Override run_task to execute sub-agents in a loop

Parameters:

  • session_id (String)

    The session ID

  • user_input (String)

    User input to process

  • session_service (Legate::SessionService::Base)

    Session service for persistence

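Returns:

  • (Legate::Event)

    Final agent event describing the loop outcome (status :success, or :error if the loop was aborted)
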

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/legate/agents/loop_agent.rb', line 19

# Override run_task to execute sub-agents in a loop.
#
# Every iteration runs each agent named in +loop_sub_agent_names+ in order,
# handing the same +user_input+ to each of them. The loop stops when:
#
# * the session state value stored under +loop_condition_state_key+ equals
#   +loop_condition_expected_value+ (checked before and after each iteration),
# * +loop_max_iterations+ (or DEFAULT_MAX_ITERATIONS) iterations have run,
# * the timeout (+loop_timeout_seconds+ or DEFAULT_TIMEOUT_SECONDS) has elapsed
#   when the next iteration is about to start, or
# * a sub-agent cannot be found, returns an +:error+ status, or raises.
#
# Configuration problems, a non-running agent and an unknown session are
# reported as error events without touching the session. Otherwise the user
# input and the final event are appended to the session, and the final content
# is also written to session state under +output_key+ when one is defined and
# the session service supports +set_state+.
#
# @param session_id [String] The session ID
# @param user_input [String] User input to process
# @param session_service [Legate::SessionService::Base] Session service for persistence
# @return [Legate::Event] agent event whose content hash carries +:status+
#   (+:success+ or +:error+); once the loop has started it also carries
#   +:iterations_completed+ (iterations that were started), +:max_iterations+,
#   +:loop_condition_met+ and +:iterations+ (the most recent
#   MAX_STORED_ITERATIONS iteration records)
def run_task(session_id:, user_input:, session_service:)
  sub_agent_names = @definition.loop_sub_agent_names

  # Verify we have loop sub-agents defined
  unless sub_agent_names&.any?
    return loop_agent_error_event("LoopAgent '#{name}' has no loop_sub_agent_names defined.",
                                  error_class: 'ConfigurationError')
  end

  # Verify we have either loop_max_iterations or a complete condition
  has_condition = @definition.loop_condition_state_key && !@definition.loop_condition_expected_value.nil?
  unless @definition.loop_max_iterations || has_condition
    return loop_agent_error_event(
      "LoopAgent '#{name}' must define either loop_max_iterations or loop_condition (state key + expected value).",
      error_class: 'ConfigurationError'
    )
  end

  # --- Pre-execution Checks --- #
  unless running?
    return loop_agent_error_event(
      "Agent '#{name}' is not running. Call agent.start before run_task, " \
      'or use agent.ask (which starts automatically).'
    )
  end

  unless session_service.get_session(session_id: session_id)
    return loop_agent_error_event("Session not found: #{session_id}")
  end
  # --------------------------- #

  # Log user input to the LoopAgent itself
  user_event = Legate::Event.new(role: :user, content: user_input)
  session_service.append_event(session_id: session_id, event: user_event)

  # Determine loop parameters
  max_iterations = @definition.loop_max_iterations || DEFAULT_MAX_ITERATIONS
  timeout_seconds = (@definition.respond_to?(:loop_timeout_seconds) && @definition.loop_timeout_seconds) ||
                    DEFAULT_TIMEOUT_SECONDS
  condition_key = @definition.loop_condition_state_key
  expected_value = @definition.loop_condition_expected_value

  # Without get_state the condition can never be evaluated; say so instead of
  # silently running until the iteration limit.
  if condition_key && !session_service.respond_to?(:get_state)
    Legate.logger.warn("LoopAgent '#{name}': Session service does not support :get_state. " \
                       "Loop condition on '#{condition_key}' will be ignored.")
  end

  condition_note = condition_key ? " or until #{condition_key} equals #{expected_value.inspect}" : ''
  Legate.logger.info("LoopAgent '#{name}' starting execution with max #{max_iterations} iterations, " \
                     "#{timeout_seconds}s timeout#{condition_note}")

  # Track loop iterations and results
  iteration = 0
  all_iterations = []
  final_result = nil
  loop_condition_met = false
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Execute the loop
  while iteration < max_iterations
    # The timeout is only evaluated between iterations; a running sub-agent is never interrupted.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    if elapsed >= timeout_seconds
      Legate.logger.warn("LoopAgent '#{name}' timed out after #{elapsed.round(1)}s (limit: #{timeout_seconds}s)")
      final_result = {
        status: :error,
        error_message: "Loop timed out after #{elapsed.round(1)} seconds (#{iteration} iterations completed)",
        error_class: 'TimeoutError',
        iterations_completed: iteration,
        max_iterations: max_iterations,
        loop_condition_met: false,
        iterations: all_iterations
      }
      break
    end

    # Check the condition BEFORE counting the iteration, so a condition that is
    # already satisfied is not reported as an executed iteration.
    if loop_condition_satisfied?(session_service, session_id, condition_key, expected_value)
      Legate.logger.info("LoopAgent '#{name}' condition met: #{condition_key} = #{expected_value.inspect}. Exiting loop.")
      loop_condition_met = true
      break
    end

    iteration += 1
    Legate.logger.info("LoopAgent '#{name}' starting iteration #{iteration}/#{max_iterations == Float::INFINITY ? '' : max_iterations}")

    # Execute one iteration (all sub-agents in sequence)
    iteration_results, iteration_error = run_loop_iteration(
      sub_agent_names,
      iteration: iteration,
      session_id: session_id,
      user_input: user_input,
      session_service: session_service
    )

    # Record this iteration's results, capping stored history
    all_iterations << {
      iteration: iteration,
      results: iteration_results,
      error: iteration_error
    }
    all_iterations.shift if all_iterations.size > MAX_STORED_ITERATIONS

    # Any error inside an iteration terminates the whole loop
    if iteration_error
      Legate.logger.warn("LoopAgent '#{name}' iteration #{iteration} encountered an error. Exiting loop.")
      final_result = {
        status: :error,
        error_message: "Loop terminated due to error in iteration #{iteration}: #{iteration_error[:error_message]}",
        error_class: iteration_error[:error_class],
        iterations_completed: iteration,
        max_iterations: max_iterations,
        loop_condition_met: false,
        iterations: all_iterations
      }
      break
    end

    # Check condition (if defined) after executing the iteration; this also
    # covers the last allowed iteration, after which the loop header exits.
    next unless loop_condition_satisfied?(session_service, session_id, condition_key, expected_value)

    Legate.logger.info("LoopAgent '#{name}' condition met: #{condition_key} = #{expected_value.inspect}. Exiting loop.")
    loop_condition_met = true
    break
  end

  # If we didn't set a final_result due to an error, create a success result
  if final_result.nil?
    completion_reason = if loop_condition_met
                          "condition met (#{condition_key} = #{expected_value.inspect})"
                        elsif iteration >= max_iterations
                          "maximum iterations (#{max_iterations}) reached"
                        else
                          'unknown reason'
                        end

    final_result = {
      status: :success,
      result: "Completed #{iteration} iteration(s) of #{sub_agent_names.size} sub-agent(s) - #{completion_reason}",
      iterations_completed: iteration,
      max_iterations: max_iterations,
      loop_condition_met: loop_condition_met,
      iterations: all_iterations
    }
  end

  # Create the final event and log it to the session
  final_agent_event = Legate::Event.new(role: :agent, content: final_result)
  session_service.append_event(session_id: session_id, event: final_agent_event)

  # --- MAS: Store result in session state if output_key is defined --- #
  if @definition.respond_to?(:output_key) && @definition.output_key
    Legate.logger.info("LoopAgent '#{name}' storing output to session state with key '#{@definition.output_key}' for session '#{session_id}'.")
    if session_service.respond_to?(:set_state)
      # Store the entire content hash
      session_service.set_state(session_id: session_id, key: @definition.output_key, value: final_agent_event.content)
    else
      Legate.logger.warn("LoopAgent '#{name}': Session service does not support :set_state. Cannot store output for key '#{@definition.output_key}'.")
    end
  end
  # --- End MAS State Management --- #

  final_agent_event
end

# Logs +message+ as an error and wraps it in an agent error event.
# +:error_class+ is only added to the content when one is given.
private def loop_agent_error_event(message, error_class: nil)
  Legate.logger.error(message)
  content = { status: :error, error_message: message }
  content[:error_class] = error_class if error_class
  Legate::Event.new(role: :agent, content: content)
end

# True when a condition key is configured, the session service can read state,
# and the stored value equals +expected_value+.
private def loop_condition_satisfied?(session_service, session_id, condition_key, expected_value)
  return false unless condition_key && session_service.respond_to?(:get_state)

  session_service.get_state(session_id: session_id, key: condition_key) == expected_value
end

# Runs every named sub-agent once, in order, stopping at the first failure.
# Returns +[results, error]+: +results+ holds one +{ agent:, result: }+ hash per
# sub-agent that returned, and +error+ is nil or a hash describing the failure.
private def run_loop_iteration(sub_agent_names, iteration:, session_id:, user_input:, session_service:)
  total_steps = sub_agent_names.size
  results = []

  sub_agent_names.each_with_index do |sub_agent_name, index|
    step = index + 1
    sub_agent = find_sub_agent(sub_agent_name)
    unless sub_agent
      err_msg = "Sub-agent '#{sub_agent_name}' not found for LoopAgent '#{name}'."
      Legate.logger.error(err_msg)
      error = {
        status: :error,
        error_message: err_msg,
        error_class: 'MissingSubAgentError',
        step: step,
        total_steps: total_steps,
        # Entries are already { agent:, result: } hashes, so they are copied as-is.
        previous_results: results.dup
      }
      return [results, error]
    end

    # Start the sub-agent if it's not already running
    sub_agent.start unless sub_agent.running?

    # Execute the sub-agent with the same session and input
    begin
      Legate.logger.info("LoopAgent '#{name}' executing sub-agent '#{sub_agent_name}' (iteration #{iteration}, step #{step}/#{total_steps}).")
      sub_result = sub_agent.run_task(
        session_id: session_id,
        user_input: user_input,
        session_service: session_service
      )

      results << { agent: sub_agent_name, result: sub_result.content }
      next unless sub_result.content[:status] == :error

      Legate.logger.warn("Sub-agent '#{sub_agent_name}' returned error, breaking iteration: #{sub_result.content[:error_message]}")
      error = {
        status: :error,
        error_message: "Error in sub-agent '#{sub_agent_name}': #{sub_result.content[:error_message]}",
        error_class: sub_result.content[:error_class] || 'SubAgentError',
        step: step,
        total_steps: total_steps,
        sub_agent: sub_agent_name.to_s,
        sub_result: sub_result.content
      }
      return [results, error]
    rescue StandardError => e
      Legate.logger.error("Error executing sub-agent '#{sub_agent_name}': #{e.class} - #{e.message}\n#{Array(e.backtrace).join("\n")}")
      error = {
        status: :error,
        error_message: "Exception in sub-agent '#{sub_agent_name}': #{e.message}",
        error_class: e.class.name,
        step: step,
        total_steps: total_steps,
        sub_agent: sub_agent_name.to_s
      }
      return [results, error]
    end
  end

  [results, nil]
end