Class: Collavre::AiAgentJob
- Inherits: ApplicationJob
  - Object
  - ApplicationJob
  - Collavre::AiAgentJob
- Defined in: app/jobs/collavre/ai_agent_job.rb
Instance Method Summary
- #perform(agent_id_or_task, event_name = nil, context = nil) ⇒ Object
  Runs the agent for a new trigger event, or resumes an existing Task (for example, one that was pending approval).
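Instance Method Details
#perform(agent_id_or_task, event_name = nil, context = nil) ⇒ Object
Runs the agent for a new trigger event by creating a Task, or resumes an existing Task passed as the first argument (for example, one that was pending approval).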
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'app/jobs/collavre/ai_agent_job.rb', line 6

# Runs an AI agent for a trigger event, or resumes an existing Task.
#
# Two call shapes are handled:
# - perform(task): resumes an existing Task (e.g. one that was pending
#   approval, a queued Claude Channel task, or a retried workflow subtask).
# - perform(agent_id, event_name, context): looks the agent up with
#   User.find, creates a new "running" Task for it, and dispatches it.
#
# @param agent_id_or_task [Task, Object] a Task to resume, or an id for User.find
# @param event_name [Object, nil] triggering event name (presumably a String);
#   used in the new Task's name and in log messages
# @param context [Hash, nil] trigger payload read with string keys
#   ("comment", "topic", "creative") via dig; stored as trigger_event_payload
# @return [void]
# @raise [StandardError] any error raised during dispatch is re-raised after
#   the task is marked "failed" and its parent workflow (if any) is notified
def perform(agent_id_or_task, event_name = nil, context = nil)
  if agent_id_or_task.is_a?(Task)
    # Resume existing task
    task = agent_id_or_task
    return if task.reload.status == "cancelled"

    agent = task.agent

    # Guard: same offline-session check as the agent_id branch below.
    # Queued Claude Channel tasks resumed via Orchestration::AgentOrchestrator
    # .dequeue_next_for_topic enter this branch as AiAgentJob.perform_later(task).
    # If AgentChannel#unsubscribed cleared routing_expression while the
    # task was queued (WS drop without DELETE /agent/:id, e.g. SIGKILL or
    # network blip past the reconnect grace), and another completion later
    # drains the queue, this task would otherwise be promoted to running →
    # delegated and broadcast to a clientless agent:user:<id> stream —
    # held until stuck recovery.
    if agent.claude_channel_agent? && agent.routing_expression.blank?
      Rails.logger.info(
        "[AiAgentJob] Skipping resumed Claude Channel task #{task.id}: " \
        "session offline (routing_expression blank)"
      )

      # Workflow subtasks created by WorkflowExecutor carry parent_task_id and
      # no topic. If we only cancel the child and return, the parent workflow
      # stays "running" with its current/pending creative state forever — no
      # rescue path runs because we never raise. Mirror the StandardError
      # rescue below: fail the child and notify the parent so the workflow
      # transitions to "failed" with a failure_reason.
      if task.parent_task_id.present?
        task.update!(status: "failed")
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(
          task, error_message: "Claude Channel session offline before dispatch"
        )
      else
        task.update!(status: "cancelled")
      end

      if task.trigger_event_payload&.key?("topic")
        Orchestration::AgentOrchestrator.dequeue_next_for_topic(task.topic_id, task.creative_id)
      end
      return
    end

    task.update!(status: "running")
  else
    # Create new task
    agent = User.find(agent_id_or_task)

    # Guard: skip if the Claude Channel session has unregistered (or its WS
    # dropped) during the window between Scheduler enqueue and this job
    # firing. AgentsController#destroy / AgentChannel#unsubscribed clear
    # routing_expression on the per-session ai_user, so a blank value here
    # means there is no live MCP client to receive the dispatch. Without
    # this guard, a :delayed (busy / rate-limited) enqueue from
    # Scheduler#evaluate would materialize a fresh Task, flip it to
    # "delegated", and broadcast to a clientless agent:user:<id> stream
    # — holding the topic/agent slot until stuck recovery.
    if agent.claude_channel_agent? && agent.routing_expression.blank?
      Rails.logger.info(
        "[AiAgentJob] Skipping Claude Channel job for agent #{agent.id}: " \
        "session offline (routing_expression blank, event=#{event_name})"
      )
      return
    end

    # Guard: skip if there's already a running task for the same agent + comment
    comment_id = context&.dig("comment", "id")
    if comment_id && Task.duplicate_running_for_comment?(agent.id, comment_id)
      Rails.logger.warn(
        "[AiAgentJob] Skipping duplicate: agent #{agent.id} already has a running task " \
        "for comment #{comment_id} (event=#{event_name})"
      )
      return
    end

    task = Task.create!(
      name: "Response to #{event_name}",
      status: "running",
      trigger_event_name: event_name,
      trigger_event_payload: context,
      agent: agent,
      topic_id: context&.dig("topic", "id"),
      creative_id: context&.dig("creative", "id")
    )

    # Record task for loop breaker tracking (per-topic, skip user-initiated)
    creative_id = context&.dig("creative", "id")
    if creative_id
      from_ai = context&.dig("comment", "from_ai") == true
      topic_id = context&.dig("topic", "id")
      Orchestration::LoopBreaker.new(context).record_task(
        creative_id, agent.id, topic_id: topic_id, triggered_by_user: !from_ai
      )
    end
  end

  # Reserve resources before starting work
  tracker = Orchestration::ResourceTracker.for(agent)
  # Claude Channel tasks live past this job (MCP reply happens later), so
  # reserve under the stable task.id — that's the key reply / cancel /
  # stuck-recovery will use to release the slot.
  is_claude_channel_agent = agent.claude_channel_agent?
  resource_id = is_claude_channel_agent ? task.id : (job_id || task.id)
  tracker.reserve!(resource_id)
  should_release = true

  begin
    # For Claude Channel agents, transition to "delegated" BEFORE dispatching.
    # The MCP client can receive the broadcast and POST /reply on a different
    # thread before AiAgentService#call returns; the reply handler only looks
    # for status: "delegated" tasks, so a late update! would leave the
    # already-answered task stuck in delegated until stuck recovery.
    if is_claude_channel_agent
      # Atomic running -> delegated transition. If AgentsController#destroy
      # races us between reserve! above and this line and flips the task
      # to "cancelled", the WHERE filter excludes us, rows_updated == 0,
      # we skip dispatch, and the ensure block releases the slot. A
      # separate reload + update! would let the cancel slip in between.
      rows_updated = Task.where(id: task.id, status: "running").update_all(
        status: "delegated", updated_at: Time.current
      )
      if rows_updated.zero?
        task.reload
        Rails.logger.info(
          "[AiAgentJob] Claude Channel task #{task.id} not in running state " \
          "(status=#{task.status}); skipping dispatch"
        )
        return
      end
      task.reload
    end

    response_content = AiAgentService.new(task).call

    if is_claude_channel_agent
      # Claude Channel agents delegate via MCP; no immediate response expected.
      # Hold agent capacity until reply / cancel / stuck-recovery releases it.
      should_release = false
    elsif task.parent_task_id.present? && response_content.blank?
      # Workflow subtasks with empty responses should retry, then fail
      max_retries = 2
      current_retry = task.retry_count || 0
      if current_retry < max_retries
        task.update!(retry_count: current_retry + 1, status: "pending")
        Rails.logger.warn(
          "[AiAgentJob] Workflow subtask #{task.id} returned empty response, " \
          "retrying (#{current_retry + 1}/#{max_retries})"
        )
        AiAgentJob.set(wait: 5.seconds).perform_later(task)
      else
        task.update!(status: "failed")
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(
          task, error_message: "Agent returned empty response after #{max_retries} retries"
        )
      end
    else
      task.update!(status: "done")
      # Advance workflow (release happens in ensure block)
      if task.parent_task_id.present?
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).complete_subtask!(task)
      end
    end
  rescue ApprovalPendingError
    # Task status already set to pending_approval by AiAgentService
    # Don't release resources yet - task will resume
    should_release = false
    Rails.logger.info("AiAgentJob paused for task #{task.id}: awaiting tool approval")
  rescue CancelledError
    # Task status already set to "cancelled" by Comment callback
    Rails.logger.info("AiAgentJob cancelled for task #{task.id}: trigger message deleted")
  rescue StandardError => e
    # Log first so the original failure is recorded even if marking the task
    # failed (or notifying the parent workflow) raises in turn.
    Rails.logger.error("AiAgentJob failed for task #{task.id}: #{e.message}")
    task.update!(status: "failed")
    # Fail workflow if this is a sub-task
    if task.parent_task_id.present?
      Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(task, error_message: e.message)
    end
    # Bare raise re-raises the current exception (e) with its original backtrace.
    raise
  ensure
    # Guarantee resource release for all paths except pending_approval
    tracker.release!(resource_id, tokens_used: 0) if should_release && tracker && resource_id
    if task&.trigger_event_payload&.key?("topic") &&
       %w[done failed cancelled escalated].include?(task.reload.status)
      Orchestration::AgentOrchestrator.dequeue_next_for_topic(task.topic_id, task.creative_id)
    end
  end
end