Class: Collavre::Api::V1::AgentsController

Inherits:
BaseController
  • Object
show all
Defined in:
app/controllers/collavre/api/v1/agents_controller.rb

Instance Method Summary collapse

Instance Method Details

#destroyObject

DELETE /api/v1/agent/:id Archives the agent’s topic (session ended). topic_id is required: register reuses the same ai_user across all sessions for the same human, so the primary_agent association alone cannot identify which session is ending. Without an explicit topic_id we could archive a sibling session’s active topic.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'app/controllers/collavre/api/v1/agents_controller.rb', line 75

def destroy
  ai_user = User.find_by(id: params[:id])
  unless ai_user&.ai_user? && ai_user.created_by_id == current_user.id
    render json: { error: "Agent not found" }, status: :not_found
    return
  end

  if params[:topic_id].blank?
    render json: { error: "topic_id is required" }, status: :unprocessable_entity
    return
  end

  inbox = Creative.inbox_for(current_user)
  topic = inbox.topics.active.find_by(id: params[:topic_id])
  # Ensure the topic actually belongs to this agent so a mismatched
  # topic_id can't archive an unrelated inbox conversation.
  topic = nil unless topic && topic.primary_agent&.id == ai_user.id

  # The plugin closes the WebSocket and IMMEDIATELY calls DELETE, so
  # AgentChannel#unsubscribed may not have dropped this session's presence
  # row yet. Drop it up front — keyed by the ending session's id (sent by
  # the plugin, or derived from the required topic) — so this session's
  # own still-live row cannot masquerade as a live sibling and skip the
  # last-session teardown (which would pin routing_expression on and, if
  # the WS close is never processed, leave nothing to clear it). Done
  # under the agent row lock to serialize against a concurrent subscribe/
  # unsubscribe on the shared agent — the same lock AgentChannel uses.
  exiting_session_id = params[:session_id].presence || topic&.session_id
  last_session = false
  ai_user.with_lock do
    if exiting_session_id.present?
      AgentSubscription
        .where(agent_id: ai_user.id, session_id: exiting_session_id)
        .delete_all
    end

    unless sibling_sessions_live?(ai_user)
      last_session = true
      # Last (or only) session for this agent. Clear routing_expression
      # FIRST so Orchestration::Matcher#match_by_expression stops
      # dispatching new comments to this now-clientless agent while we
      # drain its task graph below; otherwise a comment arriving mid-drain
      # could enqueue a new task right after we cancelled the last one.
      # (Routing is reactivated on the next AgentChannel subscribe, not on
      # re-register.)
      ai_user.update_column(:routing_expression, nil) if ai_user.routing_expression.present?
    end
  end

  if last_session
    # Cancel queued/pending tasks BEFORE draining topic queues. The topic
    # queue (Task.queued_for_topic) is scoped by topic_id, not agent_id,
    # so dequeue_next_for_topic would otherwise flip a queued task for
    # this clientless agent to pending and enqueue AiAgentJob — which then
    # broadcasts to a stream with no subscriber and strands the task in
    # delegated until stuck recovery.
    cancel_pending_tasks_for_session(ai_user)

    # Fail any tasks still delegated to this agent — including dispatches
    # routed to *work* topics outside the registration inbox. With no live
    # session left, agent_id uniquely scopes this agent's delegated work;
    # scoping by topic_id alone would leak the common case (a /work
    # dispatch on a project topic) until stuck detection times out.
    cancel_delegated_tasks_for_session(ai_user)
  elsif topic
    # One shared agent fans out to many concurrent sessions (the default
    # AGENT_NAME case). Another session is still LIVE, so agent-wide
    # cleanup would clear the shared routing_expression and cancel the
    # sibling's in-flight work — exactly the hazard AgentChannel#unsubscribed
    # guards against via presence. Tear down ONLY this ending session:
    # cancel work on its own topic (whose client is gone, so it would
    # otherwise hold a slot / block the topic queue until stuck recovery).
    # The shared routing and any work on other topics belong to the live
    # sibling and are left untouched.
    cancel_tasks_for_topic(ai_user, topic)
  end
  topic.archive! if topic

  head :no_content
end

#notifyObject

POST /api/v1/agent/notify Posts an out-of-band informational comment to a topic as the agent WITHOUT completing any task or dispatching. Unlike /reply, this never touches the task graph: it surfaces notices (e.g. native channel permission prompts relayed by Claude Code mid-turn) into the topic chat while the originating task stays in flight. skip_dispatch avoids spawning a new delegated task for the agent’s own message.



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'app/controllers/collavre/api/v1/agents_controller.rb', line 223

