Class: Parse::Agent::MCPRackApp

Inherits:
Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.

The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.

== Transport (transport: :streamable_http)

The MCP 2025-06-18 "Streamable HTTP" transport is the recommended, primary transport. Rather than toggling its constituent pieces individually (streaming: for POST→SSE, notifications: for the server→client GET / stream), pass transport: :streamable_http to enable the whole transport with one switch:

app = Parse::Agent::MCPRackApp.new(transport: :streamable_http) { |env| ... }

That is exactly equivalent to streaming: true, notifications: true. resource_subscriptions: true may still be added alongside it to upgrade the server→client bus from the plain notification posture to the LiveQuery-backed resource-subscription posture.

transport: is a closed enum — :streamable_http, :legacy, or nil. :legacy and nil both select the historical default (no streaming, no server→client stream); the standalone SSE/JSON behavior remains a supported fallback. Passing transport: :streamable_http together with an explicit streaming: or notifications: raises ArgumentError, since the switch already owns those toggles.

The default is unchanged (transport: nil): an existing MCPRackApp.new { ... } keeps its non-streaming JSON behavior. A streaming-capable Rack server (Puma, Falcon, Unicorn) is required for :streamable_http to have any effect — the WEBrick-backed MCPServer buffers responses and cannot deliver it.
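
The way transport: interacts with the granular toggles can be traced in a small standalone sketch. The helper name resolve_transport is hypothetical and not part of the library; it only mirrors the validation order visible in the constructor source further down this page and returns the effective [streaming, notifications] pair.

```ruby
# Hypothetical helper (not part of Parse::Agent): mirrors how the
# constructor resolves transport: against streaming:/notifications:.
def resolve_transport(transport: nil, streaming: nil, notifications: nil)
  # Closed enum: anything other than the three known values fails closed.
  unless transport.nil? || %i[legacy streamable_http].include?(transport)
    raise ArgumentError,
          "transport: must be :streamable_http, :legacy, or nil, got #{transport.inspect}"
  end

  if transport == :streamable_http
    # The switch owns both toggles, so any explicit value is a conflict,
    # even an explicit false.
    unless streaming.nil? && notifications.nil?
      raise ArgumentError,
            "do not pass streaming:/notifications: with transport: :streamable_http"
    end
    return [true, true]
  end

  # nil means "left alone" and collapses to the historical default (false).
  [streaming.nil? ? false : streaming, notifications.nil? ? false : notifications]
end

p resolve_transport(transport: :streamable_http) # => [true, true]
p resolve_transport(transport: :legacy)          # => [false, false]
p resolve_transport(streaming: true)             # => [true, false]
```

Note that an explicit streaming: false alongside transport: :streamable_http is also rejected, because the nil sentinel is what distinguishes "left alone" from "explicitly set".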

== SSE Streaming (MCP progress notifications)

When constructed with streaming: true, requests that include Accept: text/event-stream receive an SSE response instead of a single JSON body. The server holds the connection open and emits notifications/progress events from two sources:

  1. Time-based heartbeats every heartbeat_interval seconds while the dispatcher runs (progress field = elapsed seconds).
  2. Tool-internal progress reported by the tool itself via agent.report_progress(progress:, total:, message:). Works for both built-in tools and custom tools registered through Parse::Agent::Tools.register.

Heartbeats are automatically suppressed once a tool reports its own progress, so the progressToken carries a single coherent stream. A final response event carries the complete JSON-RPC response, after which the stream closes.

This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.

Streaming requires a Rack server that supports streaming response bodies (Puma, Falcon, Unicorn). WEBrick buffers the full body before writing, so SSE streaming has no effect on the standalone MCPServer — operators using MCPServer directly should leave streaming: false (the default).

To disable Nginx response buffering for SSE endpoints, set: proxy_buffering off; or rely on the X-Accel-Buffering: no header this class emits automatically on every SSE response.

When streaming: false (default), an Accept: text/event-stream request receives a plain JSON response — the adapter is permissive per the MCP spec, which does not require SSE support.
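
The exact frames written by SSEBody are not shown on this page, so the following is only a generic sketch of Server-Sent Events framing applied to a JSON-RPC notifications/progress message. The helper name sse_frame, the optional event name, and the sample token are assumptions made for illustration.

```ruby
require "json"

# Hypothetical helper: serialises one Server-Sent Event frame.
# An SSE frame is a set of "field: value" lines ended by a blank line.
def sse_frame(payload, event: nil)
  frame = +""
  frame << "event: #{event}\n" if event
  # JSON.generate emits a single line, so one data: field is enough.
  frame << "data: #{JSON.generate(payload)}\n\n"
  frame
end

# A progress notification shaped like the MCP notifications/progress message.
progress = {
  "jsonrpc" => "2.0",
  "method"  => "notifications/progress",
  "params"  => { "progressToken" => "tok-1", "progress" => 4, "message" => "still running" },
}

print sse_frame(progress)
```

A client reading the stream splits on blank lines and parses each data: payload as JSON; the final frame would carry the complete JSON-RPC response instead of a notification.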

Examples:

Block form (most common)

app = Parse::Agent::MCPRackApp.new do |env|
  token = env["HTTP_AUTHORIZATION"].to_s.delete_prefix("Bearer ")
  agent = MyAuth.agent_for_token!(token)  # raises Unauthorized if invalid
  agent
end

Keyword argument form

factory = ->(env) { Parse::Agent.new(permissions: :readonly) }
app = Parse::Agent::MCPRackApp.new(agent_factory: factory)

With SSE streaming enabled

app = Parse::Agent::MCPRackApp.new(streaming: true) { |env| ... }

Mounted in Rails routes.rb

mount Parse::Agent::MCPRackApp.new { |env| ... }, at: "/mcp"

Defined Under Namespace

Classes: CancellationRegistry, ListeningStreamBody, SSEBody, SessionOwnerRegistry

Constant Summary

DEFAULT_MAX_BODY_SIZE = 1_048_576

Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).

MAX_JSON_NESTING = 20

JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).

DEFAULT_HEARTBEAT_INTERVAL = 2

Default heartbeat interval in seconds when streaming is enabled.

DEFAULT_MAX_CONCURRENT_DISPATCHERS = 100

Default bound on concurrently active streaming dispatchers (and, separately, on concurrently open listening streams) when the max_concurrent_dispatchers: constructor argument is omitted. Finite by default so that enabling a streaming surface (request-scoped SSE or the long-lived GET / stream) does not silently expose an unbounded orphan-thread DoS surface. The cap is applied SEPARATELY to each surface, so the effective ceiling across both is up to 2x this value. Pass an explicit nil to knowingly opt into the unbounded surface.

DEFAULT_APPROVAL_TIMEOUT = 300

Seconds to wait for a human's elicitation reply before failing closed (refusing the destructive op). Generous by default, because a human-in-the-loop approver needs time the tool timeout doesn't allow. Tune via approval_timeout:.

JSON_CONTENT_TYPE = { "Content-Type" => "application/json" }.freeze

Standard Content-Type for all JSON responses. Frozen template: call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).

SSE_HEADERS = {
  "Content-Type"      => "text/event-stream",
  "Cache-Control"     => "no-cache",
  "Connection"        => "keep-alive",
  "X-Accel-Buffering" => "no",
}.freeze

SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. Frozen template: call #sse_headers to obtain a per-response copy.

Constructor Details

#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: nil, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: DEFAULT_MAX_CONCURRENT_DISPATCHERS, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: nil, transport: nil, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp

Returns a new instance of MCPRackApp.

Parameters:

  • agent_factory (Proc, nil) (defaults to: nil)

    callable invoked with the Rack env on every request. Must return a Parse::Agent or raise Parse::Agent::Unauthorized. Mutually exclusive with a block.

  • max_body_size (Integer) (defaults to: DEFAULT_MAX_BODY_SIZE)

    reject bodies larger than this many bytes. Defaults to DEFAULT_MAX_BODY_SIZE.

  • logger (#warn, nil) (defaults to: nil)

    optional logger. When set, auth failures are warned at class-name level, and internal errors include a backtrace.

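  • streaming (Boolean, nil) (defaults to: nil)

    enable SSE streaming for clients that send Accept: text/event-stream. nil (the default) is treated as false for backward compatibility; the nil sentinel only lets the constructor detect an explicit value that conflicts with transport: :streamable_http. Has no effect on WEBrick-backed deployments (see class documentation).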

  • heartbeat_interval (Numeric) (defaults to: DEFAULT_HEARTBEAT_INTERVAL)

    seconds between progress heartbeat events when streaming is active. Defaults to DEFAULT_HEARTBEAT_INTERVAL. Ignored when streaming: false.

  • max_concurrent_dispatchers (Integer, nil) (defaults to: DEFAULT_MAX_CONCURRENT_DISPATCHERS)

    limits the number of concurrently active dispatcher threads across all SSE connections served by this app instance (and, separately, the number of open listening streams). When the limit is reached a new SSE request immediately receives a 503 JSON-RPC error envelope (-32000 "server busy") rather than spawning another dispatcher.

    Defaults to a finite DEFAULT_MAX_CONCURRENT_DISPATCHERS (100) — so a streaming surface is bounded out of the box rather than unbounded. Pass an explicit positive Integer to set the cap, or nil to knowingly opt into the unbounded surface (which warns at construction). A non-positive or non-integer value raises ArgumentError. Use active_dispatcher_count to monitor current concurrency from operator tooling.

  • pre_auth_rate_limiter (#check!, nil) (defaults to: nil)

    optional rate limiter consulted at the top of every request, BEFORE the agent_factory is invoked. Closes the factory-amplification DoS where each malformed request burns a Parse Server round-trip (factories typically validate session tokens by calling out). Must respond to #check! and raise an exception responding to #retry_after (such as Parse::Agent::RateLimiter::RateLimitExceeded) when exhausted. Defaults to nil (no pre-auth limiter). On exhaustion the request is rejected with HTTP 429 and a Retry-After header.

  • allowed_origins (Array<String>, nil) (defaults to: nil)

    when set, the Origin request header must match one of these entries (case-insensitive, exact host match — wildcard via leading . matches subdomains). nil (default) skips the check. Browsers always send Origin on cross-origin POST; native clients (curl, ruby HTTP client, SDK-to-SDK) typically don't, and an absent Origin is treated as allowed regardless of this setting. The default loopback bind makes this check optional in development; operators who bind MCP to a routable interface should configure it.

  • require_custom_header (String, nil) (defaults to: nil)

    when set (e.g. "X-MCP-Client"), requests must carry that header with any non-empty value. Custom headers can't be set by a <form> CSRF and force a CORS preflight on browser fetch(), so this gate closes the browser-driven attack surface entirely. Pair with allowed_origins for defense in depth.

  • health_path (String, nil) (defaults to: nil)

    when set (e.g. "/health"), GET requests to that exact path return 200 {"status":"ok"} without invoking the agent_factory, without authentication, without rate-limiting, and without applying the allowed_origins / require_custom_header CSRF gates. Intended as a liveness probe for load balancers and orchestrators (Kubernetes, ECS, Consul) that cannot present a matching Origin or custom header. Because the probe sits ahead of the pre-auth rate limiter, operators should front-edge rate-limit the path at the LB/Nginx layer if public-facing. The response intentionally contains no version, build, or counter information — fingerprint-minimal by design. nil (default) disables the endpoint entirely; empty-string values are coerced to nil. Any non-GET method on the path falls through to the standard 405 handler.

  • resource_subscriptions (Boolean) (defaults to: false)

    enable MCP resource subscriptions (resources/subscribe + notifications/resources/updated) bridged onto Parse LiveQuery. Defaults to false. When true, this app accepts a GET with Accept: text/event-stream and an Mcp-Session-Id header as a long-lived server→client listening stream, and advertises the resources.subscribe capability on initialize — but ONLY while LiveQuery is enabled (Parse.live_query_enabled = true) and available (a live_query_url is configured). Requires a streaming-capable Rack server (Puma, Falcon); WEBrick buffers responses and cannot hold the listening stream open. See docs/mcp_guide.md for the credential-scoping and single-process caveats.

  • subscription_manager (Parse::Agent::MCPSubscriptions::Manager, nil) (defaults to: nil)

    inject a pre-built manager (tests, or to share a clustered-notifier adapter). Takes precedence over resource_subscriptions:. When nil and resource_subscriptions: true, a default in-process manager is constructed.

  • transport (Symbol, nil) (defaults to: nil)

    MCP transport selector. Pass :streamable_http to enable the full MCP 2025-06-18 Streamable HTTP transport in one switch — exactly equivalent to streaming: true, notifications: true (POST→SSE plus the server→client GET / stream). resource_subscriptions: true may still be combined to upgrade the bus to its LiveQuery-backed posture. :legacy (or the default nil) selects the historical non-streaming behavior; the standalone SSE/JSON path stays a supported fallback. Any other value raises ArgumentError. Passing :streamable_http together with an explicit streaming: or notifications: also raises, since the switch already owns those toggles. Requires a streaming-capable Rack server (Puma, Falcon, Unicorn); has no effect under WEBrick.

  • notifications (Boolean, nil) (defaults to: nil)

    enable the server→client GET listening stream and the #notify bus without LiveQuery-backed resource subscriptions. In this posture resources.subscribe stays unadvertised and subscribe POSTs fail closed. nil (the default) is treated as false. Must be left unset when transport: :streamable_http is passed.

  • approval_timeout (Numeric) (defaults to: DEFAULT_APPROVAL_TIMEOUT)

    seconds to wait for a human's elicitation reply before failing closed (refusing the destructive op).

  • principal_resolver (#call, nil) (defaults to: nil)

    optional callable for resolving the principal that a session id is bound to, so that a listening stream cannot be hijacked by another authenticated caller. Must respond to #call; otherwise ArgumentError is raised.

Raises:

  • (ArgumentError)

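    if both or neither of agent_factory/block are given; if pre_auth_rate_limiter does not respond to #check!; if transport is not :streamable_http, :legacy, or nil; if transport: :streamable_http is combined with an explicit streaming: or notifications:; if max_concurrent_dispatchers is neither a positive Integer nor nil; or if principal_resolver does not respond to #call.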
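
The pre_auth_rate_limiter contract is a duck type: #check! takes no arguments and raises an error that responds to #retry_after when the budget is exhausted. A minimal sketch of an object satisfying that contract follows. The class name FixedWindowLimiter, its Exhausted error, and the injectable clock are all hypothetical; the library's own Parse::Agent::RateLimiter is not reproduced here.

```ruby
# Hypothetical fixed-window limiter satisfying the documented duck type:
# #check! raises an error that responds to #retry_after when exhausted.
class FixedWindowLimiter
  class Exhausted < StandardError
    attr_reader :retry_after

    def initialize(retry_after)
      @retry_after = retry_after
      super("rate limit exceeded")
    end
  end

  # clock: is injectable so the window can be driven deterministically in tests.
  def initialize(limit:, window:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @limit        = limit
    @window       = window
    @clock        = clock
    @mutex        = Mutex.new
    @window_start = nil
    @count        = 0
  end

  def check!
    @mutex.synchronize do
      now = @clock.call
      # Start a fresh window on first use or once the current one has elapsed.
      if @window_start.nil? || now - @window_start >= @window
        @window_start = now
        @count = 0
      end
      @count += 1
      # retry_after is the time remaining until the window resets.
      raise Exhausted.new(@window - (now - @window_start)) if @count > @limit
    end
    true
  end
end
```

Such an object could be passed as pre_auth_rate_limiter: FixedWindowLimiter.new(limit: 100, window: 1). Because #check! is called without the Rack env, a limiter of this shape is a single shared bucket for the whole app instance rather than a per-client limit.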



# File 'lib/parse/agent/mcp_rack_app.rb', line 316

def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE,
               logger: nil, streaming: nil,
               heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
               max_concurrent_dispatchers: DEFAULT_MAX_CONCURRENT_DISPATCHERS,
               pre_auth_rate_limiter: nil,
               allowed_origins: nil,
               require_custom_header: nil,
               resource_subscriptions: false,
               subscription_manager: nil,
               notifications: nil,
               transport: nil,
               approval_timeout: DEFAULT_APPROVAL_TIMEOUT,
               principal_resolver: nil,
               health_path: nil, &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  # `transport:` is the consolidation switch over the granular
  # `streaming:` / `notifications:` toggles. `streaming` and
  # `notifications` default to nil (not false) precisely so we can tell
  # "operator left it alone" from "operator explicitly set it" and raise
  # on a conflicting combination instead of silently letting the switch
  # win. Closed enum — unknown values fail closed.
  unless transport.nil? || %i[legacy streamable_http].include?(transport)
    raise ArgumentError,
          "transport: must be :streamable_http, :legacy, or nil, got #{transport.inspect}"
  end
  if transport == :streamable_http
    unless streaming.nil? && notifications.nil?
      raise ArgumentError,
            "transport: :streamable_http already enables streaming and the server-initiated " \
            "notification stream; do not also pass streaming:/notifications: " \
            "(resource_subscriptions: may still be combined to upgrade the bus to LiveQuery)"
    end
    streaming     = true
    notifications = true
  end
  # Collapse the nil sentinel to the historical default for the
  # remainder of the constructor (and @streaming below).
  streaming     = false if streaming.nil?
  notifications = false if notifications.nil?

  @agent_factory              = agent_factory || block
  @max_body_size              = max_body_size
  @logger                     = logger
  @streaming                  = streaming
  @heartbeat_interval         = heartbeat_interval
  # The dispatcher cap defaults to the finite DEFAULT_MAX_CONCURRENT_DISPATCHERS
  # (set in the signature). An explicit positive Integer overrides it; an
  # explicit nil knowingly opts into the unbounded surface; anything else
  # is a config error and raises.
  validate_max_concurrent_dispatchers!(max_concurrent_dispatchers)
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter      = pre_auth_rate_limiter
  @allowed_origins            = normalize_allowed_origins(allowed_origins)
  @required_custom_header     = normalize_required_custom_header(require_custom_header)
  @health_path                = health_path.is_a?(String) && !health_path.empty? ? health_path : nil
  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry      = CancellationRegistry.new

  # Elicitation (human-in-the-loop approval) state, shared across
  # this app's requests and its GET listening streams. The
  # capability registry records (per session) whether the client
  # advertised `elicitation` at initialize; the pending registry
  # holds server→client requests awaiting a reply. Both are cheap
  # and always present; they only do work when
  # Parse::Agent.require_approval_for opts a tier in.
  @elicitation_capabilities   = Parse::Agent::ClientCapabilityRegistry.new
  @pending_elicitations       = Parse::Agent::PendingElicitationRegistry.new
  @approval_timeout           = approval_timeout

  # Binds each MCP session id to the principal that established it so a
  # listening stream can't be hijacked by another authenticated caller.
  # Same per-instance / single-process scope as @cancellation_registry.
  @session_owners             = SessionOwnerRegistry.new
  if principal_resolver && !principal_resolver.respond_to?(:call)
    raise ArgumentError, "principal_resolver must respond to #call"
  end
  @principal_resolver         = principal_resolver

  # Listening-stream coordinator (the server→client broadcast bus
  # backing resource subscriptions, MCP elicitation, and
  # general-purpose server-initiated notifications). An injected
  # manager wins. Otherwise:
  #   - `resource_subscriptions: true` builds a LiveQuery-backed
  #     manager whose `supported?` resolves live (advertises
  #     `resources.subscribe` and serves subscribe POSTs).
  #   - `notifications: true` (without resource subscriptions) builds
  #     a manager in `supported: false` posture: the GET listening
  #     stream + `#notify` bus work, but `resources.subscribe` stays
  #     unadvertised and subscribe POSTs fail closed. This is the
  #     decoupling lever — a server can push arbitrary notifications
  #     without enabling LiveQuery resource subscriptions.
  # nil disables the GET listening stream entirely.
  @subscription_manager =
    if subscription_manager
      subscription_manager
    elsif resource_subscriptions
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger)
    elsif notifications
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger, supported: false)
    end

  # Warn operators who enable a streaming surface AND have explicitly
  # opted into an unbounded dispatcher cap. Both request-scoped SSE
  # (streaming:) and the long-lived GET listening stream
  # (resource_subscriptions:/notifications:, which set
  # @subscription_manager) spawn per-connection threads; an unbounded
  # endpoint is a practical DoS surface — a slow or hostile client opening
  # connections faster than they close can exhaust the host thread pool and
  # downstream Parse connection pool. The cap bounds each surface
  # SEPARATELY, so the effective ceiling is up to 2x max_concurrent_dispatchers
  # across both. The default is now the finite DEFAULT_MAX_CONCURRENT_DISPATCHERS,
  # so a nil here means the operator deliberately chose `nil` (unbounded) —
  # we warn once at construction so the choice is visible.
  if (streaming || @subscription_manager) && @max_concurrent_dispatchers.nil?
    surface = streaming ? "streaming: true" : "resource_subscriptions/notifications"
    line = "[Parse::Agent::MCPRackApp] #{surface} with an explicitly unbounded dispatcher cap " \
           "(max_concurrent_dispatchers: nil). This is an orphan-thread DoS surface. " \
           "Prefer the finite default (#{DEFAULT_MAX_CONCURRENT_DISPATCHERS}) or pass a value sized to " \
           "~2x your Puma max_threads. See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end
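
The allowed_origins rule described in the parameter list (case-insensitive host match, with a leading . acting as a subdomain wildcard) can be illustrated with a standalone matcher. This is not the library's origin_allowed? or normalize_allowed_origins, whose bodies are not shown on this page. The sketch assumes allowlist entries are bare host names and that a wildcard entry matches subdomains only, not the bare domain itself.

```ruby
require "uri"

# Hypothetical matcher, NOT the library's origin_allowed?.
# Assumes allowlist entries are bare host names; a leading "." marks a
# subdomain wildcard.
def origin_host_allowed?(origin, allowed)
  host = URI.parse(origin).host.to_s.downcase
  return false if host.empty?

  allowed.any? do |entry|
    entry = entry.downcase
    # ".example.com" matches "a.example.com" but not "evil-example.com".
    entry.start_with?(".") ? host.end_with?(entry) : host == entry
  end
rescue URI::InvalidURIError
  # An unparseable Origin value is treated as not allowed.
  false
end

puts origin_host_allowed?("https://App.Example.com", ["app.example.com"])
puts origin_host_allowed?("https://evil-example.com", [".example.com"])
```

Keeping the leading dot in the suffix comparison is what prevents a look-alike host such as evil-example.com from passing a wildcard entry.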

Instance Attribute Details

#subscription_manager ⇒ Parse::Agent::MCPSubscriptions::Manager? (readonly)

The listening-stream coordinator backing this app's server→client bus, or nil when neither resource subscriptions nor notifications are enabled. Exposed so a clustered/Redis notifier adapter or an out-of-band publisher can reach the bus directly. Direct #publish accepts arbitrary messages (notifications OR id-bearing requests); prefer #notify for the validated notification path.



# File 'lib/parse/agent/mcp_rack_app.rb', line 464

def subscription_manager
  @subscription_manager
end

Class Method Details

.abandoned_dispatcher_count ⇒ Object

Process-wide CUMULATIVE count of GENUINE orphaned dispatchers — a client disconnect that closed the stream while the dispatcher thread was still running. Excludes already-finished-but-undelivered closes (a delivery miss, not an orphan). Unlike active_dispatcher_count / active_listening_stream_count this is a monotonic total, not a live gauge — operators alert on its rate of increase, the orphan-thread pressure signal under a disconnect-against-slow-tools storm. EVERY premature close (orphan or delivery-miss) also emits a parse.agent.mcp_dispatcher_abandoned ActiveSupport::Notifications event carrying dispatcher_alive:, so subscribers wanting the broader delivery-miss signal can filter there. Reset is not supported (counters are process-lifetime); subtract a baseline if you need a windowed delta.



# File 'lib/parse/agent/mcp_rack_app.rb', line 536

def self.abandoned_dispatcher_count
  @abandoned_dispatcher_mutex.synchronize { @abandoned_dispatcher_count }
end
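
Because the counter is cumulative and cannot be reset, a windowed rate has to be derived by subtracting a baseline. One possible shape for that is sketched below. The CounterDelta class is hypothetical, and the block passed to it stands in for a call to abandoned_dispatcher_count.

```ruby
# Hypothetical sampler: turns a monotonic counter into per-window deltas.
class CounterDelta
  # The block reads the current cumulative value of the counter.
  def initialize(&reader)
    @reader   = reader
    @baseline = reader.call
  end

  # Returns the increase since the previous sample and moves the baseline.
  def sample
    current   = @reader.call
    delta     = current - @baseline
    @baseline = current
    delta
  end
end
```

In a deployment this might be constructed as CounterDelta.new { Parse::Agent::MCPRackApp.abandoned_dispatcher_count } and sampled once per scrape interval, with alerts keyed on the delta rather than the absolute total.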

.active_dispatcher_count ⇒ Object

Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process. Threads are counted by the :parse_mcp_dispatcher thread-local tag set when each dispatcher_thread is started. Use this for operator dashboards or health checks; do NOT use it to make flow-control decisions at runtime (use the max_concurrent_dispatchers: constructor option for that).



# File 'lib/parse/agent/mcp_rack_app.rb', line 501

def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end
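
The tag-scan technique used by this gauge can be tried in isolation. In the sketch below only the :parse_mcp_dispatcher tag name comes from this page; the queues and worker threads are scaffolding that stands in for real dispatchers.

```ruby
# Standalone illustration of the Thread.list tag scan.
def tagged_thread_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end

ready   = Queue.new
release = Queue.new

workers = 3.times.map do
  Thread.new do
    Thread.current[:parse_mcp_dispatcher] = true
    ready << true # signal that the tag is set
    release.pop   # stand-in for dispatcher work
  end
end

3.times { ready.pop }           # wait until every worker is tagged
live_count = tagged_thread_count

3.times { release << true }     # let the workers finish
workers.each(&:join)
final_count = tagged_thread_count

puts "live=#{live_count} final=#{final_count}"
```

Finished threads drop out of Thread.list, which is why the scan behaves as a live gauge without any explicit decrement.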

.active_listening_stream_count ⇒ Object

Process-wide count of currently open GET listening streams across all MCPRackApp instances. A listening stream is long-lived (it is the server→client notification channel), and each one pins a server worker thread in #each plus a heartbeat thread, so it is bounded SEPARATELY from request-scoped SSE dispatchers (which run #each, dispatch once, then close). Used as the soft cap in #serve_listening_stream. Maintained by ListeningStreamBody via adjust_listening_stream_count; unlike a Thread.list scan, this is an explicit counter because the heartbeat thread is intentionally not tagged as a dispatcher.



# File 'lib/parse/agent/mcp_rack_app.rb', line 514

def self.active_listening_stream_count
  @listening_stream_mutex.synchronize { @listening_stream_count }
end

.adjust_listening_stream_count(delta) ⇒ Object



# File 'lib/parse/agent/mcp_rack_app.rb', line 520

def self.adjust_listening_stream_count(delta)
  @listening_stream_mutex.synchronize { @listening_stream_count += delta }
end

.record_abandoned_dispatcher! ⇒ Object



# File 'lib/parse/agent/mcp_rack_app.rb', line 542

def self.record_abandoned_dispatcher!
  @abandoned_dispatcher_mutex.synchronize { @abandoned_dispatcher_count += 1 }
end

.strip_underscore_smuggled_headers!(env) ⇒ Hash

Drop env keys that would have come from underscore-form HTTP header names. The Rack-spec-compliant interpretation of HTTP headers maps X-MCP-API-Key and X_MCP_API_KEY to the same env key (HTTP_X_MCP_API_KEY); a misbehaving upstream server that forwards the underscore form lets an attacker overwrite trusted reverse-proxy-injected headers.

This helper is invoked automatically at the top of #call, so any MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original header list via rack.headers) gets defense-in-depth scrubbing without operator opt-in. On Rack 2 / pre-3 servers rack.headers is not set and the helper is a no-op; operators on those stacks must configure their upstream (e.g. Nginx underscores_in_headers off) OR mount this helper as an explicit middleware.

The standalone MCPServer rewrites its own build_rack_env to drop underscore-form names before they reach this app, so the standalone path is covered regardless of Rack version.

Examples:

Explicit middleware (Rack 2 / pre-3 deployments)

class StripSmuggledHeaders
  def initialize(app); @app = app; end
  def call(env)
    Parse::Agent::MCPRackApp.strip_underscore_smuggled_headers!(env)
    @app.call(env)
  end
end

Parameters:

  • env (Hash)

    the Rack env, mutated in place

Returns:

  • (Hash)

    the same env, for chaining



# File 'lib/parse/agent/mcp_rack_app.rb', line 202

def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end
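
To see the effect on a concrete env, the scrubbing logic can be exercised outside the class. The function below is a standalone restatement of the method above, and the sample env is invented for illustration; it assumes the server populates env["rack.headers"] with the original header names, as described in the method documentation.

```ruby
# Standalone restatement of the scrubbing logic, for experimentation only.
def strip_underscore_smuggled_headers!(env)
  headers = env["rack.headers"]
  if headers.respond_to?(:each)
    headers.each do |name, _|
      next unless name.is_a?(String) && name.include?("_")

      # Both "X_MCP_API_Key" and "X-MCP-API-Key" collapse to this env key.
      env.delete("HTTP_#{name.upcase.tr('-', '_')}")
    end
  end
  env
end

env = {
  "rack.headers"       => { "X_MCP_API_Key" => "attacker", "Accept" => "application/json" },
  "HTTP_X_MCP_API_KEY" => "attacker",
  "HTTP_ACCEPT"        => "application/json",
}

strip_underscore_smuggled_headers!(env)
puts env.key?("HTTP_X_MCP_API_KEY")
puts env["HTTP_ACCEPT"]
```

The collapsed env key is deleted whenever an underscore-form name was present, even if a dashed sibling also arrived, so later lookups see no value at all rather than a possibly attacker-controlled one.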

Instance Method Details

#call(env) ⇒ Array(Integer, Hash, #each)

Rack interface.

Parameters:

  • env (Hash)

    Rack environment

Returns:

  • (Array(Integer, Hash, #each))

    Rack triple



# File 'lib/parse/agent/mcp_rack_app.rb', line 550

def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0a. Liveness probe. When `health_path:` is configured, a GET to
  #     that exact path returns `{"status":"ok"}` without auth,
  #     rate-limiting, or factory invocation. Intentionally
  #     fingerprint-minimal: no version, no build, no counter —
  #     a load balancer needs "is it up?", not "what is it?".
  if @health_path && env["PATH_INFO"] == @health_path && env["REQUEST_METHOD"] == "GET"
    return [200, json_headers, ['{"status":"ok"}']]
  end

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 0c. DELETE /  — MCP 2025-06-18 Streamable HTTP session
  #     termination. A client signals it is done with a session by
  #     sending DELETE with the same `Mcp-Session-Id` header it
  #     received from initialize. Per spec the server MAY support
  #     this; if it doesn't, it MUST return 405. We support it.
  #
  #     Stateless-agent reality: the factory builds a fresh agent
  #     per request, so there is no server-side session store to
  #     evict. What DELETE meaningfully does is cancel any
  #     in-flight requests still running under that correlation_id
  #     so worker threads exit instead of completing wasted work.
  #     The cancellation_registry returns 0 when nothing matches
  #     (also the "unknown session" case) — we don't probe-leak by
  #     differentiating known vs unknown ids in the response.
  #
  #     Sanitized through Parse::Agent#correlation_id= via a
  #     throwaway agent so a malicious header value (CRLF, shell
  #     metachars) is silently rejected rather than reaching the
  #     registry as a key.
  if env["REQUEST_METHOD"] == "DELETE"
    sid = env["HTTP_MCP_SESSION_ID"].to_s
    if sid.empty?
      return [400, json_headers, [json_rpc_error(-32_600, "Missing Mcp-Session-Id")]]
    end
    clean_sid = sanitize_session_id(sid)
    if clean_sid.nil?
      return [400, json_headers, [json_rpc_error(-32_600, "Invalid Mcp-Session-Id")]]
    end
    @cancellation_registry.cancel_all_for(clean_sid, reason: :session_terminated)
    # Wake any tool thread blocked on an elicitation reply for this
    # session (it returns `unavailable` → fail closed) and drop the
    # session's cached elicitation capability.
    @pending_elicitations.abort_all_for(clean_sid, :session_terminated)
    @elicitation_capabilities.forget(clean_sid)
    # Tear down any resource subscriptions and the listening stream
    # bound to this session so a terminated session leaves no LiveQuery
    # sockets behind.
    @subscription_manager&.detach_listener(clean_sid)
    # Drop the owner binding so the id can be reclaimed after explicit
    # termination (only here — not on mere stream close, so a reconnect
    # keeps its claim).
    @session_owners.forget(clean_sid)
    return [204, json_headers, [""]]
  end

  # 0d. GET listening stream — the MCP 2025-06-18 Streamable HTTP
  #     server→client channel that carries unsolicited
  #     `notifications/resources/updated`. Only when resource
  #     subscriptions are enabled, the client opted into SSE, and a
  #     valid Mcp-Session-Id is present. Authenticated via the same
  #     agent_factory as POST: the session id is a server-issued
  #     bearer capability (returned on initialize), so possession of
  #     it plus a valid agent gates the stream.
  if env["REQUEST_METHOD"] == "GET" && @subscription_manager &&
     env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    return serve_listening_stream(env)
  end

  # 1. Method check: past the DELETE and GET-stream branches above,
  #    only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405,
            json_headers.merge("Allow" => "POST"),
            [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check — must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers, [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless — native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers, [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers, [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit: read one byte beyond the limit to detect oversized
  #    bodies without buffering the full stream. `read(n)` returns nil at
  #    EOF, so coerce to "" to keep the empty-body path below working.
  raw_body = env["rack.input"].read(@max_body_size + 1).to_s
  if raw_body.bytesize > @max_body_size
    return [413, json_headers, [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers, [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  #     A method-less JSON-RPC RESPONSE ({jsonrpc,id,result|error}) is
  #     NOT malformed: it is the client's reply to a server-issued
  #     elicitation/create request. Let it through here; it is routed
  #     (session-bound) after the agent_factory resolves the session.
  unless (body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?) ||
         elicitation_reply?(body)
    return [400, json_headers, [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 4c. MCP-Protocol-Version header validation (MCP 2025-06-18
  #     Streamable HTTP). The spec says:
  #       - The client MUST send `MCP-Protocol-Version: <ver>`
  #         on every request AFTER initialize.
  #       - If absent on a non-initialize request, the server
  #         SHOULD assume `2025-03-26` for backwards compatibility.
  #       - If present but not a version the server supports,
  #         the server MUST respond `400 Bad Request`.
  #     Initialize requests are exempt — initialize IS the
  #     negotiation, so the header is meaningless there.
  #     Cancellation notifications are also exempt because they
  #     may be sent by a client that has not (yet) completed
  #     initialize against this transport instance (e.g. a
  #     reconnecting client cancelling a pre-disconnect request).
  unless body["method"] == "initialize" ||
         body["method"] == "notifications/cancelled" ||
         elicitation_reply?(body)
    requested = env["HTTP_MCP_PROTOCOL_VERSION"]
    if requested.is_a?(String) && !requested.empty? &&
       !Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS.include?(requested)
      return [400, json_headers,
              [json_rpc_error(-32_600,
                              "Unsupported MCP-Protocol-Version: #{requested}",
                              id: body["id"])]]
    end
  end

  # 5. Agent factory — auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, [unauthorized_body]]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers, [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5a-i. Surface the silent-ungated-writes footgun. A write/admin agent
  #     served over MCP with no approval tier configured runs every
  #     destructive tool without a human gate; warn once per process so
  #     the operator notices (mirrors the unrestricted-endpoints warning).
  if agent.respond_to?(:permissions) &&
     %i[write admin].include?(agent.permissions) &&
     Parse::Agent.require_approval_for.empty?
    Parse::Agent.warn_mcp_writes_unguarded!
  end

  # 5b. Thread the conversation correlation id through. Source
  #     header: the MCP 2025-06-18 Streamable HTTP spec-canonical
  #     `Mcp-Session-Id` (Rack env key `HTTP_MCP_SESSION_ID`).
  #
  #     Only fills it when the factory hasn't already assigned one
  #     — application code that needs to override the
  #     client-supplied id (e.g., bind to an internal session
  #     record) can do so in the factory and we don't stomp on it.
  #     The Parse::Agent#correlation_id= setter sanitizes the
  #     value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) &&
     agent.correlation_id.nil? &&
     (sid = env["HTTP_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5b-i. Server-assigned Mcp-Session-Id on initialize. Per MCP
  #     2025-06-18 Streamable HTTP, the server SHOULD assign a
  #     fresh session id during initialize when the client did not
  #     supply one, and return it on the response so the client
  #     can echo it on subsequent requests. Stateless-agent
  #     reality: the SDK does not maintain a server-side session
  #     store — the id is best-effort correlation only (used for
  #     audit-log threading and cancellation routing). We do not
  #     refuse subsequent requests carrying an "unknown" id.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent && agent.respond_to?(:correlation_id=) &&
     agent.correlation_id.nil?
    agent.correlation_id = SecureRandom.uuid
  end

  # 5b-ii. Capture the client's elicitation capability at initialize.
  #     The server reads (does not advertise) the client's
  #     `capabilities.elicitation`; the approval gate consults this
  #     per session before attempting a server→client prompt.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent.respond_to?(:correlation_id) && agent.correlation_id
    supported = !!(body.dig("params", "capabilities", "elicitation"))
    @elicitation_capabilities.set(agent.correlation_id, supported)
    # Authoritatively bind this session to the initializing principal so
    # only the same principal can later attach a listening stream for it
    # (owner-binding; see SessionOwnerRegistry).
    @session_owners.bind(agent.correlation_id, principal_fingerprint(agent, env))
  end

  # 5b-iii. Elicitation reply ingress. A method-less JSON-RPC
  #     RESPONSE is the client's answer to a server-issued
  #     elicitation/create. Route it into the pending registry,
  #     session-bound by the same `correlation_id` the cancellation
  #     path uses, so one session can never answer another's prompt.
  #     Failures (no correlation_id, no match) are silent 202 no-ops
  #     to avoid a probe oracle — exactly like notifications/cancelled.
  if elicitation_reply?(body)
    route_elicitation_reply(agent, body)
    return [202, json_headers, [""]]
  end

  # 5c. notifications/cancelled — special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same Mcp-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request — otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id,
        request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end
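
Seen from the client side, the checks in the listing above translate into a small set of header and body requirements. The following is a minimal sketch using Ruby's standard Net::HTTP; it only builds requests and does not send them. The mount path "/mcp", the tool name "example_tool", and the assumption that "2025-06-18" is among the dispatcher's supported protocol versions are illustrative and not taken from the library.

```ruby
require "net/http"
require "json"

# Hypothetical mount path; the real one depends on where the app is mounted.
MCP_PATH = "/mcp"
# Assumed to be one of the dispatcher's supported protocol versions.
PROTOCOL_VERSION = "2025-06-18"

# Builds (but does not send) a POST shaped to pass the transport checks.
def build_mcp_post(payload, session_id: nil, protocol_version: nil)
  req = Net::HTTP::Post.new(MCP_PATH)
  req["Content-Type"] = "application/json"               # step 2
  req["Accept"] = "application/json, text/event-stream"  # opts into SSE (step 6)
  req["Mcp-Session-Id"] = session_id if session_id       # step 5b
  req["MCP-Protocol-Version"] = protocol_version if protocol_version # step 4c
  req.body = JSON.generate(payload)
  req
end

# initialize is exempt from the version header and has no session id yet.
def initialize_request
  build_mcp_post(
    { "jsonrpc" => "2.0", "id" => 1, "method" => "initialize",
      "params" => { "protocolVersion" => PROTOCOL_VERSION,
                    "capabilities" => { "elicitation" => {} } } }
  )
end

# A regular request echoes the session id returned on initialize.
# "example_tool" is a placeholder tool name.
def tool_call_request(session_id, id:)
  build_mcp_post(
    { "jsonrpc" => "2.0", "id" => id, "method" => "tools/call",
      "params" => { "name" => "example_tool", "arguments" => {} } },
    session_id: session_id, protocol_version: PROTOCOL_VERSION
  )
end

# A cancellation is a notification: no "id", and it must carry the same
# session id as the request it cancels (step 5c).
def cancel_request(session_id, request_id)
  build_mcp_post(
    { "jsonrpc" => "2.0", "method" => "notifications/cancelled",
      "params" => { "requestId" => request_id } },
    session_id: session_id
  )
end
```

In a real client the session id passed to tool_call_request and cancel_request would come from the Mcp-Session-Id header on the initialize response.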

#notify(session_id, method:, params: nil) ⇒ Boolean

Pushes a server-initiated JSON-RPC notification to a session's open listening stream. This is the public entry point for application code to deliver unsolicited notifications/* events. The session's GET stream must already be open; a client opens it with a GET request carrying Accept: text/event-stream and Mcp-Session-Id.

The envelope is built server-side as a notification — it never carries an id, which is what distinguishes it from the server-initiated request path (e.g. elicitation/create). A caller wanting an id-bearing request uses the internal subscription_manager.publish seam, not this method.
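
A minimal client-side sketch of the GET that opens the listening stream, using Ruby's standard Net::HTTP. The path "/mcp", the host, and the port are placeholders, and any authentication headers the application's agent factory expects are not shown.

```ruby
require "net/http"

# Builds (but does not send) the GET that opens the listening stream.
# `path` is wherever the app is mounted; "/mcp" below is a placeholder.
def build_listening_stream_request(path, session_id)
  req = Net::HTTP::Get.new(path)
  req["Accept"] = "text/event-stream"
  req["Mcp-Session-Id"] = session_id
  req
end

# Sending it and reading chunks as they arrive (host and port are
# placeholders):
#
#   req = build_listening_stream_request("/mcp", session_id)
#   Net::HTTP.start("localhost", 9292) do |http|
#     http.request(req) do |res|
#       res.read_body { |chunk| print chunk }
#     end
#   end
```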

Parameters:

  • session_id (String)

    the target session (Mcp-Session-Id).

  • method (String)

    a non-empty JSON-RPC method, e.g. "notifications/custom".

  • params (Hash, nil) (defaults to: nil)

    optional params object.

Returns:

  • (Boolean)

    true if a listening stream received it; false when notifications are disabled or no stream is attached.

Raises:

  • (ArgumentError)

    when method is blank or not a String.



# File 'lib/parse/agent/mcp_rack_app.rb', line 485

def notify(session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless @subscription_manager
  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!@subscription_manager.publish(session_id, envelope)
end
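
To see the return values and the envelope shape without a running server, the sketch below copies the notify body from the listing above into a stand-in class and pairs it with a fake subscription manager. FakeSubscriptionManager and NotifyHost are hypothetical names used only for illustration; the real subscription manager's delivery semantics may differ.

```ruby
# Hypothetical stand-in: records envelopes and reports whether a stream
# is "attached" for the given session.
class FakeSubscriptionManager
  attr_reader :published

  def initialize(attached_sessions)
    @attached = attached_sessions
    @published = []
  end

  def publish(session_id, envelope)
    return false unless @attached.include?(session_id)
    @published << [session_id, envelope]
    true
  end
end

# Hypothetical host carrying the same notify body as the listing above.
class NotifyHost
  def initialize(subscription_manager)
    @subscription_manager = subscription_manager
  end

  def notify(session_id, method:, params: nil)
    unless method.is_a?(String) && !method.empty?
      raise ArgumentError, "notify: method must be a non-empty String"
    end
    return false unless @subscription_manager
    envelope = { "jsonrpc" => "2.0", "method" => method }
    envelope["params"] = params unless params.nil?
    !!@subscription_manager.publish(session_id, envelope)
  end
end

manager = FakeSubscriptionManager.new(["session-a"])
host = NotifyHost.new(manager)

# Delivered: a stream is attached for session-a.
host.notify("session-a", method: "notifications/custom", params: { "n" => 1 })
# Not delivered: no stream is attached for session-b.
host.notify("session-b", method: "notifications/custom")
# The recorded envelope has "jsonrpc", "method", and "params", but no "id".
manager.published.first
```

With a real MCPRackApp instance the call shape is the same: app.notify(session_id, method: "notifications/custom", params: { ... }).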