Class: Legate::Agents::ParallelAgent
- Inherits: Legate::Agent
  - Object
  - Legate::Agent
  - Legate::Agents::ParallelAgent
- Defined in:
- lib/legate/agents/parallel_agent.rb
Overview
ParallelAgent executes a set of sub-agents concurrently. All sub-agents are started simultaneously and the agent waits for all to complete.
Constant Summary
- DEFAULT_PARALLEL_TIMEOUT = 120
Constants inherited from Legate::Agent
Instance Attribute Summary
Attributes inherited from Legate::Agent
#after_agent_callback, #after_model_callback, #after_tool_callback, #auth_credential_assignments, #auth_credential_names, #auth_scheme_assignments, #auth_url_mappings, #before_agent_callback, #before_model_callback, #before_tool_callback, #definition, #description, #fallback_mode, #instruction, #logger, #model_name, #name, #parent_agent, #planner, #session_service, #state, #sub_agents, #tool_registry
Instance Method Summary
-
#run_task(session_id:, user_input:, session_service:) ⇒ Legate::Event
Override run_task to execute sub-agents in parallel.
Methods inherited from Legate::Agent
#add_tool, #apply_pending_state, #ask, #available_tools_metadata, define, #find_agent, #find_sub_agent, #find_tool, #find_tool_class, #initialize, #record_error_event, #register_tool_class, #root_agent, #running?, #start, #stop, #tools, #transfer_to
Constructor Details
This class inherits a constructor from Legate::Agent
Instance Method Details
#run_task(session_id:, user_input:, session_service:) ⇒ Legate::Event
Override run_task to execute sub-agents in parallel
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/legate/agents/parallel_agent.rb', line 19

