Module: Parse::Agent::MCPSubscriptions
Defined in: lib/parse/agent/mcp_subscriptions.rb
Overview
Resource-subscription bridge for the MCP server.
MCP 2025-06-18 lets a client resources/subscribe to a resource URI and
then receive unsolicited notifications/resources/updated messages over a
server→client channel whenever the underlying data changes. This module
bridges that surface onto Parse LiveQuery: a subscribed
parse://<Class>/count or parse://<Class>/samples URI is backed by a
LiveQuery subscription on <Class>; any matching create/update/delete/
enter/leave event is debounced and projected to a single coarse
notifications/resources/updated for that URI. The client re-reads the
resource via resources/read to obtain the new value — the SDK never
streams row payloads through the MCP resource surface.
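The exchange above can be sketched as three JSON-RPC 2.0 messages. The method names and the uri parameter follow the MCP resource surface described here; the helper name sketch_messages, the request ids, and the Song class are illustrative assumptions rather than anything the SDK defines.

```ruby
require "json"

# Hypothetical helper: builds the three messages exchanged for one URI.
# The ids (1, 2) and the class name passed in are illustrative only.
def sketch_messages(uri)
  {
    # Client -> server: ask to be told when the resource changes.
    subscribe: { jsonrpc: "2.0", id: 1, method: "resources/subscribe", params: { uri: uri } },
    # Server -> client: coarse signal carrying the URI only, never row data.
    # A notification has no id because no response is expected.
    updated: { jsonrpc: "2.0", method: "notifications/resources/updated", params: { uri: uri } },
    # Client -> server: re-read the resource to obtain the new value.
    read: { jsonrpc: "2.0", id: 2, method: "resources/read", params: { uri: uri } }
  }
end

sketch_messages("parse://Song/count").each_value { |msg| puts JSON.generate(msg) }
```

The notification deliberately repeats only the URI, which is why the client must follow it with resources/read.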
== Components
- Manager — per-transport (per MCPRackApp instance) coordinator. Owns the session→subscription bookkeeping, derives LiveQuery credentials from the subscribing agent, starts/stops LiveQuery subscriptions, and routes debounced updates through the Notifier.
- LocalNotifier — the in-process Notifier implementation. A listening SSE stream registers a delivery callback under its session id; the bridge publishes notifications to that session id. The Notifier contract is the clustered-ready seam: a Redis-backed implementation can drop in without touching Manager so a LiveQuery event observed on one worker process can reach a listening stream held on another.
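As a rough illustration of the Notifier seam, the sketch below keeps one delivery callback per session id and publishes to it. SketchNotifier and its method names (register, unregister, publish) are assumptions made for this example; the actual LocalNotifier interface may differ.

```ruby
# Hypothetical in-process notifier; not the SDK's LocalNotifier.
class SketchNotifier
  def initialize
    @callbacks = {}
    @lock = Mutex.new
  end

  # A listening stream registers the block that writes to its connection.
  def register(session_id, &deliver)
    @lock.synchronize { @callbacks[session_id] = deliver }
  end

  def unregister(session_id)
    @lock.synchronize { @callbacks.delete(session_id) }
  end

  # Returns true if a listener received the message, false if none was registered.
  def publish(session_id, message)
    deliver = @lock.synchronize { @callbacks[session_id] }
    return false unless deliver

    deliver.call(message)
    true
  end
end

received = []
notifier = SketchNotifier.new
notifier.register("session-1") { |msg| received << msg }
notifier.publish("session-1", { method: "notifications/resources/updated" })
```

Under this shape, a clustered variant would keep publish(session_id, message) and swap the hash lookup for a cross-process channel, leaving the caller unchanged.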
== Security: credential derivation (fail-closed)
LiveQuery enforces ACL server-side via the session token supplied on the
subscribe frame — exactly like the REST surface, and unlike the
master-key-only REST aggregate endpoint. The bridge therefore mirrors
the SDK's documented scope asymmetry (see CLAUDE.md "Critical Parse Server
Behavior"):
- session-token agent → subscribe with that token; Parse Server filters events to rows the user can read.
- master-key agent (no session token, nil ACL scope) → subscribe with the master key; sees every event.
- acl_user: / acl_role: agent → REFUSED. Those scopes are a mongo-direct-only construct with no REST and no LiveQuery affordance: Parse Server's LiveQuery has no "act as this user pointer / role" handshake. Bridging them would silently downgrade to either master-key (a row-level leak) or an unscoped session, so the bridge fails closed and raises SecurityError rather than opening a mis-scoped channel.
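The three postures can be modelled in a few lines. This is a simplified stand-in, not the SDK method: SketchAgent and sketch_credentials_for are hypothetical names, Ruby's built-in SecurityError substitutes for Parse::Agent::SecurityError, and the sketch omits the additional check that a master-key agent's own client actually carries a master key.

```ruby
# Hypothetical agent shape with only the fields the decision depends on.
SketchAgent = Struct.new(:session_token, :acl_user_scope, :acl_role_scope, keyword_init: true)

# Simplified decision table; the real method performs further checks.
def sketch_credentials_for(agent)
  token = agent.session_token
  # Session-token agent: the server filters events by that user's ACL.
  return { session_token: token } if token && !token.empty?

  # acl_user / acl_role agent: no LiveQuery equivalent, so refuse.
  if agent.acl_user_scope || agent.acl_role_scope
    raise SecurityError, "acl_user/acl_role scopes cannot be bridged to LiveQuery"
  end

  # No token and no scope: master-key posture.
  { use_master_key: true }
end

p sketch_credentials_for(SketchAgent.new(session_token: "r:example-token"))
p sketch_credentials_for(SketchAgent.new)
```

The ordering matters: the session token is checked first, and the refusal happens before the master-key fallback so a scoped agent can never reach it.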
Only count and samples URIs are subscribable. Schema changes are not
LiveQuery events, so a parse://<Class>/schema subscribe request is rejected
with ValidationError rather than silently never firing.
Defined Under Namespace
Classes: Debouncer, LocalNotifier, Manager
Constant Summary
- SUBSCRIBABLE_KINDS = %w[count samples].freeze
  Resource kinds that map to a LiveQuery-backed subscription. schema is intentionally excluded: class-schema changes do not surface as LiveQuery row events, so a schema subscription could never fire and advertising it would be a broken contract.
- URI_RE = %r{\Aparse://([A-Za-z_][A-Za-z0-9_]*)/(schema|count|samples)\z}.freeze
  URI grammar shared with Parse::Agent::MCPDispatcher#handle_resources_read. Captures (1) the Parse class name and (2) the resource kind.
- DEFAULT_DEBOUNCE_INTERVAL = 0.25
  Default trailing-debounce window, in seconds. A burst of LiveQuery events on the same (session, uri) within this window collapses to a single notifications/resources/updated. Bounds notification fan-out on a high-churn class to at most one update per window per subscription.
- DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION = 100
  Default ceiling on concurrent subscriptions per session. A client that subscribes but never opens (or drops) its GET listening stream leaves LiveQuery subscriptions running until the session is torn down; this cap bounds that footprint, matching the "cap everything" posture of the rest of the transport (max_concurrent_dispatchers, the pre-auth limiter).
- DEFAULT_MAX_SESSIONS = 10_000
  Default ceiling on the number of DISTINCT sessions holding subscriptions. The per-session cap above bounds one session's footprint, but nothing bounded how many sessions accumulate: an authenticated client that subscribes (which can happen before the GET stream opens) and never sends DELETE leaves the session in @sessions for the process lifetime. This caps that growth, mirroring SessionOwnerRegistry::DEFAULT_MAX_ENTRIES.
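To make the debounce window concrete, here is a minimal sketch of a trailing debouncer keyed by (session, uri). SketchDebouncer, touch, and drain are hypothetical names, and the timer-per-key thread is an assumption about one way to get "at most one update per window"; the SDK's Debouncer class may be implemented differently.

```ruby
# Hypothetical trailing debouncer; not the SDK's Debouncer class.
class SketchDebouncer
  def initialize(interval:, &on_fire)
    @interval = interval
    @on_fire = on_fire
    @pending = {}
    @lock = Mutex.new
  end

  # The first event for a key arms a timer; later events inside the same
  # window are absorbed, so the callback runs once per window per key.
  def touch(key)
    @lock.synchronize do
      return if @pending.key?(key)

      @pending[key] = Thread.new do
        sleep @interval
        @lock.synchronize { @pending.delete(key) }
        @on_fire.call(key)
      end
    end
  end

  # Wait for any armed timers to fire (useful for shutdown and for demos).
  def drain
    @lock.synchronize { @pending.values }.each(&:join)
  end
end

fired = []
debouncer = SketchDebouncer.new(interval: 0.05) { |key| fired << key }
5.times { debouncer.touch(["session-1", "parse://Song/count"]) }
debouncer.drain
puts fired.length
```

The demo uses a 0.05 second window to stay quick; the documented default is 0.25 seconds.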
Class Method Summary
- .live_query_credentials_for(agent) ⇒ Hash
  Derive LiveQuery subscribe credentials from a subscribing agent.
- .parse_subscribable_uri(uri) ⇒ Array(String, String)
  Parse a resource URI into [class_name, kind], enforcing that the kind is LiveQuery-backed.
Class Method Details
.live_query_credentials_for(agent) ⇒ Hash
Derive LiveQuery subscribe credentials from a subscribing agent.
# File 'lib/parse/agent/mcp_subscriptions.rb', line 121

def self.live_query_credentials_for(agent)
  token = agent.respond_to?(:session_token) ? agent.session_token : nil
  return { session_token: token } if token && !token.to_s.empty?

  acl_user = agent.respond_to?(:acl_user_scope) ? agent.acl_user_scope : nil
  acl_role = agent.respond_to?(:acl_role_scope) ? agent.acl_role_scope : nil
  if acl_user || acl_role
    raise Parse::Agent::SecurityError,
          "acl_user/acl_role agents cannot open a LiveQuery-backed resource subscription: " \
          "Parse Server LiveQuery has no act-as-user/act-as-role handshake, so the channel " \
          "would be mis-scoped. Subscribe with a session-token or master-key agent instead."
  end

  # Master-key posture: no session token and no acl_user/acl_role (both
  # handled above), so `acl_scope` is nil. But "no scope" is NOT by
  # itself authority to open an ADMIN, ACL-bypassing LiveQuery socket.
  # `client_for` builds that socket via
  # `Parse::LiveQuery::Client.new(use_master_key: true)` with no explicit
  # key, and the constructor backfills the PROCESS-GLOBAL master key
  # (`cfg.master_key || parse_client_value(:master_key)`) — a different
  # authority source than this agent. An unprivileged / client-mode agent
  # whose own client has no master key would otherwise borrow the global
  # one and silently elevate every row on the socket past ACL/CLP. Bind
  # the master-key branch to the agent's ACTUAL authority: its own client
  # must carry a usable (non-blank String) master key — at least as strict
  # as `Parse::LiveQuery::Client#admin_connection?` (which requires a
  # non-empty String). Fail closed otherwise.
  acl_scope = agent.respond_to?(:acl_scope) ? agent.acl_scope : nil
  if acl_scope.nil?
    mk = agent.respond_to?(:client) ? agent.client&.master_key : nil
    return { use_master_key: true } if mk.is_a?(String) && !mk.strip.empty?

    raise Parse::Agent::SecurityError,
          "master-key posture but no master key on the agent's own client; refusing to " \
          "open an admin (ACL-bypassing) LiveQuery socket for an unprivileged agent — it " \
          "would otherwise borrow the process-global master key. Subscribe with a " \
          "session-token agent, or give this agent a master-key client."
  end

  # A scoped posture we don't have a LiveQuery mapping for — fail closed.
  raise Parse::Agent::SecurityError,
        "This agent's scope cannot be safely bridged to LiveQuery; refusing to open a " \
        "resource subscription."
end
.parse_subscribable_uri(uri) ⇒ Array(String, String)
Parse a resource URI into [class_name, kind], enforcing that the kind
is LiveQuery-backed.
# File 'lib/parse/agent/mcp_subscriptions.rb', line 97

def self.parse_subscribable_uri(uri)
  match = URI_RE.match(uri.to_s)
  unless match
    raise Parse::Agent::ValidationError,
          "Invalid resource URI: #{uri}. Expected parse://<Class>/{count|samples}."
  end
  class_name = match[1]
  kind = match[2]
  unless SUBSCRIBABLE_KINDS.include?(kind)
    raise Parse::Agent::ValidationError,
          "Resource kind '#{kind}' is not subscribable — only #{SUBSCRIBABLE_KINDS.join(' and ')} " \
          "are backed by LiveQuery. Schema changes are not LiveQuery events."
  end
  [class_name, kind]
end
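The accept/reject behaviour of this method can be tried without loading the SDK. The sketch below copies the two constants from the Constant Summary under SKETCH_ prefixed names and uses ArgumentError in place of Parse::Agent::ValidationError; sketch_parse_subscribable_uri and sketch_rejected? are hypothetical helper names.

```ruby
# Copies of URI_RE and SUBSCRIBABLE_KINDS, renamed to avoid clashing with the SDK.
SKETCH_URI_RE = %r{\Aparse://([A-Za-z_][A-Za-z0-9_]*)/(schema|count|samples)\z}.freeze
SKETCH_KINDS = %w[count samples].freeze

# Stand-in for the real method; ArgumentError replaces ValidationError.
def sketch_parse_subscribable_uri(uri)
  match = SKETCH_URI_RE.match(uri.to_s)
  raise ArgumentError, "invalid resource URI: #{uri}" unless match
  raise ArgumentError, "kind '#{match[2]}' is not subscribable" unless SKETCH_KINDS.include?(match[2])

  [match[1], match[2]]
end

# True when the URI is refused for either reason.
def sketch_rejected?(uri)
  sketch_parse_subscribable_uri(uri)
  false
rescue ArgumentError
  true
end

p sketch_parse_subscribable_uri("parse://Song/count")
p sketch_rejected?("parse://Song/schema")
```

Note that a schema URI passes the grammar and is refused only by the kind check, and that the \A and \z anchors reject input with a trailing newline.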