Class: Collavre::AiAgentJob
- Inherits:
-
ApplicationJob
- Object
- ApplicationJob
- Collavre::AiAgentJob
- Defined in:
- app/jobs/collavre/ai_agent_job.rb
Instance Method Summary collapse
-
#perform(agent_id_or_task, event_name = nil, context = nil) ⇒ Object
Allow resuming a task that was pending approval.
Instance Method Details
#perform(agent_id_or_task, event_name = nil, context = nil) ⇒ Object
Allow resuming a task that was pending approval
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'app/jobs/collavre/ai_agent_job.rb', line 6 def perform(agent_id_or_task, event_name = nil, context = nil) if agent_id_or_task.is_a?(Task) # Resume existing task task = agent_id_or_task return if task.reload.status == "cancelled" task.update!(status: "running") agent = task.agent else # Create new task agent = User.find(agent_id_or_task) # Guard: skip if there's already a running task for the same agent + comment comment_id = context&.dig("comment", "id") if comment_id && Task.duplicate_running_for_comment?(agent.id, comment_id) Rails.logger.warn( "[AiAgentJob] Skipping duplicate: agent #{agent.id} already has a running task " \ "for comment #{comment_id} (event=#{event_name})" ) return end task = Task.create!( name: "Response to #{event_name}", status: "running", trigger_event_name: event_name, trigger_event_payload: context, agent: agent, topic_id: context&.dig("topic", "id"), creative_id: context&.dig("creative", "id") ) # Record task for loop breaker tracking (per-topic, skip user-initiated) creative_id = context&.dig("creative", "id") if creative_id from_ai = context&.dig("comment", "from_ai") == true topic_id = context&.dig("topic", "id") Orchestration::LoopBreaker.new(context).record_task( creative_id, agent.id, topic_id: topic_id, triggered_by_user: !from_ai ) end end # Reserve resources before starting work tracker = Orchestration::ResourceTracker.for(agent) resource_id = job_id || task.id tracker.reserve!(resource_id) should_release = true begin response_content = AiAgentService.new(task).call # Workflow subtasks with empty responses should retry, then fail if task.parent_task_id.present? && response_content.blank? max_retries = 2 current_retry = task.retry_count || 0 if current_retry < max_retries task.update!(retry_count: current_retry + 1, status: "pending") Rails.logger.warn( "[AiAgentJob] Workflow subtask #{task.id} returned empty response, " \ "retrying (#{current_retry + 1}/#{max_retries})" ) AiAgentJob.set(wait: 5.seconds).perform_later(task) else task.update!(status: "failed") Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!( task, error_message: "Agent returned empty response after #{max_retries} retries" ) end else task.update!(status: "done") # Advance workflow (release happens in ensure block) if task.parent_task_id.present? Collavre::Comments::WorkflowExecutor.new(task.parent_task).complete_subtask!(task) end end rescue ApprovalPendingError # Task status already set to pending_approval by AiAgentService # Don't release resources yet - task will resume should_release = false Rails.logger.info("AiAgentJob paused for task #{task.id}: awaiting tool approval") rescue CancelledError # Task status already set to "cancelled" by Comment callback Rails.logger.info("AiAgentJob cancelled for task #{task.id}: trigger message deleted") rescue StandardError => e task.update!(status: "failed") # Fail workflow if this is a sub-task if task.parent_task_id.present? Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(task, error_message: e.) end Rails.logger.error("AiAgentJob failed for task #{task.id}: #{e.}") raise e ensure # Guarantee resource release for all paths except pending_approval tracker.release!(resource_id, tokens_used: 0) if should_release && tracker && resource_id if task&.trigger_event_payload&.key?("topic") && %w[done failed cancelled escalated].include?(task.reload.status) Orchestration::AgentOrchestrator.dequeue_next_for_topic(task.topic_id, task.creative_id) end end end |