# Runs every sub-agent listed in +@definition.parallel_sub_agent_names+
# concurrently (one Concurrent::Promises future per sub-agent), waits for
# each of them, and aggregates their event contents into a single event.
#
# Early-exit error events (no sub-agent names, agent not running, session
# not found, sub-agent not found) are returned without being appended to
# the session. The incoming user input and the final aggregated event are
# appended via +session_service.append_event+.
#
# When the definition exposes a truthy +output_key+, the aggregated content
# hash is also stored through +session_service.set_state+ (if supported).
#
# @param session_id identifier handed to +session_service+ and to each sub-agent
# @param user_input content recorded as the user event and forwarded unchanged
#   to every sub-agent
# @param session_service object responding to +get_session+ and +append_event+
#   (and optionally +set_state+)
# @return [Legate::Event] agent event whose content has +:status+ of +:error+
#   (pre-flight failure), +:partial_success+ (at least one sub-agent failed or
#   timed out) or +:success+
def run_task(session_id:, user_input:, session_service:)
  sub_agent_names = @definition.parallel_sub_agent_names

  # Verify we have parallel sub-agents defined
  unless sub_agent_names&.any?
    err_msg = "ParallelAgent '#{name}' has no parallel_sub_agent_names defined."
    Legate.logger.error(err_msg)
    return Legate::Event.new(role: :agent,
                             content: { status: :error, error_message: err_msg, error_class: 'ConfigurationError' })
  end

  # --- Pre-execution Checks --- #
  unless running?
    err_msg = "Agent '#{name}' is not running. Call agent.start before run_task, " \
              'or use agent.ask (which starts automatically).'
    Legate.logger.error(err_msg)
    return Legate::Event.new(role: :agent, content: { status: :error, error_message: err_msg })
  end

  session = session_service.get_session(session_id: session_id)
  unless session
    err_msg = "Session not found: #{session_id}"
    Legate.logger.error(err_msg)
    return Legate::Event.new(role: :agent, content: { status: :error, error_message: err_msg })
  end
  # --------------------------- #

  # Log user input to the ParallelAgent itself
  user_event = Legate::Event.new(role: :user, content: user_input)
  session_service.append_event(session_id: session_id, event: user_event)

  Legate.logger.info("ParallelAgent '#{name}' starting parallel execution of #{sub_agent_names.size} sub-agents.")

  # Resolve the sub-agents to run, starting any that are not running yet
  sub_agents_to_run = []
  missing_agents = []

  sub_agent_names.each do |sub_agent_name|
    sub_agent = find_sub_agent(sub_agent_name)
    if sub_agent
      sub_agent.start unless sub_agent.running?
      sub_agents_to_run << { name: sub_agent_name, agent: sub_agent }
    else
      missing_agents << sub_agent_name
    end
  end

  unless missing_agents.empty?
    err_msg = "The following sub-agents were not found for ParallelAgent '#{name}': #{missing_agents.join(', ')}."
    Legate.logger.error(err_msg)
    return Legate::Event.new(role: :agent,
                             content: { status: :error, error_message: err_msg, error_class: 'MissingSubAgentError' })
  end

  # Launch one future per sub-agent. Exceptions are converted into error
  # events inside the future so that a failure is reported per sub-agent.
  futures = {}
  sub_agents_to_run.each do |agent_info|
    futures[agent_info[:name]] = Concurrent::Promises.future do
      Legate.logger.info("ParallelAgent '#{name}' executing sub-agent '#{agent_info[:name]}' in parallel.")
      agent_info[:agent].run_task(
        session_id: session_id,
        user_input: user_input,
        session_service: session_service
      )
    rescue StandardError => e
      Legate.logger.error(
        "Error executing sub-agent '#{agent_info[:name]}': #{e.class} - #{e.message}\n" \
        "#{Array(e.backtrace).join("\n")}"
      )
      Legate::Event.new(role: :agent,
                        content: {
                          status: :error,
                          error_message: "Exception in sub-agent '#{agent_info[:name]}': #{e.message}",
                          error_class: e.class.name
                        })
    end
  end

  # Wait for all futures to complete
  all_results = {}
  has_errors = false
  timeout = @definition.respond_to?(:parallel_timeout_seconds) && @definition.parallel_timeout_seconds ||
            DEFAULT_PARALLEL_TIMEOUT

  futures.each do |agent_name, future|
    # Future#value returns nil on timeout instead of raising, so the timeout
    # has to be detected explicitly: #wait(timeout) is false when the future
    # did not resolve in time. The timeout applies to each wait individually.
    unless future.wait(timeout)
      Legate.logger.error("Timeout waiting for sub-agent '#{agent_name}' to complete.")
      all_results[agent_name] = {
        status: :error,
        error_message: 'Timeout waiting for sub-agent to complete',
        error_class: 'TimeoutError'
      }
      has_errors = true
      next
    end

    result = future.value
    all_results[agent_name] = result.content
    has_errors = true if result.content[:status] == :error
  rescue StandardError => e
    # Reached e.g. when the resolved value is not an event-like object
    Legate.logger.error("Error processing sub-agent '#{agent_name}' result: #{e.class} - #{e.message}")
    all_results[agent_name] = {
      status: :error,
      error_message: "Error processing result: #{e.message}",
      error_class: e.class.name
    }
    has_errors = true
  end

  # Create the final result
  summary =
    if has_errors
      'Completed parallel execution with some errors'
    else
      "Successfully completed parallel execution of #{sub_agent_names.size} sub-agents"
    end

  final_result = {
    status: has_errors ? :partial_success : :success,
    result: summary,
    sub_results: all_results,
    # Every sub-agent that produced an entry, including failed/timed-out ones
    agents_completed: all_results.keys.map(&:to_sym),
    all_successful: !has_errors
  }

  final_agent_event = Legate::Event.new(role: :agent, content: final_result)
  session_service.append_event(session_id: session_id, event: final_agent_event)

  # --- MAS: Store result in session state if output_key is defined --- #
  if @definition.respond_to?(:output_key) && @definition.output_key && final_agent_event
    output_value = final_agent_event.content # Store the entire content hash
    Legate.logger.info("ParallelAgent '#{@name}' storing output to session state with key '#{@definition.output_key}' for session '#{session_id}'.")
    if session_service.respond_to?(:set_state)
      session_service.set_state(session_id: session_id, key: @definition.output_key, value: output_value)
    else
      Legate.logger.warn("ParallelAgent '#{@name}': Session service does not support :set_state. Cannot store output for key '#{@definition.output_key}'.")
    end
  end
  # --- End MAS State Management --- #

  final_agent_event
end