def notify
  topic = Topic.find_by(id: params[:topic_id])
  unless topic
    render json: { error: "Topic not found" }, status: :not_found
    return
  end

  creative = topic.creative&.effective_origin
  unless creative
    render json: { error: "Creative not found" }, status: :not_found
    return
  end

  unless creative.has_permission?(current_user, :feedback)
    render json: { error: "Not authorized" }, status: :forbidden
    return
  end

  # The poster must be this session's own Claude Channel agent, owned by
  # the token holder — so /notify can't be used to ventriloquize an
  # unrelated agent or post into a topic the caller doesn't drive.
  agent = resolve_notify_agent(topic, params[:task_id])
  unless agent
    render json: { error: "Not authorized" }, status: :forbidden
    return
  end

  comment =
    if params[:permission_request_id].present?
      build_permission_comment(creative, topic, agent)
    else
      creative.comments.build(
        content: params[:text].to_s,
        topic: topic,
        user: agent,
        skip_default_user: true,
        skip_dispatch: true
      )
    end

  if comment.save
    park_pending_permission(topic, agent, params[:task_id], params[:permission_request_id])
    render json: { comment_id: comment.id }, status: :created
  else
    render json: { errors: comment.errors.full_messages }, status: :unprocessable_entity
  end
end

#registerObject

POST /api/v1/agent/register Registers a Claude Code session, separating two identities:

- Agent (agent_name): the user-facing unit. One shared ai_user per
  (human, agent_name). Without an explicit AGENT_NAME the plugin
  sends a default name so every session collapses onto one agent.
- Session (session_id): one Topic per Claude Code session, keyed by
  a stable id (derived per cwd by the plugin, stable across
  --resume). Multiple session topics can share one primary_agent.

Back-compat: older plugins send only :name. It is treated as BOTH the agent and the session identity, reproducing the prior one-agent-one-topic-per-name behavior.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'app/controllers/collavre/api/v1/agents_controller.rb', line 19

def register
  agent_name = params[:agent_name].to_s.strip
  session_id = params[:session_id].to_s.strip
  legacy_name = params[:name].to_s.strip
  agent_name = legacy_name if agent_name.blank?
  session_id = legacy_name if session_id.blank?

  if agent_name.blank? || session_id.blank?
    render json: { error: "name (or agent_name and session_id) is required" },
           status: :unprocessable_entity
    return
  end

  ai_user = find_or_create_session_agent(agent_name)
  unless ai_user
    render json: { error: "Session agent email is already in use by a different account" }, status: :conflict
    return
  end

  inbox = Creative.inbox_for(current_user)
  topic = find_or_create_session_topic(inbox, ai_user, session_id, params[:session_label])
  topic.unarchive! if topic.archived?
  topic.set_primary_agent!(ai_user)

  # find_or_create_by! keeps the SELECT-first path (so a sequential
  # re-register reuses the existing share without tripping CreativeShare's
  # model-level user_id/creative_id uniqueness validation). But concurrent
  # sibling registrations can both pass that SELECT before either commits;
  # the loser then hits the DB unique index. Rescue that race and re-find
  # instead of surfacing a 500/conflict.
  begin
    CreativeShare.find_or_create_by!(creative: inbox, user: ai_user) do |s|
      s.permission = :feedback
      s.shared_by = current_user
    end
  rescue ActiveRecord::RecordNotUnique
    CreativeShare.find_by!(creative: inbox, user: ai_user)
  end

  render json: {
    agent_id: ai_user.id,
    agent_name: ai_user.name,
    topic_id: topic.id,
    topic_name: topic.name,
    session_id: session_id,
    inbox_creative_id: inbox.id,
    ws_url: "/cable"
  }, status: :ok
end

#replyObject

POST /api/v1/agent/reply Creates a comment in a topic as the AI agent



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'app/controllers/collavre/api/v1/agents_controller.rb', line 158

def reply
  topic = Topic.find_by(id: params[:topic_id])
  unless topic
    render json: { error: "Topic not found" }, status: :not_found
    return
  end

  creative = topic.creative&.effective_origin
  unless creative
    render json: { error: "Creative not found" }, status: :not_found
    return
  end

  unless creative.has_permission?(current_user, :feedback)
    render json: { error: "Not authorized" }, status: :forbidden
    return
  end

  agent = resolve_reply_agent(topic, params[:task_id])
  unless agent
    render json: { error: "Not authorized" }, status: :forbidden
    return
  end

  # Atomically claim the delegated task BEFORE saving the reply.
  # Two concurrent /reply requests with the same task_id can both
  # pass resolve_reply_agent (which is a read-only scope check) and,
  # without an atomic WHERE status='delegated' transition, both would
  # save separate comments and both would run completion logic —
  # producing duplicate linked replies for one dispatch. Claim first,
  # save second, so the loser sees rows_updated == 0 and bails out
  # before touching the comments table.
  claimed_task = claim_delegated_task(agent, topic, params[:task_id])
  if params[:task_id].present? && claimed_task.nil?
    render json: { error: "Task already completed or not delegated" }, status: :conflict
    return
  end

  comment = creative.comments.build(
    content: params[:text].to_s,
    topic: topic,
    user: agent,
    skip_default_user: true,
    skip_dispatch: true
  )

  if comment.save
    finalize_claimed_task(agent, claimed_task, comment) if claimed_task
    dispatch_a2a(agent, comment)
    render json: { comment_id: comment.id }, status: :created
  else
    # Restore the dispatch so the MCP client can retry — the failure
    # is text-level (validation), not task-level.
    claimed_task&.update!(status: "delegated")
    render json: { errors: comment.errors.full_messages }, status: :unprocessable_entity
  end
end