Class: Collavre::AgentChannel

Inherits:
ApplicationCable::Channel
  • Object
Defined in:
app/channels/collavre/agent_channel.rb

Class Method Summary

Instance Method Summary

Class Method Details

.broadcast_to_agent(agent_id, payload) ⇒ Object

Broadcast to a per-agent stream so the agent’s MCP plugin receives the dispatch regardless of which topic triggered it.



# File 'app/channels/collavre/agent_channel.rb', line 135

def self.broadcast_to_agent(agent_id, payload)
  ActionCable.server.broadcast("agent:user:#{agent_id}", payload)
end

.broadcast_to_topic(topic_id, payload) ⇒ Object

Broadcast an arbitrary payload to a topic’s agent stream.



# File 'app/channels/collavre/agent_channel.rb', line 129

def self.broadcast_to_topic(topic_id, payload)
  ActionCable.server.broadcast("agent:topic:#{topic_id}", payload)
end
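
The two class methods above differ only in the stream name they target. A minimal sketch of that naming, runnable without an Action Cable server, might look like the following. RecordingServer, AgentStreams, the ids, and the payload shape are hypothetical stand-ins for illustration and are not part of Collavre.

```ruby
# Hypothetical in-memory stand-in for ActionCable.server (illustration only).
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  # Same call shape as ActionCable.server.broadcast(stream, payload).
  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Hypothetical helper mirroring the stream names used by the class methods.
module AgentStreams
  def self.for_agent(agent_id)
    "agent:user:#{agent_id}"
  end

  def self.for_topic(topic_id)
    "agent:topic:#{topic_id}"
  end
end

server = RecordingServer.new
# The payload keys here are assumed; the real dispatch payload is not shown in this page.
server.broadcast(AgentStreams.for_agent(42), { "type" => "dispatch" })
server.broadcast(AgentStreams.for_topic(7), { "type" => "dispatch" })
```

A client subscribed with agent_id 42 would see only the first broadcast, and a listener scoped to topic 7 would see only the second.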

Instance Method Details

#replay_permissions(data) ⇒ Object

Pull-on-resubscribe replay (Codex P2). After every (re)subscribe the plugin sends the permission request_ids it still holds pending, and the server re-broadcasts the recorded decision for exactly those ids.

Without this, a decision broadcast once into the transient agent:user:<id> stream while the plugin’s WebSocket was down would be lost (action_executed_at already stamped, buttons hidden), leaving the suspended tool hanging with no retry path.

The plugin’s pending set is the sole bound: there is no wall-clock window, so an outage of any length is covered, and a decision that was already consumed is never requested. The replay is idempotent, because the plugin’s coordinator drops any id it no longer holds.

Gated on @session_agent, so only a Claude Channel agent subscription (where stream_from agent:user:<id> is attached) can pull.



# File 'app/channels/collavre/agent_channel.rb', line 119

def replay_permissions(data)
  return unless @session_agent

  Comment.replay_claude_channel_permission_decisions_for(
    @session_agent.id,
    data["request_ids"]
  )
end
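
The replay contract described above can be sketched with in-memory stand-ins. This is an illustration under stated assumptions, not the Collavre implementation: RECORDED_DECISIONS, replay_decisions_for, PendingCoordinator, and the message keys are hypothetical names, and the real lookup happens inside Comment.replay_claude_channel_permission_decisions_for.

```ruby
require "set"

# Hypothetical server-side record of decisions, keyed by request_id.
RECORDED_DECISIONS = {
  "req-1" => "allow",
  "req-2" => "deny"
}.freeze

# Server side: return decisions for exactly the ids the plugin still holds.
# Ids without a recorded decision are skipped; nil input yields no messages.
def replay_decisions_for(request_ids)
  Array(request_ids).filter_map do |id|
    decision = RECORDED_DECISIONS[id]
    { "request_id" => id, "decision" => decision } if decision
  end
end

# Plugin side: resolves ids it still holds and drops everything else.
class PendingCoordinator
  attr_reader :resolved

  def initialize(pending_ids)
    @pending = Set.new(pending_ids)
    @resolved = {}
  end

  def pending_ids
    @pending.to_a
  end

  # Idempotent: a decision for an id that is no longer pending is ignored.
  def deliver(message)
    id = message["request_id"]
    return false unless @pending.delete?(id)

    @resolved[id] = message["decision"]
    true
  end
end

coordinator = PendingCoordinator.new(%w[req-2 req-9])
replayed = replay_decisions_for(coordinator.pending_ids)
replayed.each { |message| coordinator.deliver(message) }
duplicate_accepted = coordinator.deliver(replayed.first)
```

In this sketch req-1 is never replayed because the plugin did not ask for it, req-9 stays pending because no decision exists yet, and delivering req-2 a second time has no effect.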

#subscribed ⇒ Object

Subscribes to an agent stream for real-time dispatch notifications. Accepts either:

- agent_id: per-agent stream — used by MCP plugin clients (Claude
  Channel) so they receive every dispatch routed to the agent, no
  matter which topic triggered it. Authorized by created_by ownership.
- topic_id: per-topic stream — legacy/UI listeners scoped to one topic.


# File 'app/channels/collavre/agent_channel.rb', line 17

def subscribed
  return reject unless current_user

  if params[:agent_id].present?
    subscribe_to_agent_stream
  elsif params[:topic_id].present?
    subscribe_to_topic_stream
  else
    reject
  end
end
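
From the client side, the choice between the two streams is made by the parameters in the subscription identifier. The sketch below builds the standard Action Cable subscribe frame for each case. It assumes the client addresses the channel by its class name, and subscribe_frame and the ids are hypothetical.

```ruby
require "json"

# Action Cable identifies a subscription by a JSON-encoded identifier string
# nested inside the JSON command frame.
def subscribe_frame(params)
  identifier = { channel: "Collavre::AgentChannel" }.merge(params)
  JSON.generate(command: "subscribe", identifier: JSON.generate(identifier))
end

agent_frame = subscribe_frame(agent_id: 42) # per-agent stream (hypothetical id)
topic_frame = subscribe_frame(topic_id: 7)  # per-topic stream (hypothetical id)
```

If a client sent both parameters, the branch order in #subscribed means agent_id would take precedence, and a frame with neither would be rejected.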

#unsubscribed ⇒ Object

When the MCP process crashes or the WebSocket drops before the plugin can call DELETE /api/v1/agent/:id, a Claude Channel session would otherwise stay matchable (routing_expression: "true"). Any future comment on a creative still shared with the agent would then dispatch into an empty stream, producing delegated work that only clears via stuck recovery.

One shared agent can have MANY concurrent sessions, so routing is gated on PRESENCE: this session drops its own AgentSubscription row, and routing is cleared only when NO rows remain. A still-live sibling session keeps its row, so routing stays active and its in-flight work is never cancelled. Done under the agent’s row lock so a concurrent subscribe on the same agent serializes (no clear-vs-activate race).

Cross-process / late unsubscribe: the row was keyed by this connection’s own @subscription_token. A stale unsubscribe whose row was already removed (newer subscribe took over, or a different Puma/Kamal process) deletes zero rows and becomes a no-op — required for scaled deployments (WEB_CONCURRENCY > 1, Solid Cable).



# File 'app/channels/collavre/agent_channel.rb', line 47

def unsubscribed
  return unless @session_agent && @subscription_token

  cleared = false
  dropped_session_id = nil
  @session_agent.with_lock do
    scope = AgentSubscription.where(agent_id: @session_agent.id, token: @subscription_token)
    # Capture the dropped session's id before deleting its row so a live-
    # sibling exit can still scope a cleanup to this session's own topic.
    dropped_session_id = scope.pick(:session_id)
    deleted = scope.delete_all
    # Stale: our presence row is already gone. The live owner's lifecycle
    # owns routing — do not clobber it, do not schedule cancellation.
    if deleted.zero?
      dropped_session_id = nil
      next
    end

    # Drop crash-orphaned sibling rows (a Puma/ActionCable process that
    # died without firing unsubscribed) before reading presence, so a dead
    # row cannot masquerade as a live sibling and pin routing on forever.
    AgentSubscription.reap_stale!(@session_agent.id)

    # Another session is still LIVE on this shared agent. Keep routing
    # active; whichever session unsubscribes last clears it.
    next if AgentSubscription.live.where(agent_id: @session_agent.id).exists?

    @session_agent.update_columns(
      routing_expression: nil,
      routing_subscription_token: nil
    )
    cleared = true
  end

  if cleared
    # Reconnect-grace cancellation (last session): clearing routing only
    # makes the agent unroutable. Any task already "delegated" still holds
    # its ResourceTracker slot — the dispatch was broadcast to a now-dead
    # stream so no client remains to call /reply, and the slot would stay
    # held until StuckDetectorJob times out. The job cancels those tasks
    # after a grace window, but only if the agent is still offline (it
    # rechecks routing_expression AND that no session has resubscribed).
    CancelOfflineDelegatedTasksJob
      .set(wait: CancelOfflineDelegatedTasksJob::GRACE_SECONDS.seconds)
      .perform_later(@session_agent.id, @subscription_token)
  elsif dropped_session_id.present?
    # A live sibling keeps the shared agent routable, so we must NOT clear
    # routing or sweep agent-wide. But this dropped session's OWN session
    # topic is private to it — siblings filter session_topic dispatches to
    # their own topic, so none will /reply to a task delegated there and it
    # would hold its slot until stuck recovery. Schedule a grace-delayed,
    # session-scoped cancellation (skipped if this same session reconnects
    # within the window), mirroring the destroy path's cancel_tasks_for_topic.
    CancelOfflineDelegatedTasksJob
      .set(wait: CancelOfflineDelegatedTasksJob::GRACE_SECONDS.seconds)
      .perform_later(@session_agent.id, @subscription_token, dropped_session_id)
  end
rescue ActiveRecord::StatementInvalid => e
  Rails.logger.warn("[AgentChannel] unsubscribed presence clear failed: #{e.message}")
end
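
The presence rule in #unsubscribed (drop your own row, clear routing only when no rows remain, treat a missing row as a no-op) can be modelled in memory. This is a simplified sketch under stated assumptions: PresenceModel and its return symbols are hypothetical, a Mutex stands in for the agent’s row lock, and stale-row reaping and the grace-delayed cancellation job are left out.

```ruby
# Hypothetical in-memory model of the presence rows for one shared agent.
class PresenceModel
  attr_reader :routing_expression

  def initialize
    @lock = Mutex.new # stands in for the agent's database row lock
    @rows = {}        # subscription token => session id
    @routing_expression = nil
  end

  def subscribe(token, session_id)
    @lock.synchronize do
      @rows[token] = session_id
      @routing_expression = "true"
    end
  end

  # Returns :stale, :sibling_live, or :cleared.
  def unsubscribe(token)
    @lock.synchronize do
      # Stale: our row is already gone, so leave routing to its live owner.
      next :stale unless @rows.key?(token)

      @rows.delete(token)
      # Another session still holds a row, so routing stays active.
      next :sibling_live unless @rows.empty?

      @routing_expression = nil
      :cleared
    end
  end
end

agent = PresenceModel.new
agent.subscribe("tok-a", "session-a")
agent.subscribe("tok-b", "session-b")

first = agent.unsubscribe("tok-a")  # sibling tok-b is still present
routing_after_first = agent.routing_expression
stale = agent.unsubscribe("tok-a")  # row already removed
last = agent.unsubscribe("tok-b")   # no rows remain
```

In the real method the :cleared outcome corresponds to the agent-wide cancellation job and the :sibling_live outcome to the session-scoped one, while the stale case schedules nothing.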