Class: Collavre::AiAgentJob

Inherits:
ApplicationJob
  • Object
show all
Defined in:
app/jobs/collavre/ai_agent_job.rb

Instance Method Summary collapse

Instance Method Details

#perform(agent_id_or_task, event_name = nil, context = nil) ⇒ Object

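Runs an agent task: pass an agent id to create and run a new task, or pass an existing Task to resume it (this is what allows resuming a task that was pending approval).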



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'app/jobs/collavre/ai_agent_job.rb', line 6

# Runs one AI agent task, either freshly created or resumed.
#
# When +agent_id_or_task+ is a Task, that task is resumed (for example after
# it was paused for approval, re-enqueued for an empty-response retry, or
# dequeued for its topic). Otherwise the argument is passed to User.find and
# a new "running" Task is created from +event_name+ and +context+.
#
# After the task is resolved, the method reserves a slot on the agent's
# resource tracker, dispatches through AiAgentService, records the outcome on
# the task, and finally releases the slot and drains the topic queue where
# applicable.
#
# @param agent_id_or_task [Task, Object] an existing Task to resume, or the
#   id handed to User.find for the agent that should handle the event
# @param event_name [String, nil] trigger event name (presumably a String;
#   it is interpolated into the task name and stored as trigger_event_name);
#   only used when creating a new task
# @param context [Hash, nil] trigger event payload, read with string keys
#   ("comment", "topic", "creative"); only used when creating a new task
# @return [void] the return value is not meaningful
# @raise [StandardError] any error raised during dispatch is re-raised after
#   the task has been marked "failed". Errors raised before the begin block
#   (e.g. from User.find, Task.create! or reserve!) propagate without that
#   handling.
def perform(agent_id_or_task, event_name = nil, context = nil)
  if agent_id_or_task.is_a?(Task)
    # Resume existing task
    task = agent_id_or_task
    # NOTE(review): this cancelled check and the update!(status: "running")
    # at the end of this branch are separate statements, unlike the atomic
    # running -> delegated transition further down — confirm a cancel cannot
    # land in between and be overwritten.
    return if task.reload.status == "cancelled"

    agent = task.agent

    # Guard: same offline-session check as the agent_id branch below.
    # Queued Claude Channel tasks resumed via Orchestration::AgentOrchestrator
    # .dequeue_next_for_topic enter this branch as AiAgentJob.perform_later(task).
    # If AgentChannel#unsubscribed cleared routing_expression while the
    # task was queued (WS drop without DELETE /agent/:id, e.g. SIGKILL or
    # network blip past the reconnect grace), and another completion later
    # drains the queue, this task would otherwise be promoted to running →
    # delegated and broadcast to a clientless agent:user:<id> stream —
    # held until stuck recovery.
    if agent.claude_channel_agent? && agent.routing_expression.blank?
      Rails.logger.info(
        "[AiAgentJob] Skipping resumed Claude Channel task #{task.id}: " \
        "session offline (routing_expression blank)"
      )
      # Workflow subtasks created by WorkflowExecutor carry parent_task_id and
      # no topic. If we only cancel the child and return, the parent workflow
      # stays "running" with its current/pending creative state forever — no
      # rescue path runs because we never raise. Mirror the StandardError
      # rescue below: fail the child and notify the parent so the workflow
      # transitions to "failed" with a failure_reason.
      if task.parent_task_id.present?
        task.update!(status: "failed")
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(
          task,
          error_message: "Claude Channel session offline before dispatch"
        )
      else
        task.update!(status: "cancelled")
      end
      # This return happens before the begin/ensure below, so the topic queue
      # has to be drained here explicitly.
      if task.trigger_event_payload&.key?("topic")
        Orchestration::AgentOrchestrator.dequeue_next_for_topic(task.topic_id, task.creative_id)
      end
      return
    end

    task.update!(status: "running")
  else
    # Create new task
    agent = User.find(agent_id_or_task)

    # Guard: skip if the Claude Channel session has unregistered (or its WS
    # dropped) during the window between Scheduler enqueue and this job
    # firing. AgentsController#destroy / AgentChannel#unsubscribed clear
    # routing_expression on the per-session ai_user, so a blank value here
    # means there is no live MCP client to receive the dispatch. Without
    # this guard, a :delayed (busy / rate-limited) enqueue from
    # Scheduler#evaluate would materialize a fresh Task, flip it to
    # "delegated", and broadcast to a clientless agent:user:<id> stream
    # — holding the topic/agent slot until stuck recovery.
    if agent.claude_channel_agent? && agent.routing_expression.blank?
      Rails.logger.info(
        "[AiAgentJob] Skipping Claude Channel job for agent #{agent.id}: " \
        "session offline (routing_expression blank, event=#{event_name})"
      )
      return
    end

    # Guard: skip if there's already a running task for the same agent + comment
    comment_id = context&.dig("comment", "id")
    if comment_id && Task.duplicate_running_for_comment?(agent.id, comment_id)
      Rails.logger.warn(
        "[AiAgentJob] Skipping duplicate: agent #{agent.id} already has a running task " \
        "for comment #{comment_id} (event=#{event_name})"
      )
      return
    end

    task = Task.create!(
      name: "Response to #{event_name}",
      status: "running",
      trigger_event_name: event_name,
      trigger_event_payload: context,
      agent: agent,
      topic_id: context&.dig("topic", "id"),
      creative_id: context&.dig("creative", "id")
    )

    # Record task for loop breaker tracking (per-topic, skip user-initiated)
    creative_id = context&.dig("creative", "id")
    if creative_id
      # Only an explicit from_ai == true counts as AI-triggered; a missing or
      # any other value is reported as triggered_by_user.
      from_ai = context&.dig("comment", "from_ai") == true
      topic_id = context&.dig("topic", "id")
      Orchestration::LoopBreaker.new(context).record_task(
        creative_id, agent.id, topic_id: topic_id, triggered_by_user: !from_ai
      )
    end
  end

  # Reserve resources before starting work
  tracker = Orchestration::ResourceTracker.for(agent)
  # Claude Channel tasks live past this job (MCP reply happens later), so
  # reserve under the stable task.id — that's the key reply / cancel /
  # stuck-recovery will use to release the slot.
  is_claude_channel_agent = agent.claude_channel_agent?
  # Other agents key the reservation on this job's job_id, falling back to
  # task.id when job_id is nil.
  resource_id = is_claude_channel_agent ? task.id : (job_id || task.id)
  tracker.reserve!(resource_id)
  # Cleared below on the paths that must keep the reservation after this job
  # ends (Claude Channel delegation, pending approval).
  should_release = true

  begin
    # For Claude Channel agents, transition to "delegated" BEFORE dispatching.
    # The MCP client can receive the broadcast and POST /reply on a different
    # thread before AiAgentService#call returns; the reply handler only looks
    # for status: "delegated" tasks, so a late update! would leave the
    # already-answered task stuck in delegated until stuck recovery.
    if is_claude_channel_agent
      # Atomic running -> delegated transition. If AgentsController#destroy
      # races us between reserve! above and this line and flips the task
      # to "cancelled", the WHERE filter excludes us, rows_updated == 0,
      # we skip dispatch, and the ensure block releases the slot. A
      # separate reload + update! would let the cancel slip in between.
      rows_updated = Task.where(id: task.id, status: "running").update_all(
        status: "delegated", updated_at: Time.current
      )
      if rows_updated.zero?
        task.reload
        Rails.logger.info(
          "[AiAgentJob] Claude Channel task #{task.id} not in running state " \
          "(status=#{task.status}); skipping dispatch"
        )
        return
      end
      # update_all wrote straight to the database; reload so the in-memory
      # task reflects the "delegated" status before dispatch.
      task.reload
    end

    response_content = AiAgentService.new(task).call

    # Claude Channel agents delegate via MCP; no immediate response expected
    if is_claude_channel_agent
      # Hold agent capacity until reply / cancel / stuck-recovery releases it.
      should_release = false
    # Workflow subtasks with empty responses should retry, then fail
    elsif task.parent_task_id.present? && response_content.blank?
      max_retries = 2
      current_retry = task.retry_count || 0

      if current_retry < max_retries
        # Back to "pending" and re-enqueue the same task; the retry re-enters
        # this method through the resume branch. The reservation taken above
        # is still released by the ensure block.
        task.update!(retry_count: current_retry + 1, status: "pending")
        Rails.logger.warn(
          "[AiAgentJob] Workflow subtask #{task.id} returned empty response, " \
          "retrying (#{current_retry + 1}/#{max_retries})"
        )
        AiAgentJob.set(wait: 5.seconds).perform_later(task)
      else
        task.update!(status: "failed")
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(
          task, error_message: "Agent returned empty response after #{max_retries} retries"
        )
      end
    else
      task.update!(status: "done")
      # Advance workflow (release happens in ensure block)
      if task.parent_task_id.present?
        Collavre::Comments::WorkflowExecutor.new(task.parent_task).complete_subtask!(task)
      end
    end
  rescue ApprovalPendingError
    # Task status already set to pending_approval by AiAgentService
    # Don't release resources yet - task will resume
    should_release = false
    Rails.logger.info("AiAgentJob paused for task #{task.id}: awaiting tool approval")
  rescue CancelledError
    # Task status already set to "cancelled" by Comment callback
    Rails.logger.info("AiAgentJob cancelled for task #{task.id}: trigger message deleted")
  rescue StandardError => e
    task.update!(status: "failed")
    # Fail workflow if this is a sub-task
    if task.parent_task_id.present?
      Collavre::Comments::WorkflowExecutor.new(task.parent_task).fail_subtask!(task, error_message: e.message)
    end
    Rails.logger.error("AiAgentJob failed for task #{task.id}: #{e.message}")
    # Re-raise after bookkeeping so the error still propagates to the caller.
    raise e
  ensure
    # Guarantee resource release for all paths except pending_approval
    tracker.release!(resource_id, tokens_used: 0) if should_release && tracker && resource_id
    # Reload so the check sees the persisted status (it may have been set
    # outside this method, per the rescue comments above). The next queued
    # task for the topic is dequeued only when the status is one of the four
    # listed here.
    if task&.trigger_event_payload&.key?("topic") && %w[done failed cancelled escalated].include?(task.reload.status)
      Orchestration::AgentOrchestrator.dequeue_next_for_topic(task.topic_id, task.creative_id)
    end
  end
end