Class: Legate::PlanExecutor
- Inherits:
-
Object
- Object
- Legate::PlanExecutor
- Defined in:
- lib/legate/plan_executor.rb
Overview
Executes a planner-produced plan for an Agent: iterates the steps, injects prior-step results into parameters, runs each tool (with before/after-tool callbacks and delegation interception), and logs the tool_request/tool_result events. Extracted from Legate::Agent, which keeps thin execute_plan/execute_step delegators (the lifecycle entry points exercised directly by specs).
Instance Method Summary collapse
-
#execute_plan(plan, session, session_service, invocation_id) ⇒ Object
Executes a plan and returns { details: […], last_result: <hash> }.
-
#execute_step(step, session, session_service, invocation_id = nil) ⇒ Hash
Executes a single step, logging :tool_request and :tool_result events via the session service.
-
#initialize(agent) ⇒ PlanExecutor
constructor
A new instance of PlanExecutor.
Constructor Details
#initialize(agent) ⇒ PlanExecutor
Returns a new instance of PlanExecutor.
17 18 19 |
# File 'lib/legate/plan_executor.rb', line 17 def initialize(agent) @agent = agent end |
Instance Method Details
#execute_plan(plan, session, session_service, invocation_id) ⇒ Object
Executes a plan and returns { details: […], last_result: <hash> }.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/legate/plan_executor.rb', line 22 def execute_plan(plan, session, session_service, invocation_id) session_id = session.id # A planning failure returns a direct_result (a terminal result, no steps); # surface it as-is so run_task builds a clean error Event. return { details: [], last_result: plan[:direct_result] } if plan.is_a?(Hash) && plan[:direct_result] # Extract steps based on the plan format steps = nil thought_process = nil # Handle new plan structure with thought_process and steps if plan.is_a?(Hash) && plan[:steps].is_a?(Array) steps = plan[:steps] thought_process = plan[:thought_process] Legate.logger.info("Plan thought process: #{thought_process}") if thought_process elsif plan.is_a?(Array) # For backward compatibility with old format steps = plan else msg = 'Invalid plan received from planner (not an Array or properly structured Hash).' Legate.logger.error("#{msg} Plan: #{plan.inspect}") return { details: [], last_result: { status: :error, error_message: msg } } end # --- Continue with original logic, using 'steps' variable --- unless steps.is_a?(Array) msg = 'Invalid steps structure in plan (not an Array).' Legate.logger.error("#{msg} Steps: #{steps.inspect}") return { details: [], last_result: { status: :error, error_message: msg } } end # --- Handle Empty Plan based on Fallback Mode --- if steps.empty? if @agent.fallback_mode == :echo if @agent.tool_registry.find_class(:echo) Legate.logger.warn("Plan is empty. Falling back to echo mode for session '#{session_id}'.") # Reconstruct the plan to be a single echo step # We need the original user input for this - fetch it from the session # Find the *last* user event in case of corrections/multiple turns original_user_input = session.events.reverse.find { |e| e.role == :user }&.content || '[Original input not found]' steps = [{ tool: :echo, params: { message: original_user_input } }] Legate.logger.debug("Reconstructed plan for echo fallback: #{steps.inspect}") # Now continue execution with the modified plan else # Echo tool not available, default to error mode msg = 'Planning failed and Echo fallback tool is not available to this agent.' Legate.logger.warn(msg) return { details: [], last_result: { status: :error, error_message: msg } } end else # Default or :error mode msg = 'I cannot fulfill this request with the available tools (empty plan).' Legate.logger.warn(msg) return { details: [], last_result: { status: :error, error_message: msg } } end end # --- End Handle Empty Plan --- Legate.logger.debug("Executing plan with #{steps.length} step(s) for session '#{session_id}': #{steps.inspect}") previous_step_result_hash = nil plan_execution_details = [] last_successful_or_pending_result = nil # <-- Store the original last hash steps.each_with_index do |step, index| # Log the step type for clarity step_type_desc = if step[:step_type] == :sequential_sub_agent "sequential sub-agent '#{step[:sub_agent_name]}'" else "tool '#{step[:tool]}'" end Legate.logger.debug("Executing step #{index + 1}/#{steps.length}: #{step_type_desc}") Legate.logger.debug(" Step details: #{step.inspect}") Legate.logger.debug(" Input (result hash from previous step): #{previous_step_result_hash.inspect}") # --- Input Injection Logic (Updated for job_id) --- current_params = JSON.parse(JSON.generate(step[:params]), symbolize_names: true) current_params.transform_values! do |value| injection_value = nil if value.is_a?(String) && value.match?(/\[Result from step \d+\]|\[Result from previous step\]/i) if previous_step_result_hash.is_a?(Hash) && %i[success pending].include?(previous_step_result_hash[:status]) # Prioritize :result, then :job_id (was workflow_id), then :message if previous_step_result_hash.key?(:result) prev_result = previous_step_result_hash[:result] if prev_result.is_a?(Hash) && prev_result.key?(:status) && prev_result.key?(:result) # AgentTool nested result injection_value = prev_result[:result] Legate.logger.debug('Injecting nested result...') else injection_value = prev_result Legate.logger.debug('Injecting direct result...') end elsif previous_step_result_hash.key?(:job_id) # <-- CHANGED from workflow_id injection_value = previous_step_result_hash[:job_id] Legate.logger.debug('Injecting job_id from previous step...') elsif previous_step_result_hash.key?(:message) injection_value = previous_step_result_hash[:message] Legate.logger.debug('Injecting message from previous step...') else Legate.logger.warn("Cannot inject: Previous successful/pending step missing usable key (:result, :job_id, :message). Prev Hash: #{previous_step_result_hash.inspect}") value end else Legate.logger.warn("Cannot inject: Previous step failed or absent. Prev Hash: #{previous_step_result_hash.inspect}") value end injection_value || value # Use injection if found, otherwise keep original else value # Not a placeholder string, keep original value end end step_with_injected_params = step.merge(params: current_params) Legate.logger.debug(" Params after potential injection: #{current_params.inspect}") # --- End Input Injection Logic --- # --- Execute Step --- # current_result_hash = execute_step(step_with_injected_params, session, session_service, invocation_id) # --- Sanitize for plan_details --- # sanitized_result_for_plan = {} if current_result_hash.is_a?(Hash) sanitized_result_for_plan[:status] = current_result_hash[:status] # Always include error keys, defaulting to nil if not present sanitized_result_for_plan[:error_message] = current_result_hash[:error_message] # Defaults to nil if key missing sanitized_result_for_plan[:error_class] = current_result_hash[:error_class] # Defaults to nil if key missing # Include other relevant keys if present sanitized_result_for_plan[:job_id] = current_result_hash[:job_id] if current_result_hash.key?(:job_id) sanitized_result_for_plan[:message] = current_result_hash[:message] if current_result_hash.key?(:message) # Only include :result value if it's simple result_val = current_result_hash[:result] if result_val.is_a?(String) || result_val.is_a?(Numeric) || [true, false, nil].include?(result_val) sanitized_result_for_plan[:result] = result_val elsif current_result_hash.key?(:result) # It exists but is complex sanitized_result_for_plan[:result] = '[Complex Result Structure]' end else # Should not happen based on execute_step validation, but handle defensively sanitized_result_for_plan[:status] = :error sanitized_result_for_plan[:error_message] = "Invalid format from execute_step: #{current_result_hash.inspect}" end # --- END Sanitization --- # --- Store SANITIZED step detail --- # plan_execution_details << { tool_name: step[:tool], params: current_params, result: sanitized_result_for_plan } # --- Store ORIGINAL result and check for errors --- # if current_result_hash[:status] == :error Legate.logger.warn("Step #{index + 1} failed, stopping plan execution: #{current_result_hash[:error_message]}") last_successful_or_pending_result = current_result_hash # Store the error hash as last result break # Exit the loop else # Store successful or pending hash for potential injection AND final result previous_step_result_hash = current_result_hash last_successful_or_pending_result = current_result_hash end # --- End Stop on first error / Store last result --- # end Legate.logger.debug("Plan execution finished. Structured details collected: #{plan_execution_details.inspect}") Legate.logger.debug("Plan execution finished. Original last result: #{last_successful_or_pending_result.inspect}") # --- Return BOTH sanitized details AND original last result --- # { details: plan_execution_details, last_result: last_successful_or_pending_result } end |
#execute_step(step, session, session_service, invocation_id = nil) ⇒ Hash
Executes a single step, logging :tool_request and :tool_result events via the session service.
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'lib/legate/plan_executor.rb', line 193 def execute_step(step, session, session_service, invocation_id = nil) session_id = session.id # --- Basic validation --- unless step.is_a?(Hash) && step[:tool] && step[:params].is_a?(Hash) error_msg = 'Invalid step format. Expected { tool: :symbol, params: {...} }' Legate.logger.error(error_msg) return { status: :error, error_message: error_msg } end raw_tool_name = step[:tool].to_s # Validate tool name against known tools before converting to Symbol # to prevent Symbol table exhaustion from untrusted input known_names = @agent.tool_registry.available_tool_names.map(&:to_s) is_delegation = raw_tool_name.start_with?('agent_transfer_to_') unless known_names.include?(raw_tool_name) || is_delegation error_msg = "Unknown tool '#{raw_tool_name}' — not in agent's tool registry" Legate.logger.error(error_msg) return { status: :error, error_message: error_msg } end tool_name = raw_tool_name.to_sym params = step[:params].to_h # --- Intercept Delegation Tools (MAS) --- # If the model outputs "agent_transfer_to_xyz", map it to "delegate_task" if tool_name.to_s.start_with?('agent_transfer_to_') target_agent_name = tool_name.to_s.sub('agent_transfer_to_', '') Legate.logger.info("Intercepted delegation tool '#{tool_name}'. Mapping to 'delegate_task' for target '#{target_agent_name}'.") # Remap tool name tool_name = :delegate_task # Remap params: ensure target_agent_name is set params[:target_agent_name] = target_agent_name # Ensure 'task' param exists (model should provide it, but handle aliasing/defaults if needed) # The prompt says: - task (string, required) unless params.key?(:task) # Fallback: if model used a different key like 'message' or 'input', map it to 'task' if params.key?(:message) params[:task] = params.delete(:message) elsif params.key?(:input) params[:task] = params.delete(:input) end end end # --- End Delegation Interception --- # --- Get the tool from our registry --- tool = @agent.tool_registry.create_instance(tool_name) unless tool error_msg = "Tool '#{tool_name}' not found in available tools." Legate.logger.error(error_msg) return { status: :error, error_message: error_msg } end # --- Prepare tool context with invocation_id and auth config --- tool_context = Legate::ToolContext.new( session_id: session.id, user_id: session.user_id, app_name: session.app_name, session_service: session_service, tool_registry: @agent.tool_registry, invocation_id: invocation_id, agent_auth_config: @agent.send(:build_agent_auth_config) ) # --- Log the tool request event --- tool_request_event = Legate::Event.new( role: :tool_request, tool_name: tool_name, content: params ) session_service.append_event(session_id: session_id, event: tool_request_event) # --- Execute before_tool_callback if defined --- if @agent.before_tool_callback.is_a?(Proc) Legate.logger.debug { "Agent '#{@agent.name}': Executing before_tool_callback for tool '#{tool_name}'." } begin # Execute the callback and check if it returns a result override_result = @agent.before_tool_callback.call(tool, params.dup, tool_context) # If the callback returns a result (not nil), use it instead of normal tool execution if override_result Legate.logger.info { "Agent '#{@agent.name}': before_tool_callback provided an override result for tool '#{tool_name}'." } # Create a tool result event with the override result and any state changes tool_result_event = Legate::Event.new( role: :tool_result, tool_name: tool_name, content: override_result, state_delta: tool_context.pending_state_delta ) session_service.append_event(session_id: session_id, event: tool_result_event) return override_result end rescue StandardError => e Legate.logger.error { "Agent '#{@agent.name}': Error in before_tool_callback for tool '#{tool_name}': #{e.}\n#{e.backtrace.join("\n")}" } error_result = { status: :error, error_message: "Error in before_tool_callback: #{e.}", error_class: e.class.name } # Create a tool result event with the error tool_result_event = Legate::Event.new( role: :tool_result, tool_name: tool_name, content: error_result, state_delta: tool_context.pending_state_delta ) session_service.append_event(session_id: session_id, event: tool_result_event) return error_result end end # --- Execute the tool --- begin Legate.logger.debug { "Executing tool '#{tool_name}' with params #{params.inspect}" } final_tool_name_to_execute = tool_name # For delegate_task tool, capture the delegate to show in logs final_tool_name_to_execute = "#{tool_name} -> #{params[:target_agent_name]}" if tool_name == :delegate_task && params[:target_agent_name] result = tool.execute(params, tool_context) # --- Execute after_tool_callback if defined --- if @agent.after_tool_callback.is_a?(Proc) Legate.logger.debug { "Agent '#{@agent.name}': Executing after_tool_callback for tool '#{final_tool_name_to_execute}'." } begin # Execute the callback and let it modify the result if needed modified_result = @agent.after_tool_callback.call(tool, params.dup, tool_context, result.dup) # If the callback returned a modified result, use it if modified_result && modified_result != result Legate.logger.info { "Agent '#{@agent.name}': after_tool_callback modified the result for tool '#{final_tool_name_to_execute}'." } result = modified_result end rescue StandardError => e Legate.logger.error { "Agent '#{@agent.name}': Error in after_tool_callback for tool '#{final_tool_name_to_execute}': #{e.}\n#{e.backtrace.join("\n")}" } # Don't override the result completely on error, just log it end end # --- Log the tool result event --- tool_result_event = Legate::Event.new( role: :tool_result, tool_name: tool_name, content: result, state_delta: tool_context.pending_state_delta ) session_service.append_event(session_id: session_id, event: tool_result_event) result rescue StandardError => e Legate.logger.error { "Error executing tool '#{tool_name}': #{e.}\n#{e.backtrace.join("\n")}" } error_result = { status: :error, error_message: "Tool '#{tool_name}' execution error: #{e.}", error_class: e.class.name # consistent with every other error hash + the plan-detail sanitizer } # Create a tool result event with the error tool_result_event = Legate::Event.new( role: :tool_result, tool_name: tool_name, content: error_result ) session_service.append_event(session_id: session_id, event: tool_result_event) error_result end end |