Class: Parse::Agent::MCPSubscriptions::Manager

Inherits:
Object
Defined in:
lib/parse/agent/mcp_subscriptions.rb

Overview

Per-transport subscription coordinator.

One Manager is owned by each Parse::Agent::MCPRackApp that enables resource subscriptions. It is shared across that app's requests and SSE streams, so every public method is thread-safe.

Lifecycle:

  1. A GET listening stream opens → #attach_listener registers the stream's delivery callback under its session id.
  2. A resources/subscribe POST → #subscribe validates the URI, derives credentials from the agent, and starts a LiveQuery subscription whose events publish debounced updates.
  3. A resources/unsubscribe POST → #unsubscribe stops that one LiveQuery subscription.
  4. The listening stream closes (client disconnect / DELETE session) → #detach_listener tears down every LiveQuery subscription for the session.
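
The four lifecycle steps can be sketched with a toy model. This is only an illustration of the bookkeeping, under the assumption that a session maps URIs to subscriptions as the constructor source shows; ToyManager is a hypothetical stand-in, it never talks to LiveQuery, and its "subscriptions" are plain symbols.

```ruby
# Toy model of the lifecycle only. ToyManager is hypothetical, not the library class.
class ToyManager
  def initialize
    @listeners = {}                            # session_id => callback
    @sessions  = Hash.new { |h, k| h[k] = {} } # session_id => { uri => sub }
    @mutex     = Mutex.new
  end

  # Step 1: the GET stream registers its delivery callback.
  def attach_listener(session_id, &callback)
    @mutex.synchronize { @listeners[session_id] = callback }
    nil
  end

  # Step 2: record one subscription per (session, uri); repeats are no-ops.
  def subscribe(session_id:, uri:)
    @mutex.synchronize { @sessions[session_id][uri] ||= :live_query_stub }
    true
  end

  # Step 3: stop a single subscription; true only if one was removed.
  def unsubscribe(session_id:, uri:)
    @mutex.synchronize do
      return false unless @sessions.key?(session_id)
      removed = @sessions[session_id].delete(uri)
      @sessions.delete(session_id) if @sessions[session_id].empty?
      !removed.nil?
    end
  end

  # Step 4: drop the listener and every subscription the session holds.
  def detach_listener(session_id)
    @mutex.synchronize do
      @listeners.delete(session_id)
      (@sessions.delete(session_id) || {}).size
    end
  end

  def subscription_count
    @mutex.synchronize { @sessions.values.sum(&:size) }
  end
end

manager = ToyManager.new
manager.attach_listener("s1") { |note| puts note }
manager.subscribe(session_id: "s1", uri: "parse://Post/count")
manager.subscribe(session_id: "s1", uri: "parse://Post/samples")
manager.unsubscribe(session_id: "s1", uri: "parse://Post/count")
stopped = manager.detach_listener("s1") # one subscription was still open
```

The real Manager additionally validates the URI, checks authorization, enforces caps, and tears down LiveQuery subscriptions outside the lock; none of that is modeled here.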

Constructor Details

#initialize(logger: nil, debounce_interval: DEFAULT_DEBOUNCE_INTERVAL, notifier: nil, live_query_client: nil, supported: nil, timer: nil, live_query_admin_client: nil, live_query_scoped_client: nil, max_subscriptions_per_session: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION, max_sessions: DEFAULT_MAX_SESSIONS) ⇒ Manager

Returns a new instance of Manager.

Parameters:

  • logger (#warn, nil) (defaults to: nil)
  • debounce_interval (Numeric) (defaults to: DEFAULT_DEBOUNCE_INTERVAL)

    see Debouncer.

  • notifier (#register, #unregister, #publish) (defaults to: nil)

    delivery seam. Defaults to LocalNotifier.

  • live_query_client (Object, nil) (defaults to: nil)

    a single client used for BOTH master- and session-scoped subscriptions, overriding the admin/scoped split below. Mainly a test injection point. When set, live_query_admin_client / live_query_scoped_client are ignored.

  • live_query_admin_client (Object, nil) (defaults to: nil)

    the client used for master-key-posture subscriptions. Must be an ADMIN connection (Parse::LiveQuery::Client.new(use_master_key: true)) so the socket bypasses ACL and the subscription actually sees every matching object — Parse Server has no per-subscription master key, so a non-admin connection would silently deliver only publicly-readable rows. Defaults to a lazily-constructed admin client.

  • live_query_scoped_client (Object, nil) (defaults to: nil)

    the client used for session-token subscriptions. A normal (non-admin) connection; the per-subscription session_token scopes results to that user. Defaults to the process-wide Parse::LiveQuery.client.

  • supported (Boolean, nil) (defaults to: nil)

    override the #supported? result. When nil (default), #supported? reflects the live LiveQuery enable/availability toggles. Tests pass true alongside a fake client.

  • timer (#call, nil) (defaults to: nil)

    debounce timer mechanism (see Debouncer).

  • max_subscriptions_per_session (Integer) (defaults to: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION)

    ceiling on concurrent subscriptions for one session. #subscribe raises ValidationError past this. See DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION.

  • max_sessions (Integer) (defaults to: DEFAULT_MAX_SESSIONS)

    global ceiling on the number of distinct sessions holding subscriptions. #subscribe raises ValidationError when a NEW session would exceed it. See DEFAULT_MAX_SESSIONS.
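
Because live_query_client and supported are described as test injection points, a fake client may be useful. The following is a minimal sketch, assuming the Manager only needs subscribe(class_name, **creds) on the client and on(event) on the returned subscription, which is what the #subscribe source calls. FakeLiveQueryClient, FakeSubscription, the fire helper, the unsubscribe teardown method, and the event names are all hypothetical.

```ruby
# Hypothetical test doubles; not part of the library.
class FakeSubscription
  attr_reader :class_name, :creds

  def initialize(class_name, creds)
    @class_name = class_name
    @creds      = creds
    @handlers   = Hash.new { |h, k| h[k] = [] }
    @active     = true
  end

  # Mirrors the `sub.on(event) { ... }` call made in #subscribe.
  def on(event, &block)
    @handlers[event] << block
    self
  end

  # Assumed teardown hook; the real method name is not shown on this page.
  def unsubscribe
    @active = false
  end

  def active?
    @active
  end

  # Test helper: simulate a LiveQuery event arriving.
  def fire(event)
    @handlers[event].each(&:call) if @active
  end
end

class FakeLiveQueryClient
  attr_reader :subscriptions

  def initialize
    @subscriptions = []
  end

  # Mirrors `client.subscribe(class_name, **creds)`.
  def subscribe(class_name, **creds)
    FakeSubscription.new(class_name, creds).tap { |s| @subscriptions << s }
  end
end

client = FakeLiveQueryClient.new
sub    = client.subscribe("Post", session_token: "r:abc")
hits   = 0
sub.on(:create) { hits += 1 }
sub.fire(:create)
sub.fire(:update) # no handler registered for this event, so nothing runs

# With the library loaded, the fake would be injected roughly like this:
#   Parse::Agent::MCPSubscriptions::Manager.new(live_query_client: client, supported: true)
```

Passing supported: true alongside the fake keeps #supported? from consulting the real LiveQuery toggles, as the parameter description notes.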



# File 'lib/parse/agent/mcp_subscriptions.rb', line 321

def initialize(logger: nil, debounce_interval: DEFAULT_DEBOUNCE_INTERVAL,
               notifier: nil, live_query_client: nil, supported: nil,
               timer: nil,
               live_query_admin_client: nil, live_query_scoped_client: nil,
               max_subscriptions_per_session: DEFAULT_MAX_SUBSCRIPTIONS_PER_SESSION,
               max_sessions: DEFAULT_MAX_SESSIONS)
  @logger             = logger
  @debounce_interval  = debounce_interval
  @notifier           = notifier || LocalNotifier.new
  @both_client        = live_query_client
  @admin_client       = live_query_admin_client
  @scoped_client      = live_query_scoped_client
  @supported_override = supported
  @timer              = timer
  @max_per_session    = max_subscriptions_per_session
  @max_sessions       = max_sessions
  @client_mutex       = Mutex.new
  # session_id => { uri => { sub:, debouncer: } }
  @sessions           = Hash.new { |h, k| h[k] = {} }
  @mutex              = Mutex.new
end

Instance Attribute Details

#notifier ⇒ Object (readonly)

Returns the value of attribute notifier.



# File 'lib/parse/agent/mcp_subscriptions.rb', line 343

def notifier
  @notifier
end

Instance Method Details

#attach_listener(session_id) {|notification_hash| ... }

This method returns an undefined value.

Register a listening stream's delivery callback for a session.

Parameters:

  • session_id (String)

    the Mcp-Session-Id keying the listener.

Yield Parameters:

  • notification_hash (Hash)

    JSON-RPC notification to deliver.



# File 'lib/parse/agent/mcp_subscriptions.rb', line 362

def attach_listener(session_id, &callback)
  @notifier.register(session_id, &callback)
end

#detach_listener(session_id) ⇒ Integer

Tear down a session: unregister its listener and stop every LiveQuery subscription it opened. Called when the listening stream closes or the session is terminated (DELETE).

Parameters:

  • session_id (String)

    the Mcp-Session-Id of the session to tear down.

Returns:

  • (Integer)

    number of LiveQuery subscriptions stopped.



# File 'lib/parse/agent/mcp_subscriptions.rb', line 391

def detach_listener(session_id)
  @notifier.unregister(session_id)
  subs = @mutex.synchronize { @sessions.delete(session_id) } || {}
  subs.each_value { |entry| safe_unsubscribe(entry[:sub]) }
  subs.size
end

#listener?(session_id) ⇒ Boolean

Whether a listening stream is currently attached for the session.

Returns:

  • (Boolean)


# File 'lib/parse/agent/mcp_subscriptions.rb', line 368

def listener?(session_id)
  @notifier.respond_to?(:listener?) ? @notifier.listener?(session_id) : false
end

#publish(session_id, message_hash) ⇒ Boolean

Push an arbitrary JSON-RPC message (notification OR a server-initiated request carrying an id, e.g. elicitation/create) onto the session's listening stream. Returns false when no stream is attached.

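Parameters:

  • session_id (String)

    the Mcp-Session-Id whose listening stream should receive the message.

  • message_hash (Hash)

    JSON-RPC message to deliver.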

Returns:

  • (Boolean)


# File 'lib/parse/agent/mcp_subscriptions.rb', line 380

def publish(session_id, message_hash)
  return false unless @notifier.respond_to?(:publish)
  @notifier.publish(session_id, message_hash)
end
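
The notifier is a duck-typed delivery seam (#register, #unregister, #publish, and optionally #listener?). A minimal in-memory sketch of such an object is shown below, assuming publish returns false when no callback is registered, as the #publish description states. SketchNotifier is hypothetical; the internals of LocalNotifier are not shown on this page and may differ.

```ruby
# Hypothetical in-memory notifier; not the library's LocalNotifier.
class SketchNotifier
  def initialize
    @callbacks = {} # session_id => delivery callback
    @mutex     = Mutex.new
  end

  def register(session_id, &callback)
    @mutex.synchronize { @callbacks[session_id] = callback }
    nil
  end

  def unregister(session_id)
    @mutex.synchronize { @callbacks.delete(session_id) }
    nil
  end

  def listener?(session_id)
    @mutex.synchronize { @callbacks.key?(session_id) }
  end

  # Returns false when no stream is attached, true after delivery.
  def publish(session_id, message_hash)
    callback = @mutex.synchronize { @callbacks[session_id] }
    return false unless callback
    callback.call(message_hash) # invoked outside the lock
    true
  end
end

notifier = SketchNotifier.new
received = []
notifier.register("s1") { |msg| received << msg }

update = {
  "jsonrpc" => "2.0",
  "method"  => "notifications/resources/updated",
  "params"  => { "uri" => "parse://Post/count" }
}
delivered = notifier.publish("s1", update) # listener attached
dropped   = notifier.publish("s2", update) # no listener for this session
```

Looking the callback up under the lock but calling it outside keeps a slow stream write from blocking other sessions; that choice belongs to this sketch, not necessarily to the library.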

#subscribe(session_id:, uri:, agent:) ⇒ Boolean

Open a LiveQuery-backed subscription for a resource URI.

Parameters:

  • session_id (String)

    the Mcp-Session-Id keying the listener.

  • uri (String)

    parse://<Class>/{count|samples}.

  • agent (Parse::Agent)

    the subscribing agent (credential source).

Returns:

  • (Boolean)

    true on success (or already-subscribed no-op); false if the session was torn down while the subscription was being opened.

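Raises:

  • (Parse::Agent::ValidationError)

    if session_id is nil or empty, or if the per-session subscription cap or the global session cap is reached. Authorization failures from assert_class_accessible! are also raised before any socket is opened.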



# File 'lib/parse/agent/mcp_subscriptions.rb', line 408

def subscribe(session_id:, uri:, agent:)
  if session_id.nil? || session_id.to_s.empty?
    raise Parse::Agent::ValidationError,
          "resources/subscribe requires an established session (Mcp-Session-Id). " \
          "Complete initialize first, then open the GET listening stream."
  end
  class_name, resource = MCPSubscriptions.parse_subscribable_uri(uri)

  # Authorization parity with the read path (resources/read →
  # agent.execute → assert_class_accessible!). Enforce agent_hidden, the
  # per-agent `classes:` allowlist, AND CLP BEFORE deriving credentials
  # or opening any socket. Parse Server LiveQuery enforces row ACL/CLP
  # for session-token subscriptions, but agent_hidden / classes: are
  # SDK-only constructs it knows nothing about — and a master-key socket
  # bypasses ACL/CLP entirely. Without this gate,
  # `resources/subscribe parse://_Session/count` (or any operator-hidden
  # PII class) becomes a change/timing oracle on a class the tool
  # surface refuses to even list. The CLP op mirrors the read path
  # exactly — `count` resources gate on `:count`, `samples` on `:find` —
  # so a subscribe is never stricter than the equivalent read. Raises
  # AccessDenied / ValidationError, which the dispatcher maps to
  # JSON-RPC -32602. Called unconditionally (not behind a
  # `defined?(Tools)` guard) so the gate fails CLOSED — if `Tools` were
  # somehow unloaded the call raises rather than silently skipping
  # authorization. `Parse::Agent::Tools` is a hard dependency of the
  # agent stack that mounts this bridge.
  op = resource == "count" ? :count : :find
  Parse::Agent::Tools.assert_class_accessible!(class_name, agent: agent, op: op)

  creds = MCPSubscriptions.live_query_credentials_for(agent)

  # Idempotent: a repeat subscribe to the same URI is a no-op rather
  # than a second LiveQuery socket subscription. Enforce the per-session
  # cap in the same critical section so a burst of distinct-URI
  # subscribes can't race past it.
  @mutex.synchronize do
    # Global cap: bound the number of DISTINCT sessions so an
    # authenticated client opening many sessions (subscribe-without-GET-
    # stream, never DELETE) can't grow `@sessions` without limit. Checked
    # BEFORE indexing `@sessions[session_id]`, which would auto-vivify the
    # entry (`Hash.new { {} }`) and defeat the size check. This is a
    # rejection cap (fails closed); the tradeoff is that a flood of orphan
    # sessions could lock out NEW sessions until they are torn down or the
    # process restarts — acceptable because every session requires a valid
    # authenticated agent and the per-session cap still bounds each one.
    if @max_sessions && !@sessions.key?(session_id) && @sessions.size >= @max_sessions
      raise Parse::Agent::ValidationError,
            "Global subscription session limit reached (#{@max_sessions}). Try again later."
    end
    subs = @sessions[session_id]
    return true if subs.key?(uri)
    if @max_per_session && subs.size >= @max_per_session
      raise Parse::Agent::ValidationError,
            "Session subscription limit reached (#{@max_per_session}). " \
            "Unsubscribe from a resource before adding another."
    end
  end

  debouncer = Debouncer.new(interval: @debounce_interval, timer: @timer) do
    publish_update(session_id, uri)
  end

  sub = client_for(creds).subscribe(class_name, **creds)
  Parse::LiveQuery::EVENTS.each do |event|
    sub.on(event) { debouncer.trigger }
  end

  # Authoritative commit under the lock. The pre-check above is only a
  # fast path — the network subscribe just ran with the lock RELEASED,
  # so in the meantime the session may have been torn down
  # (detach_listener), a racing subscribe may have claimed this URI, or
  # a concurrent burst may have pushed the session to its cap. Re-check
  # before storing, and gate on `@sessions.key?(session_id)` BEFORE
  # indexing: `@sessions` auto-vivifies (`Hash.new { {} }`), so a bare
  # `@sessions[session_id][uri] = …` would silently RESURRECT a detached
  # session and leak its LiveQuery socket for the process lifetime.
  #
  # A subscribe may legitimately arrive before the GET listening stream
  # opens (the session entry exists, just no listener yet); updates
  # published before a listener attaches are dropped by the notifier
  # and start delivering once the stream is up.
  outcome = @mutex.synchronize do
    if !@sessions.key?(session_id)
      :session_gone
    elsif @sessions[session_id].key?(uri)
      :duplicate
    elsif @max_per_session && @sessions[session_id].size >= @max_per_session
      :over_cap
    else
      @sessions[session_id][uri] = { sub: sub, debouncer: debouncer }
      :stored
    end
  end

  case outcome
  when :stored
    true
  when :duplicate
    # A concurrent subscribe to the same URI won; keep theirs and drop
    # the socket we just opened so we don't leak a duplicate.
    safe_unsubscribe(sub)
    true
  when :session_gone
    # The listening stream closed while we were subscribing — don't
    # resurrect it; tear the just-opened socket back down.
    safe_unsubscribe(sub)
    false
  when :over_cap
    safe_unsubscribe(sub)
    raise Parse::Agent::ValidationError,
          "Session subscription limit reached (#{@max_per_session}). " \
          "Unsubscribe from a resource before adding another."
  end
end
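
The comments in the listing above describe a check, slow work with the lock released, then an authoritative re-check that gates on key? before indexing an auto-vivifying Hash. That pattern can be isolated in a small sketch. SketchRegistry and its method names are hypothetical, and the block passed to add stands in for the network subscribe.

```ruby
# Hypothetical registry isolating the "slow work unlocked, re-check, commit" pattern.
class SketchRegistry
  def initialize
    @sessions = Hash.new { |h, k| h[k] = {} } # reading a missing key creates it
    @mutex    = Mutex.new
  end

  def open_session(id)
    @mutex.synchronize { @sessions[id] } # deliberate auto-vivification
    nil
  end

  def close_session(id)
    @mutex.synchronize { @sessions.delete(id) }
    nil
  end

  def session?(id)
    @mutex.synchronize { @sessions.key?(id) }
  end

  # The block runs WITHOUT the lock held, like the network subscribe.
  def add(id, uri)
    resource = yield
    @mutex.synchronize do
      if !@sessions.key?(id)
        :session_gone # key? does not auto-vivify; @sessions[id] would
      elsif @sessions[id].key?(uri)
        :duplicate
      else
        @sessions[id][uri] = resource
        :stored
      end
    end
  end
end

registry = SketchRegistry.new
registry.open_session("s1")
first  = registry.add("s1", "parse://Post/count") { :socket_a }
second = registry.add("s1", "parse://Post/count") { :socket_b }

# Simulate the session being torn down while the slow work is in flight.
third = registry.add("s1", "parse://Post/samples") do
  registry.close_session("s1")
  :socket_c
end
```

In the sketch the caller would be responsible for releasing :socket_b and :socket_c, which mirrors the safe_unsubscribe calls on the :duplicate and :session_gone branches of the listing.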

#subscription_count ⇒ Integer

Returns the number of active (session, uri) subscriptions.

Returns:

  • (Integer)

    number of active (session, uri) subscriptions.



# File 'lib/parse/agent/mcp_subscriptions.rb', line 539

def subscription_count
  @mutex.synchronize { @sessions.values.sum(&:size) }
end

#supported? ⇒ Boolean

Whether this transport can honor resource subscriptions. Drives the resources.subscribe capability the dispatcher advertises — we never advertise the capability unless we can actually deliver.

Returns:

  • (Boolean)


# File 'lib/parse/agent/mcp_subscriptions.rb', line 350

def supported?
  return @supported_override unless @supported_override.nil?
  return false unless defined?(Parse::LiveQuery)
  Parse.respond_to?(:live_query_enabled?) && Parse.live_query_enabled? &&
    Parse::LiveQuery.available?
end
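
The override check uses nil? rather than a truthiness test, so an explicit false is honored instead of falling through to the live check. A minimal sketch of that tri-state behavior follows; SupportProbe is a hypothetical stand-in, and its block replaces the real LiveQuery availability checks.

```ruby
# Hypothetical stand-in showing the nil / true / false override semantics.
class SupportProbe
  def initialize(supported: nil, &live_check)
    @supported_override = supported
    @live_check         = live_check
  end

  def supported?
    # `unless nil?` rather than `||`, so an explicit false wins over the live check.
    return @supported_override unless @supported_override.nil?
    @live_check.call
  end
end

follows_live = SupportProbe.new { true }                   # no override
forced_off   = SupportProbe.new(supported: false) { true } # override beats live check
forced_on    = SupportProbe.new(supported: true) { false } # typical test setup
```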

#unsubscribe(session_id:, uri:) ⇒ Boolean

Stop the LiveQuery subscription for one resource URI. Idempotent.

Returns:

  • (Boolean)

    true if a subscription was removed.



# File 'lib/parse/agent/mcp_subscriptions.rb', line 526

def unsubscribe(session_id:, uri:)
  entry = @mutex.synchronize do
    subs = @sessions[session_id]
    e = subs.delete(uri)
    @sessions.delete(session_id) if subs.empty?
    e
  end
  return false unless entry
  safe_unsubscribe(entry[:sub])
  true
end