Class: Collavre::AgentChannel
- Inherits: ApplicationCable::Channel
  - Object
  - ApplicationCable::Channel
  - Collavre::AgentChannel
- Defined in: app/channels/collavre/agent_channel.rb
Class Method Summary
- .broadcast_to_agent(agent_id, payload) ⇒ Object
  Broadcast to a per-agent stream so the agent’s MCP plugin receives the dispatch regardless of which topic triggered it.
- .broadcast_to_topic(topic_id, payload) ⇒ Object
  Broadcast an arbitrary payload to a topic’s agent stream.
Instance Method Summary
- #replay_permissions(data) ⇒ Object
  Pull-on-resubscribe replay (Codex P2): after every (re)subscribe the plugin sends the permission request_ids it still holds pending, and the server re-broadcasts the recorded decision for exactly those ids.
- #subscribed ⇒ Object
  Subscribes to an agent stream for real-time dispatch notifications.
- #unsubscribed ⇒ Object
  When the MCP process crashes or the WebSocket drops before the plugin can call DELETE /api/v1/agent/:id, a Claude Channel session would otherwise stay matchable (routing_expression: "true") and any future comment on a creative still shared with the agent would dispatch into an empty stream, leaving delegated work that only clears via stuck recovery.
Class Method Details
.broadcast_to_agent(agent_id, payload) ⇒ Object
Broadcast to a per-agent stream so the agent’s MCP plugin receives the dispatch regardless of which topic triggered it.
    # File 'app/channels/collavre/agent_channel.rb', line 135

    def self.broadcast_to_agent(agent_id, payload)
      ActionCable.server.broadcast("agent:user:#{agent_id}", payload)
    end
.broadcast_to_topic(topic_id, payload) ⇒ Object
Broadcast an arbitrary payload to a topic’s agent stream.
    # File 'app/channels/collavre/agent_channel.rb', line 129

    def self.broadcast_to_topic(topic_id, payload)
      ActionCable.server.broadcast("agent:topic:#{topic_id}", payload)
    end
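The two class methods differ only in the stream name they broadcast to. The following is a minimal sketch of that naming scheme, runnable without Rails. `FakeServer` and `StreamNames` are hypothetical stand-ins introduced for illustration (they are not part of Collavre or Action Cable), and the payload shape is invented.

```ruby
# Hypothetical stand-in for ActionCable.server: records broadcasts in memory.
class FakeServer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def broadcast(stream, payload)
    @sent << [stream, payload]
  end
end

# Mirrors the stream-name formats used by the two class methods above.
module StreamNames
  def self.for_agent(agent_id)
    "agent:user:#{agent_id}"
  end

  def self.for_topic(topic_id)
    "agent:topic:#{topic_id}"
  end
end

server = FakeServer.new
payload = { type: "dispatch", topic_id: 7 } # invented payload shape

# Per-agent stream: reaches the agent's plugin whatever topic triggered it.
server.broadcast(StreamNames.for_agent(42), payload)
# Per-topic stream: reaches only listeners scoped to topic 7.
server.broadcast(StreamNames.for_topic(7), payload)

server.sent.each { |stream, _| puts stream }
```

A client subscribed with agent_id 42 would see the first broadcast only, while a topic-scoped listener on topic 7 would see the second only.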
Instance Method Details
#replay_permissions(data) ⇒ Object
Pull-on-resubscribe replay (Codex P2): after every (re)subscribe, the plugin sends the permission request_ids it still holds pending, and the server re-broadcasts the recorded decision for exactly those ids. Without this, a decision broadcast once into the transient agent:user:<id> stream while the plugin’s WebSocket was down would be lost (action_executed_at already stamped, buttons hidden), leaving the suspended tool hanging with no retry path.
The plugin’s pending set is the sole bound: there is no wall-clock window, so an outage of any length is covered, and a decision already consumed is never requested. The replay is idempotent, because the plugin’s coordinator drops any id it no longer holds. It is gated on @session_agent, so only a Claude Channel agent subscription (where stream_from agent:user:<id> is attached) can pull.
    # File 'app/channels/collavre/agent_channel.rb', line 119

    def replay_permissions(data)
      return unless @session_agent

      Comment.(
        @session_agent.id,
        data["request_ids"]
      )
    end
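The replay contract described above can be modeled in memory as follows. This is a sketch under stated assumptions, not the Collavre implementation: `DecisionLog` and `Coordinator` are hypothetical names, the decision values ("allow", "deny") are invented, and the real server reads recorded decisions through the Comment model rather than a Hash.

```ruby
require "set"

# Hypothetical in-memory stand-in for the recorded permission decisions.
class DecisionLog
  def initialize
    @decisions = {}
  end

  def record(request_id, decision)
    @decisions[request_id] = decision
  end

  # Server side: return recorded decisions for exactly the requested ids.
  # Ids that have no recorded decision yet are skipped.
  def replay(request_ids)
    Array(request_ids).filter_map do |id|
      decision = @decisions[id]
      { request_id: id, decision: decision } if decision
    end
  end
end

# Hypothetical plugin-side coordinator holding the pending request ids.
class Coordinator
  attr_reader :resolved

  def initialize(pending_ids)
    @pending = Set.new(pending_ids)
    @resolved = {}
  end

  def pending_ids
    @pending.to_a
  end

  # Idempotent: a decision for an id that is no longer pending is dropped.
  def deliver(message)
    return false unless @pending.delete?(message[:request_id])

    @resolved[message[:request_id]] = message[:decision]
    true
  end
end

log = DecisionLog.new
log.record("req-1", "allow") # decided while the WebSocket was down
log.record("req-9", "deny")  # consumed earlier, so never requested again

plugin = Coordinator.new(["req-1", "req-2"])
replayed = log.replay(plugin.pending_ids)
replayed.each { |message| plugin.deliver(message) }
replayed.each { |message| plugin.deliver(message) } # second delivery is a no-op
```

Because the pending set alone bounds the request, the sketch needs no timestamps: "req-9" is never replayed however old it is, and "req-2" stays pending until a decision is recorded for it.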
#subscribed ⇒ Object
Subscribes to an agent stream for real-time dispatch notifications. Accepts either:
- agent_id: per-agent stream, used by MCP plugin clients (Claude Channel) so they receive every dispatch routed to the agent, no matter which topic triggered it. Authorized by created_by ownership.
- topic_id: per-topic stream for legacy/UI listeners scoped to one topic.
    # File 'app/channels/collavre/agent_channel.rb', line 17

    def subscribed
      return reject unless current_user

      if params[:agent_id].present?
        subscribe_to_agent_stream
      elsif params[:topic_id].present?
        subscribe_to_topic_stream
      else
        reject
      end
    end
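From the client side, the choice between the two streams comes down to which parameter appears in the subscription identifier. The sketch below builds the JSON frames a raw WebSocket client would send, assuming the standard Action Cable wire protocol (a "subscribe" command whose identifier is a JSON string, and a "message" command whose data names the action). The helper names, agent id 42, topic id 7, and the request id are made up for illustration; authentication and the WebSocket connection itself are out of scope.

```ruby
require "json"

CHANNEL = "Collavre::AgentChannel"

# The identifier is itself a JSON string carrying the channel name and params.
def identifier(params)
  JSON.generate({ channel: CHANNEL }.merge(params))
end

# Frame that triggers #subscribed on the server.
def subscribe_frame(params)
  JSON.generate({ command: "subscribe", identifier: identifier(params) })
end

# Frame that invokes a channel action such as #replay_permissions.
def perform_frame(params, action, data = {})
  JSON.generate({
    command: "message",
    identifier: identifier(params),
    data: JSON.generate({ action: action }.merge(data))
  })
end

# Per-agent subscription (MCP plugin client).
puts subscribe_frame({ agent_id: 42 })
# Per-topic subscription (legacy/UI listener).
puts subscribe_frame({ topic_id: 7 })
# After (re)subscribing, ask the server to replay still-pending decisions.
puts perform_frame({ agent_id: 42 }, "replay_permissions", { request_ids: ["req-1"] })
```

A frame that carries neither agent_id nor topic_id would fall through to the final reject branch in #subscribed.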
#unsubscribed ⇒ Object
When the MCP process crashes or the WebSocket drops before the plugin can call DELETE /api/v1/agent/:id, a Claude Channel session would otherwise stay matchable (routing_expression: "true"). Any future comment on a creative still shared with the agent would then dispatch into an empty stream, leaving delegated work that only clears via stuck recovery.
One shared agent can have MANY concurrent sessions, so routing is gated on PRESENCE: this session drops its own AgentSubscription row, and routing is cleared only when NO rows remain. A still-live sibling session keeps its row, so routing stays active and its in-flight work is never cancelled. All of this runs under the agent’s row lock, so a concurrent subscribe on the same agent serializes (no clear-vs-activate race).
Cross-process / late unsubscribe: the row is keyed by this connection’s own @subscription_token. A stale unsubscribe whose row was already removed (a newer subscribe took over, or a different Puma/Kamal process) deletes zero rows and becomes a no-op. This is required for scaled deployments (WEB_CONCURRENCY > 1, Solid Cable).
    # File 'app/channels/collavre/agent_channel.rb', line 47

    def unsubscribed
      return unless @session_agent && @subscription_token

      cleared = false
      dropped_session_id = nil
      @session_agent.with_lock do
        scope = AgentSubscription.where(agent_id: @session_agent.id, token: @subscription_token)
        # Capture the dropped session's id before deleting its row so a live-
        # sibling exit can still scope a cleanup to this session's own topic.
        dropped_session_id = scope.pick(:session_id)
        deleted = scope.delete_all
        # Stale: our presence row is already gone. The live owner's lifecycle
        # owns routing; do not clobber it, do not schedule cancellation.
        if deleted.zero?
          dropped_session_id = nil
          next
        end

        # Drop crash-orphaned sibling rows (a Puma/ActionCable process that
        # died without firing unsubscribed) before reading presence, so a dead
        # row cannot masquerade as a live sibling and pin routing on forever.
        AgentSubscription.reap_stale!(@session_agent.id)

        # Another session is still LIVE on this shared agent. Keep routing
        # active; whichever session unsubscribes last clears it.
        next if AgentSubscription.live.where(agent_id: @session_agent.id).exists?

        @session_agent.update_columns(
          routing_expression: nil,
          routing_subscription_token: nil
        )
        cleared = true
      end

      if cleared
        # Reconnect-grace cancellation (last session): clearing routing only
        # makes the agent unroutable. Any task already "delegated" still holds
        # its ResourceTracker slot: the dispatch was broadcast to a now-dead
        # stream so no client remains to call /reply, and the slot would stay
        # held until StuckDetectorJob times out. The job cancels those tasks
        # after a grace window, but only if the agent is still offline (it
        # rechecks routing_expression AND that no session has resubscribed).
        CancelOfflineDelegatedTasksJob
          .set(wait: CancelOfflineDelegatedTasksJob::GRACE_SECONDS.seconds)
          .perform_later(@session_agent.id, @subscription_token)
      elsif dropped_session_id.present?
        # A live sibling keeps the shared agent routable, so we must NOT clear
        # routing or sweep agent-wide. But this dropped session's OWN session
        # topic is private to it: siblings filter session_topic dispatches to
        # their own topic, so none will /reply to a task delegated there and it
        # would hold its slot until stuck recovery. Schedule a grace-delayed,
        # session-scoped cancellation (skipped if this same session reconnects
        # within the window), mirroring the destroy path's cancel_tasks_for_topic.
        CancelOfflineDelegatedTasksJob
          .set(wait: CancelOfflineDelegatedTasksJob::GRACE_SECONDS.seconds)
          .perform_later(@session_agent.id, @subscription_token, dropped_session_id)
      end
    rescue ActiveRecord::StatementInvalid => e
      Rails.logger.warn("[AgentChannel] unsubscribed presence clear failed: #{e.}")
    end
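The presence-gated decision at the heart of #unsubscribed has three outcomes: stale, sibling still live, or last session out. The sketch below models just that decision in memory, assuming a Mutex in place of the agent's row lock and a Hash in place of the AgentSubscription table. `AgentPresence` and its return symbols are hypothetical; stale-row reaping and the grace-delayed cancellation jobs are deliberately left out.

```ruby
# Hypothetical in-memory model of presence-gated routing for one shared agent.
class AgentPresence
  attr_reader :routing_expression

  def initialize
    @lock = Mutex.new # stands in for the agent's row lock
    @sessions = {}    # subscription token => session id (one row per session)
    @routing_expression = nil
  end

  def subscribe(token, session_id)
    @lock.synchronize do
      @sessions[token] = session_id
      @routing_expression = "true"
    end
  end

  # Returns :stale, :sibling_live, or :cleared.
  def unsubscribe(token)
    @lock.synchronize do
      # Stale: our row is already gone, so leave routing to the live owner.
      next :stale unless @sessions.key?(token)

      @sessions.delete(token)
      # Another session still holds a row: keep the agent routable.
      next :sibling_live if @sessions.any?

      # Last session out: the agent becomes unroutable.
      @routing_expression = nil
      :cleared
    end
  end
end

presence = AgentPresence.new
presence.subscribe("token-a", "session-1")
presence.subscribe("token-b", "session-2")

puts presence.unsubscribe("token-a") # a sibling is still live
puts presence.unsubscribe("token-a") # late duplicate, nothing left to delete
puts presence.unsubscribe("token-b") # last session clears routing
```

Keying each row by the connection's own token is what makes the late duplicate harmless: it matches no row, so it cannot clear routing that a newer session activated.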