Class: Parse::Agent::MCPRackApp
- Inherits: Object (Object → Parse::Agent::MCPRackApp)
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.
The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.
== Transport (transport: :streamable_http)
The MCP 2025-06-18 "Streamable HTTP" transport is the recommended,
primary transport. Rather than toggling its constituent pieces
individually (streaming: for POST→SSE, notifications: for the
server→client GET / stream), pass transport: :streamable_http to
enable the whole transport with one switch:
app = Parse::Agent::MCPRackApp.new(transport: :streamable_http) { |env| ... }
That is exactly equivalent to streaming: true, notifications: true.
resource_subscriptions: true may still be added alongside it to
upgrade the server→client bus from the plain notification posture to
the LiveQuery-backed resource-subscription posture.
transport: is a closed enum — :streamable_http, :legacy, or nil.
:legacy and nil both select the historical default (no streaming, no
server→client stream); the standalone SSE/JSON behavior remains a
supported fallback. Passing transport: :streamable_http together with
an explicit streaming: or notifications: raises ArgumentError,
since the switch already owns those toggles.
The default is unchanged (transport: nil): an existing
MCPRackApp.new { ... } keeps its non-streaming JSON behavior. A
streaming-capable Rack server (Puma, Falcon, Unicorn) is required for
:streamable_http to have any effect — the WEBrick-backed MCPServer
buffers responses and cannot deliver it.
== SSE Streaming (MCP progress notifications)
When constructed with streaming: true, requests that include
Accept: text/event-stream receive an SSE response instead of a single
JSON body. The server holds the connection open and emits
notifications/progress events from two sources:
- Time-based heartbeats every heartbeat_interval seconds while the dispatcher runs (progress field = elapsed seconds).
- Tool-internal progress reported by the tool itself via agent.report_progress(progress:, total:, message:). Works for both built-in tools and custom tools registered through Parse::Agent::Tools.register.
Heartbeats are automatically suppressed once a tool reports its own
progress, so the progressToken carries a single coherent stream.
A final response event carries the complete JSON-RPC response,
after which the stream closes.
This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.
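Here is a minimal sketch of the stream's shape. It assumes data-only SSE frames, because the event names and framing this class actually writes are not shown on this page. ProgressStreamSketch and its methods are hypothetical. The notification envelope follows the MCP notifications/progress shape (progressToken, progress, and optional total and message). The mutex is there because heartbeats and tool progress arrive from different threads.

```ruby
require "json"

# Hypothetical sketch (not the SSEBody implementation): collects the SSE
# frames one streamed request would produce. Data-only frames are an
# assumption; the params follow MCP notifications/progress.
class ProgressStreamSketch
  attr_reader :frames

  def initialize(progress_token)
    @token = progress_token
    @frames = []
    @tool_reported = false
    @lock = Mutex.new # heartbeats and tool progress come from different threads
  end

  # Timer-driven heartbeat; progress carries elapsed seconds.
  def heartbeat(elapsed_seconds)
    @lock.synchronize do
      # Suppressed once the tool reports its own progress, so the token
      # carries one coherent sequence.
      emit(progress_params(elapsed_seconds)) unless @tool_reported
    end
  end

  # Tool-reported progress (agent.report_progress in the real API).
  def report_progress(progress:, total: nil, message: nil)
    @lock.synchronize do
      @tool_reported = true
      emit(progress_params(progress, total, message))
    end
  end

  # Final frame: the complete JSON-RPC response; the stream closes after it.
  def finish(response)
    @lock.synchronize { @frames << "data: #{JSON.generate(response)}\n\n" }
  end

  private

  def progress_params(progress, total = nil, message = nil)
    params = { "progressToken" => @token, "progress" => progress }
    params["total"] = total unless total.nil?
    params["message"] = message unless message.nil?
    params
  end

  def emit(params)
    note = { "jsonrpc" => "2.0", "method" => "notifications/progress", "params" => params }
    @frames << "data: #{JSON.generate(note)}\n\n"
  end
end

stream = ProgressStreamSketch.new("tok-1")
stream.heartbeat(2)
stream.report_progress(progress: 1, total: 4, message: "stage 1")
stream.heartbeat(4) # dropped: the tool has taken over
stream.finish({ "jsonrpc" => "2.0", "id" => 1, "result" => {} })
```

The second heartbeat is dropped because the tool has already reported progress. The client therefore sees three frames: 2 (elapsed seconds), then the tool's own 1-of-4, then the final response.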
Streaming requires a Rack server that supports streaming response bodies
(Puma, Falcon, Unicorn). WEBrick buffers the full body before writing,
so SSE streaming has no effect on the standalone MCPServer — operators
using MCPServer directly should leave streaming: false (the default).
To disable Nginx response buffering for SSE endpoints, set:
proxy_buffering off;
or rely on the X-Accel-Buffering: no header this class emits
automatically on every SSE response.
When streaming: false (default), an Accept: text/event-stream request
receives a plain JSON response — the adapter is permissive per the MCP
spec, which does not require SSE support.
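The streaming-versus-JSON decision reduces to a two-condition check. The hypothetical helper below restates the final branch of #call. It uses the same plain substring test on the Accept header as the source, so q-values are not parsed: an Accept value of text/event-stream;q=0 would still select SSE.

```ruby
# Hypothetical helper restating step 6 of #call.
def response_mode(streaming_enabled, accept_header)
  # Plain substring match, like the source: no q-value parsing.
  if streaming_enabled && accept_header.to_s.include?("text/event-stream")
    :sse
  else
    :json
  end
end
```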
Defined Under Namespace
Classes: CancellationRegistry, ListeningStreamBody, SSEBody, SessionOwnerRegistry
Constant Summary
- DEFAULT_MAX_BODY_SIZE = 1_048_576
Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).
- MAX_JSON_NESTING = 20
JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).
- DEFAULT_HEARTBEAT_INTERVAL = 2
Default heartbeat interval in seconds when streaming is enabled.
- DEFAULT_MAX_CONCURRENT_DISPATCHERS = 100
Default bound on concurrently active streaming dispatchers (and, separately, on concurrently open listening streams) when the max_concurrent_dispatchers: constructor argument is omitted. Finite by default so that enabling a streaming surface (request-scoped SSE or the long-lived GET /stream) does not silently expose an unbounded orphan-thread DoS surface. The cap is applied SEPARATELY to each surface, so the effective ceiling across both is up to 2x this value. Pass an explicit nil to knowingly opt into the unbounded surface.
- DEFAULT_APPROVAL_TIMEOUT = 300
Seconds to wait for a human's elicitation reply before failing closed (refusing the destructive op). Generous by default, since a human-in-the-loop approver needs time the tool timeout doesn't allow. Tune via approval_timeout:.
- JSON_CONTENT_TYPE =
Standard Content-Type for all JSON responses. Frozen template — call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).
{ "Content-Type" => "application/json" }.freeze
- SSE_HEADERS =
SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. Frozen template — call #sse_headers to obtain a per-response copy.
{ "Content-Type" => "text/event-stream", "Cache-Control" => "no-cache", "Connection" => "keep-alive", "X-Accel-Buffering" => "no", }.freeze
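A few lines show why the templates are frozen and copied per response. The json_headers below is a hypothetical stand-in for the private helper of the same name. Its body is not shown on this page, so a plain Hash#dup is assumed. Middleware that adds headers mutates the copy, never the shared constant.

```ruby
# Frozen template, as documented above.
JSON_CONTENT_TYPE = { "Content-Type" => "application/json" }.freeze

# Hypothetical stand-in for the private #json_headers (a plain #dup is assumed).
def json_headers
  JSON_CONTENT_TYPE.dup # Hash#dup returns an unfrozen copy
end

headers = json_headers
headers["X-Content-Type-Options"] = "nosniff" # middleware-style decoration
```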
Instance Attribute Summary
- #subscription_manager ⇒ Parse::Agent::MCPSubscriptions::Manager? (readonly)
The listening-stream coordinator backing this app's server→client bus, or nil when neither resource subscriptions nor notifications are enabled.
Class Method Summary
- .abandoned_dispatcher_count ⇒ Object
Process-wide CUMULATIVE count of GENUINE orphaned dispatchers: client disconnects that closed the stream while the dispatcher thread was still running.
- .active_dispatcher_count ⇒ Object
Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process.
- .active_listening_stream_count ⇒ Object
Process-wide count of currently-open GET listening streams across all MCPRackApp instances.
- .adjust_listening_stream_count(delta) ⇒ Object
- .record_abandoned_dispatcher! ⇒ Object
- .strip_underscore_smuggled_headers!(env) ⇒ Hash
Drop env keys that would have come from underscore-form HTTP header names.
Instance Method Summary
- #call(env) ⇒ Array(Integer, Hash, #each)
Rack interface.
- #initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: nil, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: DEFAULT_MAX_CONCURRENT_DISPATCHERS, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: nil, transport: nil, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp (constructor)
A new instance of MCPRackApp.
- #notify(session_id, method:, params: nil) ⇒ Boolean
Push a server-initiated JSON-RPC NOTIFICATION to a session's open listening stream.
Constructor Details
#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: nil, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: DEFAULT_MAX_CONCURRENT_DISPATCHERS, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: nil, transport: nil, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp
Returns a new instance of MCPRackApp.
# File 'lib/parse/agent/mcp_rack_app.rb', line 316
def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil,
               streaming: nil, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
               max_concurrent_dispatchers: DEFAULT_MAX_CONCURRENT_DISPATCHERS,
               pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil,
               resource_subscriptions: false, subscription_manager: nil, notifications: nil,
               transport: nil, approval_timeout: DEFAULT_APPROVAL_TIMEOUT,
               principal_resolver: nil, health_path: nil, &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  # `transport:` is the consolidation switch over the granular
  # `streaming:` / `notifications:` toggles. `streaming` and
  # `notifications` default to nil (not false) precisely so we can tell
  # "operator left it alone" from "operator explicitly set it" and raise
  # on a conflicting combination instead of silently letting the switch
  # win. Closed enum; unknown values fail closed.
  unless transport.nil? || %i[legacy streamable_http].include?(transport)
    raise ArgumentError, "transport: must be :streamable_http, :legacy, or nil, got #{transport.inspect}"
  end
  if transport == :streamable_http
    unless streaming.nil? && notifications.nil?
      raise ArgumentError,
            "transport: :streamable_http already enables streaming and the server-initiated " \
            "notification stream; do not also pass streaming:/notifications: " \
            "(resource_subscriptions: may still be combined to upgrade the bus to LiveQuery)"
    end
    streaming = true
    notifications = true
  end

  # Collapse the nil sentinel to the historical default for the
  # remainder of the constructor (and @streaming below).
  streaming = false if streaming.nil?
  notifications = false if notifications.nil?

  @agent_factory = agent_factory || block
  @max_body_size = max_body_size
  @logger = logger
  @streaming = streaming
  @heartbeat_interval = heartbeat_interval

  # The dispatcher cap defaults to the finite DEFAULT_MAX_CONCURRENT_DISPATCHERS
  # (set in the signature). An explicit positive Integer overrides it; an
  # explicit nil knowingly opts into the unbounded surface; anything else
  # is a config error and raises.
  validate_max_concurrent_dispatchers!(max_concurrent_dispatchers)
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter = pre_auth_rate_limiter
  @allowed_origins = normalize_allowed_origins(allowed_origins)
  @required_custom_header = normalize_required_custom_header(require_custom_header)
  @health_path = health_path.is_a?(String) && !health_path.empty? ? health_path : nil

  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry = CancellationRegistry.new

  # Elicitation (human-in-the-loop approval) state, shared across
  # this app's requests and its GET listening streams. The
  # capability registry records (per session) whether the client
  # advertised `elicitation` at initialize; the pending registry
  # holds server→client requests awaiting a reply. Both are cheap
  # and always present; they only do work when
  # Parse::Agent.require_approval_for opts a tier in.
  @elicitation_capabilities = Parse::Agent::ClientCapabilityRegistry.new
  @pending_elicitations = Parse::Agent::PendingElicitationRegistry.new
  @approval_timeout = approval_timeout

  # Binds each MCP session id to the principal that established it so a
  # listening stream can't be hijacked by another authenticated caller.
  # Same per-instance / single-process scope as @cancellation_registry.
  @session_owners = SessionOwnerRegistry.new

  if principal_resolver && !principal_resolver.respond_to?(:call)
    raise ArgumentError, "principal_resolver must respond to #call"
  end
  @principal_resolver = principal_resolver

  # Listening-stream coordinator (the server→client broadcast bus
  # backing resource subscriptions, MCP elicitation, and
  # general-purpose server-initiated notifications). An injected
  # manager wins. Otherwise:
  # - `resource_subscriptions: true` builds a LiveQuery-backed
  #   manager whose `supported?` resolves live (advertises
  #   `resources.subscribe` and serves subscribe POSTs).
  # - `notifications: true` (without resource subscriptions) builds
  #   a manager in `supported: false` posture: the GET listening
  #   stream + `#notify` bus work, but `resources.subscribe` stays
  #   unadvertised and subscribe POSTs fail closed. This is the
  #   decoupling lever: a server can push arbitrary notifications
  #   without enabling LiveQuery resource subscriptions.
  # nil disables the GET listening stream entirely.
  @subscription_manager = if subscription_manager
                            subscription_manager
                          elsif resource_subscriptions
                            Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger)
                          elsif notifications
                            Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger, supported: false)
                          end

  # Warn operators who enable a streaming surface AND have explicitly
  # opted into an unbounded dispatcher cap. Both request-scoped SSE
  # (streaming:) and the long-lived GET listening stream
  # (resource_subscriptions:/notifications:, which set
  # @subscription_manager) spawn per-connection threads; an unbounded
  # endpoint is a practical DoS surface: a slow or hostile client opening
  # connections faster than they close can exhaust the host thread pool and
  # downstream Parse connection pool. The cap bounds each surface
  # SEPARATELY, so the effective ceiling is up to 2x max_concurrent_dispatchers
  # across both. The default is now the finite DEFAULT_MAX_CONCURRENT_DISPATCHERS,
  # so a nil here means the operator deliberately chose `nil` (unbounded);
  # we warn once at construction so the choice is visible.
  if (streaming || @subscription_manager) && @max_concurrent_dispatchers.nil?
    surface = streaming ? "streaming: true" : "resource_subscriptions/notifications"
    line = "[Parse::Agent::MCPRackApp] #{surface} with an explicitly unbounded dispatcher cap " \
           "(max_concurrent_dispatchers: nil). This is an orphan-thread DoS surface. " \
           "Prefer the finite default (#{DEFAULT_MAX_CONCURRENT_DISPATCHERS}) or pass a value sized to " \
           "~2x your Puma max_threads. See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end
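Three decisions in the constructor above are easy to restate in isolation. All three helpers are hypothetical. validate_cap! encodes the rule stated in the comments for validate_max_concurrent_dispatchers!, whose body is not shown. bus_posture returns a symbol naming which listening-stream coordinator would be built, rather than constructing a Parse::Agent::MCPSubscriptions::Manager. warn_unbounded? mirrors the construction-time warning condition.

```ruby
# Hypothetical restatement of the documented cap rule: a positive Integer
# overrides the default, nil knowingly opts into "unbounded", and
# anything else is a configuration error.
def validate_cap!(cap)
  return cap if cap.nil?
  return cap if cap.is_a?(Integer) && cap.positive?

  raise ArgumentError, "max_concurrent_dispatchers must be a positive Integer or nil, got #{cap.inspect}"
end

# Hypothetical: which coordinator the constructor would build, in
# precedence order. Returns a symbol instead of a Manager instance.
def bus_posture(subscription_manager: nil, resource_subscriptions: false, notifications: false)
  if subscription_manager
    :injected            # operator-supplied manager always wins
  elsif resource_subscriptions
    :live_query          # advertises resources.subscribe
  elsif notifications
    :notifications_only  # GET stream + #notify; subscribe POSTs fail closed
  else
    :disabled            # no GET listening stream at all
  end
end

# Hypothetical: mirrors the construction-time warning condition.
def warn_unbounded?(streaming, posture, cap)
  (streaming || posture != :disabled) && cap.nil?
end
```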
Instance Attribute Details
#subscription_manager ⇒ Parse::Agent::MCPSubscriptions::Manager? (readonly)
The listening-stream coordinator backing this app's server→client
bus, or nil when neither resource subscriptions nor notifications
are enabled. Exposed so a clustered/Redis notifier adapter or an
out-of-band publisher can reach the bus directly. Direct
#publish accepts arbitrary messages (notifications OR id-bearing
requests); prefer #notify for the validated notification path.
# File 'lib/parse/agent/mcp_rack_app.rb', line 464
def subscription_manager
  @subscription_manager
end
Class Method Details
.abandoned_dispatcher_count ⇒ Object
Process-wide CUMULATIVE count of GENUINE orphaned dispatchers — a client
disconnect that closed the stream while the dispatcher thread was still
running. Excludes already-finished-but-undelivered closes (a delivery
miss, not an orphan). Unlike active_dispatcher_count /
active_listening_stream_count this is a monotonic total, not a live
gauge — operators alert on its rate of increase, the orphan-thread
pressure signal under a disconnect-against-slow-tools storm. EVERY
premature close (orphan or delivery-miss) also emits a
parse.agent.mcp_dispatcher_abandoned ActiveSupport::Notifications event
carrying dispatcher_alive:, so subscribers wanting the broader
delivery-miss signal can filter there. Reset is not supported (counters
are process-lifetime); subtract a baseline if you need a windowed delta.
# File 'lib/parse/agent/mcp_rack_app.rb', line 536
def self.abandoned_dispatcher_count
  @abandoned_dispatcher_mutex.synchronize { @abandoned_dispatcher_count }
end
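Because the counter is monotonic and never reset, a windowed figure comes from sampling twice and subtracting. The sketch below uses a hypothetical OrphanCounter class that mirrors the mutex-guarded class-level counter shown above and the record_abandoned_dispatcher! method listed below. In a real deployment the two samples would typically be taken on a metrics scrape interval.

```ruby
# Hypothetical stand-in for the class-level, mutex-guarded counter behind
# abandoned_dispatcher_count / record_abandoned_dispatcher!.
class OrphanCounter
  @mutex = Mutex.new
  @count = 0

  class << self
    # Monotonic total; never reset for the life of the process.
    def count
      @mutex.synchronize { @count }
    end

    def record!
      @mutex.synchronize { @count += 1 }
    end
  end
end

baseline = OrphanCounter.count     # first sample
3.times { OrphanCounter.record! }  # orphans recorded during the window
delta = OrphanCounter.count - baseline
puts "orphans in window: #{delta}" # prints "orphans in window: 3"
```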
.active_dispatcher_count ⇒ Object
Returns the number of currently live dispatcher threads spawned by any
SSEBody across all MCPRackApp instances in this process. Threads are
counted by the :parse_mcp_dispatcher thread-local tag set when each
dispatcher_thread is started. Use this for operator dashboards or health
checks; do NOT use it to make flow-control decisions at runtime (use
the max_concurrent_dispatchers: constructor option for that).
# File 'lib/parse/agent/mcp_rack_app.rb', line 501
def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end
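The tag-and-scan technique can be reproduced with core Ruby alone. In this sketch the worker threads are hypothetical stand-ins for SSEBody dispatchers. Each worker tags itself with the :parse_mcp_dispatcher thread-local when it starts, and the gauge counts live tagged threads. A finished thread drops out of Thread.list, so no explicit decrement is needed. The same property makes the reading only a point-in-time snapshot.

```ruby
# Hypothetical gauge mirroring active_dispatcher_count: count live threads
# carrying the :parse_mcp_dispatcher thread-local tag.
def active_tagged_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end

ready = Queue.new
release = Queue.new
workers = Array.new(3) do
  Thread.new do
    Thread.current[:parse_mcp_dispatcher] = true # tag set when the worker starts
    ready << :tagged
    release.pop # stand-in for a long-running tool call
  end
end

3.times { ready.pop }  # wait until every worker has tagged itself
during = active_tagged_count
3.times { release << :done }
workers.each(&:join)
after = active_tagged_count # finished threads are no longer listed
puts "during=#{during} after=#{after}" # prints "during=3 after=0"
```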
.active_listening_stream_count ⇒ Object
Process-wide count of currently-open GET listening streams across all MCPRackApp instances. A listening stream is long-lived (the server→client notification channel) — each pins a server worker thread in #each plus a heartbeat thread — so it is bounded SEPARATELY from request-scoped SSE dispatchers (which #each, dispatch once, then close). Used as the soft cap in #serve_listening_stream. Maintained by ListeningStreamBody via adjust_listening_stream_count; unlike a Thread.list scan this is an explicit counter because the heartbeat thread is intentionally not tagged as a dispatcher.
# File 'lib/parse/agent/mcp_rack_app.rb', line 514
def self.active_listening_stream_count
  @listening_stream_mutex.synchronize { @listening_stream_count }
end
.adjust_listening_stream_count(delta) ⇒ Object
# File 'lib/parse/agent/mcp_rack_app.rb', line 520
def self.adjust_listening_stream_count(delta)
  @listening_stream_mutex.synchronize { @listening_stream_count += delta }
end
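An explicit counter used as a soft cap might look like the sketch below. StreamGauge and open_stream are hypothetical. The mutex-guarded adjust mirrors adjust_listening_stream_count above. The sketch models one plausible reading of "soft": the cap check and the increment are separate critical sections, so concurrent openers can briefly overshoot the cap. The ensure releases the slot even when the stream body raises.

```ruby
# Hypothetical gauge in the spirit of active_listening_stream_count /
# adjust_listening_stream_count.
class StreamGauge
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  def count
    @mutex.synchronize { @count }
  end

  def adjust(delta)
    @mutex.synchronize { @count += delta }
  end

  # Returns nil (the caller should refuse the stream) when at or over the
  # cap; otherwise runs the block while counted and always decrements.
  # A nil cap means unbounded.
  def open_stream(cap)
    return nil if cap && count >= cap # check...

    adjust(1) # ...then increment, in a separate critical section ("soft")
    begin
      yield
    ensure
      adjust(-1)
    end
  end
end

gauge = StreamGauge.new
served = gauge.open_stream(100) { :streamed }
```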
.record_abandoned_dispatcher! ⇒ Object
# File 'lib/parse/agent/mcp_rack_app.rb', line 542
def self.record_abandoned_dispatcher!
  @abandoned_dispatcher_mutex.synchronize { @abandoned_dispatcher_count += 1 }
end
.strip_underscore_smuggled_headers!(env) ⇒ Hash
Drop env keys that would have come from underscore-form HTTP header
names. The Rack-spec-compliant interpretation of HTTP headers maps
X-MCP-API-Key and X_MCP_API_KEY to the same env key
(HTTP_X_MCP_API_KEY); a misbehaving upstream server that forwards
the underscore-form lets an attacker overwrite trusted reverse-proxy-
injected headers.
This helper is invoked automatically at the top of #call, so any
MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original
header list via rack.headers) gets defense-in-depth scrubbing
without operator opt-in. On Rack 2 / pre-3 servers rack.headers is
not set and the helper is a no-op; operators on those stacks must
configure their upstream (e.g. Nginx underscores_in_headers off)
OR mount this helper as an explicit middleware.
The standalone MCPServer rewrites its own build_rack_env to drop
underscore-form names before they reach this app, so the standalone
path is covered regardless of Rack version.
# File 'lib/parse/agent/mcp_rack_app.rb', line 202
def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end
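The scrubbing logic above can be exercised standalone. The sketch copies the method body into a top-level function and uses a plain Hash in place of env["rack.headers"], since the method only requires #each. The example header names and values are illustrative. Because both spellings share one env slot, the dashed-form value is discarded too once an underscore form is seen: the slot can no longer be trusted.

```ruby
# Standalone copy of strip_underscore_smuggled_headers! for experimentation.
def strip_underscore_smuggled_headers!(env)
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    # Map each suspect name to its CGI-style env key and drop it.
    suspect.each { |name| env.delete("HTTP_#{name.upcase.tr("-", "_")}") }
  end
  env
end

# A plain Hash stands in for the original header list (illustrative values).
env = {
  "rack.headers" => {
    "x-mcp-api-key" => "trusted",
    "x_mcp_api_key" => "forged",
    "accept" => "application/json",
  },
  "HTTP_X_MCP_API_KEY" => "forged", # the collapsed, overwritten slot
  "HTTP_ACCEPT" => "application/json",
}
strip_underscore_smuggled_headers!(env)
```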
Instance Method Details
#call(env) ⇒ Array(Integer, Hash, #each)
Rack interface.
# File 'lib/parse/agent/mcp_rack_app.rb', line 550
def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0a. Liveness probe. When `health_path:` is configured, a GET to
  #     that exact path returns `{"status":"ok"}` without auth,
  #     rate-limiting, or factory invocation. Intentionally
  #     fingerprint-minimal: no version, no build, no counter;
  #     a load balancer needs "is it up?", not "what is it?".
  if @health_path && env["PATH_INFO"] == @health_path && env["REQUEST_METHOD"] == "GET"
    return [200, json_headers, ['{"status":"ok"}']]
  end

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 0c. DELETE: MCP 2025-06-18 Streamable HTTP session
  #     termination. A client signals it is done with a session by
  #     sending DELETE with the same `Mcp-Session-Id` header it
  #     received from initialize. Per spec the server MAY support
  #     this; if it doesn't, it MUST return 405. We support it.
  #
  #     Stateless-agent reality: the factory builds a fresh agent
  #     per request, so there is no server-side session store to
  #     evict. What DELETE meaningfully does is cancel any
  #     in-flight requests still running under that correlation_id
  #     so worker threads exit instead of completing wasted work.
  #     The cancellation_registry returns 0 when nothing matches
  #     (also the "unknown session" case); we don't probe-leak by
  #     differentiating known vs unknown ids in the response.
  #
  #     Sanitized through Parse::Agent#correlation_id= via a
  #     throwaway agent so a malicious header value (CRLF, shell
  #     metachars) is silently rejected rather than reaching the
  #     registry as a key.
  if env["REQUEST_METHOD"] == "DELETE"
    sid = env["HTTP_MCP_SESSION_ID"].to_s
    if sid.empty?
      return [400, json_headers, [json_rpc_error(-32_600, "Missing Mcp-Session-Id")]]
    end
    clean_sid = sanitize_session_id(sid)
    if clean_sid.nil?
      return [400, json_headers, [json_rpc_error(-32_600, "Invalid Mcp-Session-Id")]]
    end
    @cancellation_registry.cancel_all_for(clean_sid, reason: :session_terminated)
    # Wake any tool thread blocked on an elicitation reply for this
    # session (it returns `unavailable` → fail closed) and drop the
    # session's cached elicitation capability.
    @pending_elicitations.abort_all_for(clean_sid, :session_terminated)
    @elicitation_capabilities.forget(clean_sid)
    # Tear down any resource subscriptions and the listening stream
    # bound to this session so a terminated session leaves no LiveQuery
    # sockets behind.
    @subscription_manager&.detach_listener(clean_sid)
    # Drop the owner binding so the id can be reclaimed after explicit
    # termination (only here, not on mere stream close, so a reconnect
    # keeps its claim).
    @session_owners.forget(clean_sid)
    return [204, json_headers, [""]]
  end

  # 0d. GET listening stream: the MCP 2025-06-18 Streamable HTTP
  #     server→client channel that carries unsolicited
  #     `notifications/resources/updated`. Only when a listening-stream
  #     coordinator is configured (resource_subscriptions: or
  #     notifications:), the client opted into SSE, and a
  #     valid Mcp-Session-Id is present. Authenticated via the same
  #     agent_factory as POST: the session id is a server-issued
  #     bearer capability (returned on initialize), so possession of
  #     it plus a valid agent gates the stream.
  if env["REQUEST_METHOD"] == "GET" && @subscription_manager &&
     env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    return serve_listening_stream(env)
  end

  # 1. Method check: only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405, json_headers.merge("Allow" => "POST"), [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check: must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers, [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless: native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers, [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers, [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit: read one byte beyond the limit to detect oversized
  #    bodies without buffering the full stream. Per the Rack spec,
  #    read(length) returns nil at EOF, so .to_s normalizes an empty body to "".
  raw_body = env["rack.input"].read(@max_body_size + 1).to_s
  if raw_body.bytesize > @max_body_size
    return [413, json_headers, [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers, [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  #     A method-less JSON-RPC RESPONSE ({jsonrpc,id,result|error}) is
  #     NOT malformed: it is the client's reply to a server-issued
  #     elicitation/create request. Let it through here; it is routed
  #     (session-bound) after the agent_factory resolves the session.
  unless (body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?) ||
         elicitation_reply?(body)
    return [400, json_headers, [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 4c. MCP-Protocol-Version header validation (MCP 2025-06-18
  #     Streamable HTTP). The spec says:
  #       - The client MUST send `MCP-Protocol-Version: <ver>`
  #         on every request AFTER initialize.
  #       - If absent on a non-initialize request, the server
  #         SHOULD assume `2025-03-26` for backwards compatibility.
  #       - If present but not a version the server supports,
  #         the server MUST respond `400 Bad Request`.
  #     Initialize requests are exempt: initialize IS the
  #     negotiation, so the header is meaningless there.
  #     Cancellation notifications are also exempt because they
  #     may be sent by a client that has not (yet) completed
  #     initialize against this transport instance (e.g. a
  #     reconnecting client cancelling a pre-disconnect request).
  unless body["method"] == "initialize" ||
         body["method"] == "notifications/cancelled" ||
         elicitation_reply?(body)
    requested = env["HTTP_MCP_PROTOCOL_VERSION"]
    if requested.is_a?(String) && !requested.empty? &&
       !Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS.include?(requested)
      return [400, json_headers, [json_rpc_error(-32_600, "Unsupported MCP-Protocol-Version: #{requested}", id: body["id"])]]
    end
  end

  # 5. Agent factory: auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, []]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers, [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5a-i. Surface the silent-ungated-writes footgun. A write/admin agent
  #       served over MCP with no approval tier configured runs every
  #       destructive tool without a human gate; warn once per process so
  #       the operator notices (mirrors the unrestricted-endpoints warning).
  if agent.respond_to?(:permissions) && %i[write admin].include?(agent.permissions) &&
     Parse::Agent.require_approval_for.empty?
    Parse::Agent.warn_mcp_writes_unguarded!
  end

  # 5b. Thread the conversation correlation id through. Source
  #     header: the MCP 2025-06-18 Streamable HTTP spec-canonical
  #     `Mcp-Session-Id` (Rack env key `HTTP_MCP_SESSION_ID`).
  #
  #     Only fills it when the factory hasn't already assigned one;
  #     application code that needs to override the
  #     client-supplied id (e.g., bind to an internal session
  #     record) can do so in the factory and we don't stomp on it.
  #     The Parse::Agent#correlation_id= setter sanitizes the
  #     value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) && agent.correlation_id.nil? &&
     (sid = env["HTTP_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5b-i. Server-assigned Mcp-Session-Id on initialize. Per MCP
  #       2025-06-18 Streamable HTTP, the server SHOULD assign a
  #       fresh session id during initialize when the client did not
  #       supply one, and return it on the response so the client
  #       can echo it on subsequent requests. Stateless-agent
  #       reality: the SDK does not maintain a server-side session
  #       store; the id is best-effort correlation only (used for
  #       audit-log threading and cancellation routing). We do not
  #       refuse subsequent requests carrying an "unknown" id.
  if body.is_a?(Hash) && body["method"] == "initialize" && agent &&
     agent.respond_to?(:correlation_id=) && agent.correlation_id.nil?
    agent.correlation_id = SecureRandom.uuid
  end

  # 5b-ii. Capture the client's elicitation capability at initialize.
  #        The server reads (does not advertise) the client's
  #        `capabilities.elicitation`; the approval gate consults this
  #        per session before attempting a server→client prompt.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent.respond_to?(:correlation_id) && agent.correlation_id
    supported = !!(body.dig("params", "capabilities", "elicitation"))
    @elicitation_capabilities.set(agent.correlation_id, supported)
    # Authoritatively bind this session to the initializing principal so
    # only the same principal can later attach a listening stream for it
    # (owner-binding; see SessionOwnerRegistry).
    @session_owners.bind(agent.correlation_id, principal_fingerprint(agent, env))
  end

  # 5b-iii. Elicitation reply ingress. A method-less JSON-RPC
  #         RESPONSE is the client's answer to a server-issued
  #         elicitation/create. Route it into the pending registry,
  #         session-bound by the same `correlation_id` the cancellation
  #         path uses, so one session can never answer another's prompt.
  #         Failures (no correlation_id, no match) are silent 202 no-ops
  #         to avoid a probe oracle, exactly like notifications/cancelled.
  if elicitation_reply?(body)
    route_elicitation_reply(agent, body)
    return [202, json_headers, [""]]
  end

  # 5c. notifications/cancelled: special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same Mcp-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request; otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id,
        request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end
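Steps 4b and 4c, the envelope gates that run before the agent factory, can be sketched as one function. Two assumptions apply. First, elicitation_reply? is taken to match the {jsonrpc, id, result|error} shape named in the comments; the real private helper is not shown. Second, SKETCH_SUPPORTED_VERSIONS is an illustrative list standing in for Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS and does not reflect its actual contents.

```ruby
# Illustrative stand-in; NOT the real SUPPORTED_PROTOCOL_VERSIONS contents.
SKETCH_SUPPORTED_VERSIONS = %w[2025-06-18 2025-03-26].freeze

# Assumed shape of a client reply to a server-issued elicitation/create.
def elicitation_reply?(body)
  body.is_a?(Hash) && !body.key?("method") && body.key?("id") &&
    (body.key?("result") || body.key?("error"))
end

# Hypothetical: returns [400, code, message] when refused, else [:continue].
def gate(body, protocol_header)
  # 4b. A request must name a method, unless it is an elicitation reply.
  named = body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?
  return [400, -32_600, "Invalid Request"] unless named || elicitation_reply?(body)

  # 4c. Version check, skipped for initialize, cancellations, and replies.
  exempt = elicitation_reply?(body) ||
           %w[initialize notifications/cancelled].include?(body["method"])
  if !exempt && protocol_header.is_a?(String) && !protocol_header.empty? &&
     !SKETCH_SUPPORTED_VERSIONS.include?(protocol_header)
    return [400, -32_600, "Unsupported MCP-Protocol-Version: #{protocol_header}"]
  end
  [:continue]
end
```

A missing MCP-Protocol-Version header passes, because the spec tells the server to assume 2025-03-26. Only a header that is present but unsupported is refused.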
#notify(session_id, method:, params: nil) ⇒ Boolean
Push a server-initiated JSON-RPC NOTIFICATION to a session's open
listening stream. This is the public front door for application
code to deliver unsolicited notifications/* events (the GET
stream must be open for the session — open it client-side with a
GET carrying Accept: text/event-stream + Mcp-Session-Id).
The envelope is built server-side as a notification — it never
carries an id, which is what distinguishes it from the
server-initiated request path (e.g. elicitation/create). A
caller wanting an id-bearing request uses the internal
subscription_manager.publish seam, not this method.
# File 'lib/parse/agent/mcp_rack_app.rb', line 485
def notify(session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless @subscription_manager

  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!@subscription_manager.publish(session_id, envelope)
end
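A stand-in bus makes the envelope contract easy to see. In this sketch FakeBus and notify_sketch are hypothetical. FakeBus plays the role of the subscription manager and always reports delivery. notify_sketch repeats the validation and envelope construction of #notify above, so the property it demonstrates (no "id" key is ever present) carries over. The notification method names used are standard MCP ones.

```ruby
# Hypothetical stand-in for the subscription manager's #publish.
class FakeBus
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(session_id, envelope)
    @sent << [session_id, envelope]
    true # assume the session has an open listening stream
  end
end

# Mirrors #notify: validate, build an id-less envelope, publish.
def notify_sketch(bus, session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless bus # no server-to-client bus configured

  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!bus.publish(session_id, envelope)
end

demo_bus = FakeBus.new
notify_sketch(demo_bus, "sess-1", method: "notifications/message",
                                  params: { "level" => "info", "data" => "reindex finished" })
```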