Class: Collavre::CancelOfflineDelegatedTasksJob

Inherits:
ApplicationJob
  • Object
Defined in:
app/jobs/collavre/cancel_offline_delegated_tasks_job.rb

Overview

Reconnect-grace cancellation for delegated tasks owned by a Claude Channel session whose WebSocket dropped without a DELETE /api/v1/agent/:id request.

AgentChannel#unsubscribed makes the agent unroutable, but a task already in “delegated” still holds its ResourceTracker slot and (for workflow subtasks) blocks the parent workflow. The dispatch was broadcast to a now-dead stream, so no client remains to call /reply. Without this job, the slot stays held until StuckDetectorJob times out, which can take minutes to hours.

The grace delay lets transient WS blips self-heal: if the client reconnects before the job fires, AgentChannel#subscribe_to_agent_stream restores routing_expression to “true” and writes a fresh subscription token. The job’s online-state recheck then no-ops.
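The self-healing behaviour described above can be sketched with a small in-memory model. This is an illustration only: SketchAgent and the sketch_* helpers are hypothetical names, and the assumption that a drop leaves routing_expression blank is inferred from the job's recheck rather than confirmed by AgentChannel itself.

```ruby
# Hypothetical in-memory stand-in for the agent record; only the two
# attribute names are taken from the documentation above.
SketchAgent = Struct.new(:routing_expression, :routing_subscription_token)

# Models the WebSocket drop: the agent becomes unroutable (assumed here to
# mean a blank routing_expression), and the token that was current at drop
# time is returned so it can be handed to the delayed job.
def sketch_unsubscribe(agent)
  token = agent.routing_subscription_token
  agent.routing_expression = nil
  token
end

# Models a reconnect: routing is restored and a fresh token is written.
def sketch_resubscribe(agent, new_token)
  agent.routing_expression = "true"
  agent.routing_subscription_token = new_token
end

# Models only the job's online-state recheck once the grace delay expires.
def sketch_recheck(agent)
  agent.routing_expression.to_s.empty? ? :cancel : :noop
end

blip = SketchAgent.new("true", "tok-1")
sketch_unsubscribe(blip)
sketch_resubscribe(blip, "tok-2")   # client returns within the grace window
puts sketch_recheck(blip)           # prints "noop"

gone = SketchAgent.new("true", "tok-1")
sketch_unsubscribe(gone)            # client never returns
puts sketch_recheck(gone)           # prints "cancel"
```

In a Rails application the delay itself would typically be expressed with Active Job's set(wait: ...) before perform_later; how this codebase schedules the job is not shown on this page.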

Constant Summary

GRACE_SECONDS = 30

Instance Method Summary

Instance Method Details

#perform(agent_id, expected_token, session_id = nil) ⇒ Object



# File 'app/jobs/collavre/cancel_offline_delegated_tasks_job.rb', line 22

def perform(agent_id, expected_token, session_id = nil)
  agent = User.find_by(id: agent_id)
  return unless agent&.claude_channel_agent?

  # Session-scoped variant: a live sibling may keep the shared agent
  # routable (so the agent-wide rechecks below would no-op), but the
  # dropped session's OWN session topic is private to it — siblings filter
  # session_topic dispatches to their own topic, so none will /reply. Cancel
  # only that topic's delegated work. Enqueued by AgentChannel#unsubscribed
  # when a sibling remains; mirrors the destroy path's cancel_tasks_for_topic.
  return cancel_dropped_session_tasks(agent, session_id) if session_id.present?

  # Agent came back online during the grace window — the same MCP
  # session (or a new one) is subscribed and can answer /reply.
  return if agent.routing_expression.present?

  # A session (reconnect or a still-live sibling sharing this agent) holds
  # a LIVE presence row — the agent is online, its delegated work is still
  # owned. Presence is the authority now that one agent fans out to many
  # concurrent sessions. Reap crash-orphaned rows first so a dead process's
  # leftover row can't masquerade as a live session and strand this work.
  AgentSubscription.reap_stale!(agent.id)
  return if AgentSubscription.live.where(agent_id: agent.id).exists?

  # A different subscription has taken over (token rotated). The new
  # session's lifecycle owns its own cancellation; don't double-cancel.
  if expected_token.present? &&
     agent.routing_subscription_token.present? &&
     agent.routing_subscription_token != expected_token
    return
  end

  cancel_delegated_tasks(Task.where(agent_id: agent.id, status: "delegated"), agent)
end
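
The order of the guards in #perform matters, so it may help to see it as a pure decision function. The sketch below is not the job's implementation: GraceSnapshot, sketch_blank?, grace_outcome, and the returned symbols are hypothetical, and live_sessions stands for the count of live presence rows after stale ones have been reaped.

```ruby
# Hypothetical snapshot of the state #perform reads. Field names are
# illustrative and do not correspond to real model attributes.
GraceSnapshot = Struct.new(
  :claude_channel_agent, :routing_expression, :live_sessions, :current_token,
  keyword_init: true
)

# Rough stand-in for the blank-value test used by the guards.
def sketch_blank?(value)
  value.nil? || value.to_s.strip.empty?
end

# Returns a symbol naming which guard fired, in the same order as #perform.
def grace_outcome(snapshot, expected_token, session_id = nil)
  return :skip_not_channel_agent unless snapshot&.claude_channel_agent
  # Session-scoped variant runs before any agent-wide recheck.
  return :cancel_session_topic unless sketch_blank?(session_id)
  return :skip_agent_routable unless sketch_blank?(snapshot.routing_expression)
  return :skip_live_presence if snapshot.live_sessions.positive?
  if !sketch_blank?(expected_token) &&
     !sketch_blank?(snapshot.current_token) &&
     snapshot.current_token != expected_token
    return :skip_token_rotated
  end
  :cancel_all_delegated
end

offline = GraceSnapshot.new(claude_channel_agent: true, routing_expression: nil,
                            live_sessions: 0, current_token: "tok-1")
puts grace_outcome(offline, "tok-1")            # prints "cancel_all_delegated"
puts grace_outcome(offline, "tok-0")            # prints "skip_token_rotated"
puts grace_outcome(offline, "tok-1", "sess-9")  # prints "cancel_session_topic"
```

Note that a present session_id short-circuits the agent-wide checks entirely, which matches the comment in the source: a live sibling can keep the agent routable while the dropped session's own topic still needs its delegated work cancelled.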