Class: Parse::Agent::MCPSubscriptions::Manager
- Inherits: Object
  - Object
  - Parse::Agent::MCPSubscriptions::Manager
- Defined in: lib/parse/agent/mcp_subscriptions.rb
Overview
Per-transport subscription coordinator.
One Manager is owned by each Parse::Agent::MCPRackApp that enables resource subscriptions. It is shared across that app's requests and SSE streams, so every public method is thread-safe.
Lifecycle:
- A GET listening stream opens → #attach_listener registers the stream's delivery callback under its session id.
- A resources/subscribe POST → #subscribe validates the URI, derives credentials from the agent, and starts a LiveQuery subscription whose events publish debounced updates.
- resources/unsubscribe → #unsubscribe stops that one LiveQuery subscription.
- The listening stream closes (client disconnect / DELETE session) → #detach_listener tears down every LiveQuery subscription for the session.
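The lifecycle above can be sketched as a call sequence. This is only an illustration: `RecordingManager` is a hypothetical stand-in that records which method was called, not the real Manager, and the session id, URI, and agent values are made up for the example.

```ruby
# Hypothetical stand-in that records calls; it does NOT open LiveQuery sockets.
class RecordingManager
  attr_reader :calls

  def initialize
    @calls = []
  end

  def attach_listener(session_id, &callback)
    @calls << [:attach_listener, session_id]
    nil
  end

  def subscribe(session_id:, uri:, agent:)
    @calls << [:subscribe, session_id, uri]
    true
  end

  def unsubscribe(session_id:, uri:)
    @calls << [:unsubscribe, session_id, uri]
    true
  end

  def detach_listener(session_id)
    @calls << [:detach_listener, session_id]
    0
  end
end

manager = RecordingManager.new
sid = "session-1"            # assumed session id
uri = "parse://Post/count"   # assumed URI, shaped like parse://_Session/count

manager.attach_listener(sid) { |notification| puts notification.inspect } # GET stream opens
manager.subscribe(session_id: sid, uri: uri, agent: :hypothetical_agent)  # resources/subscribe
manager.unsubscribe(session_id: sid, uri: uri)                            # resources/unsubscribe
manager.detach_listener(sid)                                              # stream closes

manager.calls.map(&:first)
# => [:attach_listener, :subscribe, :unsubscribe, :detach_listener]
```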
Instance Attribute Summary
- #notifier ⇒ Object (readonly)
  Returns the value of attribute notifier.
Instance Method Summary
- #attach_listener(session_id) {|notification_hash| ... }
  Register a listening stream's delivery callback for a session.
- #detach_listener(session_id) ⇒ Integer
  Tear down a session: unregister its listener and stop every LiveQuery subscription it opened.
- #initialize(logger: nil, debounce_interval: DEFAULT_DEBOUNCE_INTERVAL, notifier: nil, live_query_client: nil, supported: nil, timer: nil, live_query_admin_client: nil, live_query_scoped_client: nil, max_subscriptions_per_session: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION, max_sessions: DEFAULT_MAX_SESSIONS) ⇒ Manager (constructor)
  A new instance of Manager.
- #listener?(session_id) ⇒ Boolean
  Whether a listening stream is currently attached for the session.
- #publish(session_id, message_hash) ⇒ Boolean
  Push an arbitrary JSON-RPC message (notification OR a server-initiated request carrying an id, e.g. elicitation/create) onto the session's listening stream.
- #subscribe(session_id:, uri:, agent:) ⇒ Boolean
  Open a LiveQuery-backed subscription for a resource URI.
- #subscription_count ⇒ Integer
  Number of active (session, uri) subscriptions.
- #supported? ⇒ Boolean
  Whether this transport can honor resource subscriptions.
- #unsubscribe(session_id:, uri:) ⇒ Boolean
  Stop the LiveQuery subscription for one resource URI.
Constructor Details
#initialize(logger: nil, debounce_interval: DEFAULT_DEBOUNCE_INTERVAL, notifier: nil, live_query_client: nil, supported: nil, timer: nil, live_query_admin_client: nil, live_query_scoped_client: nil, max_subscriptions_per_session: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION, max_sessions: DEFAULT_MAX_SESSIONS) ⇒ Manager
Returns a new instance of Manager.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 321
    def initialize(logger: nil, debounce_interval: DEFAULT_DEBOUNCE_INTERVAL,
                   notifier: nil, live_query_client: nil, supported: nil, timer: nil,
                   live_query_admin_client: nil, live_query_scoped_client: nil,
                   max_subscriptions_per_session: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION,
                   max_sessions: DEFAULT_MAX_SESSIONS)
      @logger = logger
      @debounce_interval = debounce_interval
      @notifier = notifier || LocalNotifier.new
      @both_client = live_query_client
      @admin_client = live_query_admin_client
      @scoped_client = live_query_scoped_client
      @supported_override = supported
      @timer = timer
      @max_per_session = max_subscriptions_per_session
      @max_sessions = max_sessions
      @client_mutex = Mutex.new
      # session_id => { uri => { sub:, debouncer: } }
      @sessions = Hash.new { |h, k| h[k] = {} }
      @mutex = Mutex.new
    end
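The `@sessions` table is built with a default block, so plain indexing creates an entry while `key?` does not. Several methods on this page depend on that difference. The snippet below shows the standard Ruby behavior on its own; `new_sessions_table` is a helper name made up for the example.

```ruby
# Builds a table with the same shape as @sessions in the constructor.
def new_sessions_table
  Hash.new { |h, k| h[k] = {} }
end

sessions = new_sessions_table
sessions.key?("s1")   # => false; key? never runs the default block
sessions.size         # => 0
sessions["s1"]        # => {}; indexing stores a new inner hash
sessions.size         # => 1
sessions.delete("s2") # => nil; delete does not run the default block either
```

Code that must not create a session entry therefore has to test `key?` before indexing.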
Instance Attribute Details
#notifier ⇒ Object (readonly)
Returns the value of attribute notifier.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 343
    def notifier
      @notifier
    end
Instance Method Details
#attach_listener(session_id) {|notification_hash| ... }
This method returns an undefined value.
Register a listening stream's delivery callback for a session.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 362
    def attach_listener(session_id, &callback)
      @notifier.register(session_id, &callback)
    end
#detach_listener(session_id) ⇒ Integer
Tear down a session: unregister its listener and stop every LiveQuery subscription it opened. Called when the listening stream closes or the session is terminated (DELETE).
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 391
    def detach_listener(session_id)
      @notifier.unregister(session_id)
      subs = @mutex.synchronize { @sessions.delete(session_id) } || {}
      subs.each_value { |entry| safe_unsubscribe(entry[:sub]) }
      subs.size
    end
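The teardown removes the session's entry while holding the mutex, then stops each subscription after the lock is released, so the lock is not held during the per-subscription cleanup. A minimal sketch of that pattern follows, assuming a hypothetical `ToyRegistry` in which each "subscription" is just a callable that records its own shutdown.

```ruby
# Hypothetical registry; each stored :sub is a lambda standing in for a
# LiveQuery subscription handle.
class ToyRegistry
  def initialize
    @sessions = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
  end

  def add(session_id, uri, sub)
    @mutex.synchronize { @sessions[session_id][uri] = { sub: sub } }
  end

  # Returns how many subscriptions were torn down.
  def detach(session_id)
    # Remove the whole session under the lock; nil means it was unknown.
    subs = @mutex.synchronize { @sessions.delete(session_id) } || {}
    # Stop each subscription with the lock already released.
    subs.each_value { |entry| entry[:sub].call }
    subs.size
  end
end

stopped = []
registry = ToyRegistry.new
registry.add("s1", "parse://Post/count", -> { stopped << :count })
registry.add("s1", "parse://Post/samples", -> { stopped << :samples })
registry.detach("s1")   # => 2
registry.detach("s1")   # => 0, the session is already gone
```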
#listener?(session_id) ⇒ Boolean
Whether a listening stream is currently attached for the session.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 368
    def listener?(session_id)
      @notifier.respond_to?(:listener?) ? @notifier.listener?(session_id) : false
    end
#publish(session_id, message_hash) ⇒ Boolean
Push an arbitrary JSON-RPC message (a notification or a server-initiated request carrying an id, e.g. elicitation/create) onto the session's listening stream. Returns false when no stream is attached.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 380
    def publish(session_id, message_hash)
      return false unless @notifier.respond_to?(:publish)
      @notifier.publish(session_id, message_hash)
    end
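Both #listener? and #publish check `respond_to?` before calling the notifier, so a notifier that only implements register/unregister still works and simply reports false. The sketch below reproduces those guards as free functions against two hypothetical notifiers. `BareNotifier`, `StreamNotifier`, `listener_attached?`, and `publish_to` are names invented here, and `StreamNotifier` is not thread-safe.

```ruby
# Hypothetical notifier that only supports register/unregister.
class BareNotifier
  def register(session_id, &callback); end
  def unregister(session_id); end
end

# Hypothetical notifier that also tracks listeners and delivers messages.
class StreamNotifier
  def initialize
    @callbacks = {}
  end

  def register(session_id, &callback)
    @callbacks[session_id] = callback
  end

  def unregister(session_id)
    @callbacks.delete(session_id)
  end

  def listener?(session_id)
    @callbacks.key?(session_id)
  end

  def publish(session_id, message_hash)
    callback = @callbacks[session_id]
    return false unless callback
    callback.call(message_hash)
    true
  end
end

# Same guard as Manager#listener?.
def listener_attached?(notifier, session_id)
  notifier.respond_to?(:listener?) ? notifier.listener?(session_id) : false
end

# Same guard as Manager#publish.
def publish_to(notifier, session_id, message_hash)
  return false unless notifier.respond_to?(:publish)
  notifier.publish(session_id, message_hash)
end

publish_to(BareNotifier.new, "s1", { "jsonrpc" => "2.0" })   # => false
```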
#subscribe(session_id:, uri:, agent:) ⇒ Boolean
Open a LiveQuery-backed subscription for a resource URI.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 408
    def subscribe(session_id:, uri:, agent:)
      if session_id.nil? || session_id.to_s.empty?
        raise Parse::Agent::ValidationError,
              "resources/subscribe requires an established session (Mcp-Session-Id). " \
              "Complete initialize first, then open the GET listening stream."
      end
      class_name, resource = MCPSubscriptions.parse_subscribable_uri(uri)

      # Authorization parity with the read path (resources/read →
      # agent.execute → assert_class_accessible!). Enforce agent_hidden, the
      # per-agent `classes:` allowlist, AND CLP BEFORE deriving credentials
      # or opening any socket. Parse Server LiveQuery enforces row ACL/CLP
      # for session-token subscriptions, but agent_hidden / classes: are
      # SDK-only constructs it knows nothing about — and a master-key socket
      # bypasses ACL/CLP entirely. Without this gate,
      # `resources/subscribe parse://_Session/count` (or any operator-hidden
      # PII class) becomes a change/timing oracle on a class the tool
      # surface refuses to even list. The CLP op mirrors the read path
      # exactly — `count` resources gate on `:count`, `samples` on `:find` —
      # so a subscribe is never stricter than the equivalent read. Raises
      # AccessDenied / ValidationError, which the dispatcher maps to
      # JSON-RPC -32602. Called unconditionally (not behind a
      # `defined?(Tools)` guard) so the gate fails CLOSED — if `Tools` were
      # somehow unloaded the call raises rather than silently skipping
      # authorization. `Parse::Agent::Tools` is a hard dependency of the
      # agent stack that mounts this bridge.
      op = resource == "count" ? :count : :find
      Parse::Agent::Tools.assert_class_accessible!(class_name, agent: agent, op: op)

      creds = MCPSubscriptions.live_query_credentials_for(agent)

      # Idempotent: a repeat subscribe to the same URI is a no-op rather
      # than a second LiveQuery socket subscription. Enforce the per-session
      # cap in the same critical section so a burst of distinct-URI
      # subscribes can't race past it.
      @mutex.synchronize do
        # Global cap: bound the number of DISTINCT sessions so an
        # authenticated client opening many sessions (subscribe-without-GET-
        # stream, never DELETE) can't grow `@sessions` without limit. Checked
        # BEFORE indexing `@sessions[session_id]`, which would auto-vivify the
        # entry (`Hash.new { {} }`) and defeat the size check. This is a
        # rejection cap (fails closed); the tradeoff is that a flood of orphan
        # sessions could lock out NEW sessions until they are torn down or the
        # process restarts — acceptable because every session requires a valid
        # authenticated agent and the per-session cap still bounds each one.
        if @max_sessions && !@sessions.key?(session_id) && @sessions.size >= @max_sessions
          raise Parse::Agent::ValidationError,
                "Global subscription session limit reached (#{@max_sessions}). Try again later."
        end
        subs = @sessions[session_id]
        return true if subs.key?(uri)
        if @max_per_session && subs.size >= @max_per_session
          raise Parse::Agent::ValidationError,
                "Session subscription limit reached (#{@max_per_session}). " \
                "Unsubscribe from a resource before adding another."
        end
      end

      debouncer = Debouncer.new(interval: @debounce_interval, timer: @timer) do
        publish_update(session_id, uri)
      end
      sub = client_for(creds).subscribe(class_name, **creds)
      Parse::LiveQuery::EVENTS.each do |event|
        sub.on(event) { debouncer.trigger }
      end

      # Authoritative commit under the lock. The pre-check above is only a
      # fast path — the network subscribe just ran with the lock RELEASED,
      # so in the meantime the session may have been torn down
      # (detach_listener), a racing subscribe may have claimed this URI, or
      # a concurrent burst may have pushed the session to its cap. Re-check
      # before storing, and gate on `@sessions.key?(session_id)` BEFORE
      # indexing: `@sessions` auto-vivifies (`Hash.new { {} }`), so a bare
      # `@sessions[session_id][uri] = …` would silently RESURRECT a detached
      # session and leak its LiveQuery socket for the process lifetime.
      #
      # A subscribe may legitimately arrive before the GET listening stream
      # opens (the session entry exists, just no listener yet); updates
      # published before a listener attaches are dropped by the notifier
      # and start delivering once the stream is up.
      outcome = @mutex.synchronize do
        if !@sessions.key?(session_id)
          :session_gone
        elsif @sessions[session_id].key?(uri)
          :duplicate
        elsif @max_per_session && @sessions[session_id].size >= @max_per_session
          :over_cap
        else
          @sessions[session_id][uri] = { sub: sub, debouncer: debouncer }
          :stored
        end
      end

      case outcome
      when :stored
        true
      when :duplicate
        # A concurrent subscribe to the same URI won; keep theirs and drop
        # the socket we just opened so we don't leak a duplicate.
        safe_unsubscribe(sub)
        true
      when :session_gone
        # The listening stream closed while we were subscribing — don't
        # resurrect it; tear the just-opened socket back down.
        safe_unsubscribe(sub)
        false
      when :over_cap
        safe_unsubscribe(sub)
        raise Parse::Agent::ValidationError,
              "Session subscription limit reached (#{@max_per_session}). " \
              "Unsubscribe from a resource before adding another."
      end
    end
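The method's concurrency shape is a locked pre-check, a slow step with the lock released, and a locked re-check before storing. A reduced sketch of that shape is shown below. `ToySubscriptions` and its `open_socket` callable are hypothetical stand-ins, and the toy returns outcome symbols where the real method returns booleans, raises, or tears down the socket it just opened.

```ruby
class ToySubscriptions
  def initialize(max_per_session: 2)
    @sessions = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
    @max_per_session = max_per_session
  end

  def session?(session_id)
    @mutex.synchronize { @sessions.key?(session_id) }
  end

  def detach(session_id)
    @mutex.synchronize { @sessions.delete(session_id) }
  end

  # open_socket is a callable standing in for the slow network subscribe.
  def subscribe(session_id, uri, open_socket)
    # Fast path under the lock: duplicate and cap checks.
    @mutex.synchronize do
      subs = @sessions[session_id] # indexing creates the session entry
      return :already_subscribed if subs.key?(uri)
      raise ArgumentError, "session subscription limit reached" if subs.size >= @max_per_session
    end

    sub = open_socket.call # runs with the lock released

    # Authoritative commit: the table may have changed while unlocked.
    @mutex.synchronize do
      if !@sessions.key?(session_id)
        :session_gone # key? first, so a detached session is not recreated
      elsif @sessions[session_id].key?(uri)
        :duplicate
      elsif @sessions[session_id].size >= @max_per_session
        :over_cap
      else
        @sessions[session_id][uri] = sub
        :stored
      end
    end
  end
end

table = ToySubscriptions.new
table.subscribe("s1", "parse://Post/count", -> { :socket }) # => :stored

# Simulate the session being torn down while the socket is opening.
table.subscribe("s2", "parse://Post/count", -> { table.detach("s2"); :socket })
# => :session_gone
table.session?("s2") # => false
```

Testing `key?` before indexing in the commit step is what keeps the detached session from reappearing in the table.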
#subscription_count ⇒ Integer
Returns the number of active (session, uri) subscriptions.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 539
    def subscription_count
      @mutex.synchronize { @sessions.values.sum(&:size) }
    end
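As a small worked example of the count, the nested table is summed per session. The helper name `count_subscriptions` and the sample data are invented for illustration.

```ruby
# Sums the number of URIs stored under every session.
def count_subscriptions(sessions)
  sessions.values.sum(&:size)
end

sample = {
  "s1" => { "parse://Post/count" => :sub_a, "parse://Post/samples" => :sub_b },
  "s2" => { "parse://Post/count" => :sub_c },
}
count_subscriptions(sample) # => 3
count_subscriptions({})     # => 0
```

The same URI under two sessions counts twice, because the unit is the (session, uri) pair.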
#supported? ⇒ Boolean
Whether this transport can honor resource subscriptions. Drives the resources.subscribe capability the dispatcher advertises; the capability is never advertised unless updates can actually be delivered.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 350
    def supported?
      return @supported_override unless @supported_override.nil?
      return false unless defined?(Parse::LiveQuery)
      Parse.respond_to?(:live_query_enabled?) && Parse.live_query_enabled? &&
        Parse::LiveQuery.available?
    end
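The override is tri-state: `nil` means "detect", while an explicit `false` is honored even when LiveQuery is available. The sketch below isolates that `nil?` check. `resolve_supported` is a hypothetical helper, and `detected` stands in for the result of the LiveQuery availability checks.

```ruby
# override: true, false, or nil (nil means fall through to detection).
def resolve_supported(override, detected)
  return override unless override.nil?
  detected
end

resolve_supported(nil, true)    # => true, detection decides
resolve_supported(false, true)  # => false, explicit opt-out wins
resolve_supported(true, false)  # => true, explicit opt-in wins

# A plain `||` could not express the opt-out:
false || true                   # => true
```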
#unsubscribe(session_id:, uri:) ⇒ Boolean
Stop the LiveQuery subscription for one resource URI. Idempotent.
    # File 'lib/parse/agent/mcp_subscriptions.rb', line 526
    def unsubscribe(session_id:, uri:)
      entry = @mutex.synchronize do
        subs = @sessions[session_id]
        e = subs.delete(uri)
        @sessions.delete(session_id) if subs.empty?
        e
      end
      return false unless entry
      safe_unsubscribe(entry[:sub])
      true
    end
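Two properties of this method are easy to miss: a repeat call returns false rather than raising, and indexing an unknown session creates an empty entry that the `subs.empty?` cleanup removes again within the same critical section. A toy reproduction follows, assuming a hypothetical `ToyTable` that stores `true` in place of a real subscription entry.

```ruby
class ToyTable
  def initialize
    @sessions = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
  end

  def add(session_id, uri)
    @mutex.synchronize { @sessions[session_id][uri] = true }
  end

  def session?(session_id)
    @mutex.synchronize { @sessions.key?(session_id) }
  end

  # Returns true if something was removed, false otherwise.
  def remove(session_id, uri)
    entry = @mutex.synchronize do
      subs = @sessions[session_id]                 # may create an empty entry
      e = subs.delete(uri)
      @sessions.delete(session_id) if subs.empty?  # and removes it again here
      e
    end
    !entry.nil?
  end
end

table = ToyTable.new
table.add("s1", "parse://Post/count")
table.remove("s1", "parse://Post/count")       # => true
table.remove("s1", "parse://Post/count")       # => false, idempotent
table.remove("unknown", "parse://Post/count")  # => false
table.session?("unknown")                      # => false, no empty entry left
```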