Class: Parse::Agent::MCPRackApp
- Inherits: Object
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.
The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.
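The order of those transport checks can be sketched in plain Ruby. This is a standalone illustration, not the class's implementation. `transport_gate`, `MAX_BODY`, `MAX_NESTING`, and the `dispatch` block are hypothetical names, and the error bodies are simplified (the real adapter returns JSON-RPC error envelopes). The sketch coerces the body read with `to_s` because a Rack input's `read(length)` returns nil at EOF.

```ruby
require "json"
require "stringio"

# Hypothetical limits mirroring DEFAULT_MAX_BODY_SIZE and MAX_JSON_NESTING.
MAX_BODY = 1_048_576
MAX_NESTING = 20

# Sketch of the gate order: method -> content-type -> body size -> JSON parse,
# then hand the parsed body to `dispatch` (a stand-in for MCPDispatcher.call).
def transport_gate(env, max_body: MAX_BODY, &dispatch)
  return [405, {}, ["method not allowed"]] unless env["REQUEST_METHOD"] == "POST"

  # Ignore parameters such as "; charset=utf-8" and compare case-insensitively.
  type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  return [415, {}, ["unsupported media type"]] unless type == "application/json"

  # Read one byte past the limit so an oversized body is detected without
  # buffering the whole stream; read(length) yields nil on an empty body.
  raw = env["rack.input"].read(max_body + 1).to_s
  return [413, {}, ["payload too large"]] if raw.bytesize > max_body

  begin
    body = JSON.parse(raw.empty? ? "{}" : raw, max_nesting: MAX_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, {}, ["parse error"]]
  end

  [200, { "Content-Type" => "application/json" }, [JSON.generate(dispatch.call(body))]]
end
```

Each rejection returns before any dispatch work happens, which is the property the adapter relies on to keep malformed traffic cheap.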
== SSE Streaming (MCP progress notifications)
When constructed with streaming: true, requests that include
Accept: text/event-stream receive an SSE response instead of a single
JSON body. The server holds the connection open and emits
notifications/progress events from two sources:
- Time-based heartbeats every heartbeat_interval seconds while the
  dispatcher runs (the progress field carries the elapsed seconds).
- Tool-internal progress reported by the tool itself via
  agent.report_progress(progress:, total:, message:). This works for both
  built-in tools and custom tools registered through
  Parse::Agent::Tools.register.
Heartbeats are automatically suppressed once a tool reports its own
progress, so the progressToken carries a single coherent stream.
A final response event carries the complete JSON-RPC response,
after which the stream closes.
This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.
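The heartbeat-then-suppress behaviour can be modelled in a few lines. This is a hypothetical sketch: `ProgressStream` is not part of the library, and the exact SSE framing the class emits (event names, ids) is not documented here, so the sketch uses plain `data:` frames. The `notifications/progress` params (`progressToken`, `progress`, optional `total` and `message`) follow the MCP specification.

```ruby
require "json"

# Hypothetical model: heartbeats carry elapsed seconds until the tool reports
# its own progress; after that, heartbeats are suppressed so the
# progressToken carries a single coherent stream.
class ProgressStream
  def initialize(progress_token)
    @token = progress_token
    @tool_reported = false
  end

  # One Server-Sent Events frame: a "data:" line followed by a blank line.
  def self.frame(payload)
    "data: #{JSON.generate(payload)}\n\n"
  end

  # Returns a frame, or nil once tool-reported progress has taken over.
  def heartbeat(elapsed_seconds)
    return nil if @tool_reported

    notification(progress: elapsed_seconds)
  end

  # Tool-internal progress (same keywords as agent.report_progress).
  def report(progress:, total: nil, message: nil)
    @tool_reported = true
    notification(progress: progress, total: total, message: message)
  end

  private

  def notification(progress:, total: nil, message: nil)
    params = { "progressToken" => @token, "progress" => progress }
    params["total"] = total unless total.nil?
    params["message"] = message unless message.nil?
    self.class.frame({ "jsonrpc" => "2.0", "method" => "notifications/progress", "params" => params })
  end
end
```

Keeping a single flag per stream is the simplest way to guarantee the client never sees elapsed-seconds values interleaved with the tool's own progress counter.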
Streaming requires a Rack server that supports streaming response bodies
(Puma, Falcon, Unicorn). WEBrick buffers the full body before writing,
so SSE streaming has no effect on the standalone MCPServer — operators
using MCPServer directly should leave streaming: false (the default).
To disable Nginx response buffering for SSE endpoints, set:
proxy_buffering off;
or rely on the X-Accel-Buffering: no header this class emits
automatically on every SSE response.
When streaming: false (default), an Accept: text/event-stream request
receives a plain JSON response — the adapter is permissive per the MCP
spec, which does not require SSE support.
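The negotiation rule reduces to a two-condition predicate. The sketch below is hypothetical (`response_mode` is not a library method) but uses the same Accept-header test that step 6 of #call applies.

```ruby
# Hypothetical predicate: SSE only when the app was built with streaming: true
# AND the client's Accept header mentions text/event-stream; every other
# combination falls back to a plain JSON response.
def response_mode(streaming:, env:)
  accepts_sse = env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
  streaming && accepts_sse ? :sse : :json
end
```

For example, `response_mode(streaming: false, env: { "HTTP_ACCEPT" => "text/event-stream" })` returns `:json`, which is the permissive behaviour described above.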
Defined Under Namespace
Classes: CancellationRegistry, ListeningStreamBody, SSEBody, SessionOwnerRegistry
Constant Summary
- DEFAULT_MAX_BODY_SIZE = 1_048_576
  Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).
- MAX_JSON_NESTING = 20
  JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).
- DEFAULT_HEARTBEAT_INTERVAL = 2
  Default heartbeat interval in seconds when streaming is enabled.
- DEFAULT_APPROVAL_TIMEOUT = 300
  Seconds to wait for a human's elicitation reply before failing closed (refusing the destructive operation). The default is generous because a human-in-the-loop approver needs more time than the tool timeout allows. Tune via approval_timeout:.
- JSON_CONTENT_TYPE = { "Content-Type" => "application/json" }.freeze
  Standard Content-Type for all JSON responses. This is a frozen template; call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).
- SSE_HEADERS = { "Content-Type" => "text/event-stream", "Cache-Control" => "no-cache", "Connection" => "keep-alive", "X-Accel-Buffering" => "no" }.freeze
  SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. This is a frozen template; call #sse_headers to obtain a per-response copy.
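Why the header constants are frozen templates rather than shared response hashes can be shown with plain Ruby. The sketch assumes #json_headers returns a `dup` of the template; the actual helper body is not shown on this page, and `JSON_HEADERS_TEMPLATE` is a hypothetical name.

```ruby
# Frozen template, in the spirit of JSON_CONTENT_TYPE above.
JSON_HEADERS_TEMPLATE = { "Content-Type" => "application/json" }.freeze

# Hypothetical per-response copy: Hash#dup returns an unfrozen hash, so
# middleware can add headers in place without touching the shared template.
def json_headers
  JSON_HEADERS_TEMPLATE.dup
end
```

Handing middleware the frozen constant itself would make any in-place header decoration raise `FrozenError`; handing it a shared unfrozen hash would leak one response's headers into every later response.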
Instance Attribute Summary
- #subscription_manager ⇒ Parse::Agent::MCPSubscriptions::Manager? (readonly)
  The listening-stream coordinator backing this app's server→client bus, or nil when neither resource subscriptions nor notifications are enabled.
Class Method Summary
- .active_dispatcher_count ⇒ Object
  Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process.
- .active_listening_stream_count ⇒ Object
  Process-wide count of currently-open GET listening streams across all MCPRackApp instances.
- .adjust_listening_stream_count(delta) ⇒ Object
- .strip_underscore_smuggled_headers!(env) ⇒ Hash
  Drop env keys that would have come from underscore-form HTTP header names.
Instance Method Summary
- #call(env) ⇒ Array(Integer, Hash, #each)
  Rack interface.
- #initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: false, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp (constructor)
  A new instance of MCPRackApp.
- #notify(session_id, method:, params: nil) ⇒ Boolean
  Push a server-initiated JSON-RPC NOTIFICATION to a session's open listening stream.
Constructor Details
#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: false, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp
Returns a new instance of MCPRackApp.
# File 'lib/parse/agent/mcp_rack_app.rb', line 238

def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false,
               heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil,
               pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil,
               resource_subscriptions: false, subscription_manager: nil, notifications: false,
               approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  @agent_factory = agent_factory || block
  @max_body_size = max_body_size
  @logger = logger
  @streaming = streaming
  @heartbeat_interval = heartbeat_interval
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter = pre_auth_rate_limiter
  @allowed_origins = normalize_allowed_origins(allowed_origins)
  @required_custom_header = normalize_required_custom_header(require_custom_header)
  @health_path = health_path.is_a?(String) && !health_path.empty? ? health_path : nil

  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry = CancellationRegistry.new

  # Elicitation (human-in-the-loop approval) state, shared across
  # this app's requests and its GET listening streams. The
  # capability registry records (per session) whether the client
  # advertised `elicitation` at initialize; the pending registry
  # holds server→client requests awaiting a reply. Both are cheap
  # and always present; they only do work when
  # Parse::Agent.require_approval_for opts a tier in.
  @elicitation_capabilities = Parse::Agent::ClientCapabilityRegistry.new
  @pending_elicitations = Parse::Agent::PendingElicitationRegistry.new
  @approval_timeout = approval_timeout

  # Binds each MCP session id to the principal that established it so a
  # listening stream can't be hijacked by another authenticated caller.
  # Same per-instance / single-process scope as @cancellation_registry.
  @session_owners = SessionOwnerRegistry.new

  if principal_resolver && !principal_resolver.respond_to?(:call)
    raise ArgumentError, "principal_resolver must respond to #call"
  end
  @principal_resolver = principal_resolver

  # Listening-stream coordinator (the server→client broadcast bus
  # backing resource subscriptions, MCP elicitation, and
  # general-purpose server-initiated notifications). An injected
  # manager wins. Otherwise:
  # - `resource_subscriptions: true` builds a LiveQuery-backed
  #   manager whose `supported?` resolves live (advertises
  #   `resources.subscribe` and serves subscribe POSTs).
  # - `notifications: true` (without resource subscriptions) builds
  #   a manager in `supported: false` posture: the GET listening
  #   stream + `#notify` bus work, but `resources.subscribe` stays
  #   unadvertised and subscribe POSTs fail closed. This is the
  #   decoupling lever — a server can push arbitrary notifications
  #   without enabling LiveQuery resource subscriptions.
  # nil disables the GET listening stream entirely.
  @subscription_manager =
    if subscription_manager
      subscription_manager
    elsif resource_subscriptions
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger)
    elsif notifications
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger, supported: false)
    end

  # Warn operators who enable a streaming surface without a concurrency
  # cap. Both request-scoped SSE (streaming:) and the long-lived GET
  # listening stream (resource_subscriptions:/notifications:, which set
  # @subscription_manager) spawn per-connection threads; an unbounded
  # endpoint is a practical DoS surface — a slow or hostile client opening
  # connections faster than they close can exhaust the host thread pool and
  # downstream Parse connection pool. The cap bounds each surface
  # SEPARATELY, so the effective ceiling is up to 2x max_concurrent_dispatchers
  # across both. Leaving the default `nil` (unlimited) preserves backward
  # compatibility, but we tell the operator once at construction.
  if (streaming || @subscription_manager) && @max_concurrent_dispatchers.nil?
    surface = streaming ? "streaming: true" : "resource_subscriptions/notifications"
    line = "[Parse::Agent::MCPRackApp] #{surface} with max_concurrent_dispatchers: nil (unlimited). " \
           "Set a finite cap (e.g. 100, or 2x your Puma max_threads) to bound the orphan-thread DoS surface. " \
           "See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end
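The constructor's choice of listening-stream coordinator, and the condition for its startup warning, can be restated as two pure functions. This is a hypothetical sketch: `subscription_mode` and `warn_unbounded?` are illustrative names that return symbols and booleans instead of building a real `Parse::Agent::MCPSubscriptions::Manager`.

```ruby
# Hypothetical restatement of the precedence used for @subscription_manager.
def subscription_mode(subscription_manager: nil, resource_subscriptions: false, notifications: false)
  if subscription_manager
    :injected            # an injected manager always wins
  elsif resource_subscriptions
    :live_query          # advertises resources.subscribe
  elsif notifications
    :notifications_only  # GET stream + #notify; subscribe POSTs fail closed
  end                    # nil: no GET listening stream at all
end

# The construction-time warning fires when any streaming surface is enabled
# and no concurrency cap was supplied.
def warn_unbounded?(streaming:, mode:, max_concurrent_dispatchers:)
  (streaming || !mode.nil?) && max_concurrent_dispatchers.nil?
end
```

Because the cap bounds request-scoped SSE and the GET listening stream separately, `max_concurrent_dispatchers: 100` allows up to 200 concurrent streaming connections across both surfaces.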
Instance Attribute Details
#subscription_manager ⇒ Parse::Agent::MCPSubscriptions::Manager? (readonly)
The listening-stream coordinator backing this app's server→client
bus, or nil when neither resource subscriptions nor notifications
are enabled. Exposed so a clustered/Redis notifier adapter or an
out-of-band publisher can reach the bus directly. Direct
#publish accepts arbitrary messages (notifications OR id-bearing
requests); prefer #notify for the validated notification path.
# File 'lib/parse/agent/mcp_rack_app.rb', line 352

def subscription_manager
  @subscription_manager
end
Class Method Details
.active_dispatcher_count ⇒ Object
Returns the number of currently live dispatcher threads spawned by any
SSEBody across all MCPRackApp instances in this process. Threads are
counted by the :parse_mcp_dispatcher thread-local tag set when each
dispatcher_thread is started. Use this for operator dashboards or health
checks; do NOT use it to make flow-control decisions at runtime (use
the max_concurrent_dispatchers: constructor option for that).
# File 'lib/parse/agent/mcp_rack_app.rb', line 389

def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end
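The tag-and-scan technique behind .active_dispatcher_count can be reproduced with core Ruby threads. This sketch only demonstrates the counting pattern; the worker threads and the two queues are hypothetical stand-ins for the dispatcher threads an SSEBody starts.

```ruby
# Each worker tags itself with the same thread-local key the library uses,
# then blocks until released so the count can be observed while it is live.
ready = Queue.new
release = Queue.new

workers = 3.times.map do
  Thread.new do
    Thread.current[:parse_mcp_dispatcher] = true
    ready << :tagged
    release.pop
  end
end

3.times { ready.pop } # wait until every worker has set its tag
live = Thread.list.count { |t| t[:parse_mcp_dispatcher] }

3.times { release << :go }
workers.each(&:join)
# Thread.list only returns threads that are still alive, so finished
# workers drop out of the count without any explicit decrement.
after = Thread.list.count { |t| t[:parse_mcp_dispatcher] }
```

The result is a point-in-time snapshot that can change between the scan and any decision made from it, which is why the text above recommends it for dashboards and health checks rather than flow control.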
.active_listening_stream_count ⇒ Object
Process-wide count of currently open GET listening streams across all MCPRackApp instances. A listening stream is long-lived (it is the server→client notification channel): each one pins a server worker thread in #each plus a heartbeat thread. Listening streams are therefore bounded separately from request-scoped SSE dispatchers, which run #each, dispatch once, and then close. Used as the soft cap in #serve_listening_stream. Maintained by ListeningStreamBody via adjust_listening_stream_count; unlike a Thread.list scan, this is an explicit counter because the heartbeat thread is intentionally not tagged as a dispatcher.
# File 'lib/parse/agent/mcp_rack_app.rb', line 402

def self.active_listening_stream_count
  @listening_stream_mutex.synchronize { @listening_stream_count }
end
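.adjust_listening_stream_count(delta) ⇒ Object
Adds delta (positive or negative) to the process-wide listening-stream count under a mutex. ListeningStreamBody uses it to maintain the value reported by .active_listening_stream_count.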
# File 'lib/parse/agent/mcp_rack_app.rb', line 408

def self.adjust_listening_stream_count(delta)
  @listening_stream_mutex.synchronize { @listening_stream_count += delta }
end
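A mutex-guarded counter paired with an `ensure` block is one way a stream body can keep such a count accurate even when clients disconnect abruptly. This is a hypothetical sketch: `StreamCounter` and `serve_stream` are illustrative names, and whether ListeningStreamBody itself decrements from an `ensure` is an assumption not confirmed by this page.

```ruby
# Hypothetical class-level counter mirroring adjust_listening_stream_count:
# an Integer guarded by a Mutex so concurrent opens and closes never lose updates.
class StreamCounter
  @mutex = Mutex.new
  @count = 0

  class << self
    def count
      @mutex.synchronize { @count }
    end

    def adjust(delta)
      @mutex.synchronize { @count += delta }
    end
  end
end

# Hypothetical stream body: increment on open, decrement in ensure so an
# exception raised mid-stream (e.g. a dropped client) still releases the slot.
def serve_stream
  StreamCounter.adjust(1)
  yield
ensure
  StreamCounter.adjust(-1)
end
```

Without the mutex, `@count += delta` is a read-modify-write that concurrent threads can interleave, silently drifting the soft cap over time.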
.strip_underscore_smuggled_headers!(env) ⇒ Hash
Drop env keys that would have come from underscore-form HTTP header
names. The Rack-spec-compliant interpretation of HTTP headers maps
X-MCP-API-Key and X_MCP_API_KEY to the same env key
(HTTP_X_MCP_API_KEY); a misbehaving upstream server that forwards
the underscore-form lets an attacker overwrite trusted reverse-proxy-
injected headers.
This helper is invoked automatically at the top of #call, so any
MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original
header list via rack.headers) gets defense-in-depth scrubbing
without operator opt-in. On Rack 2 / pre-3 servers rack.headers is
not set and the helper is a no-op; operators on those stacks must
configure their upstream (e.g. Nginx underscores_in_headers off)
OR mount this helper as an explicit middleware.
The standalone MCPServer rewrites its own build_rack_env to drop
underscore-form names before they reach this app, so the standalone
path is covered regardless of Rack version.
# File 'lib/parse/agent/mcp_rack_app.rb', line 143

def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end
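Running a standalone copy of the helper against a hand-built env shows its fail-closed effect. The env below is hypothetical: it simulates a server that collapsed `X_MCP_API_KEY` and `X-MCP-API-Key` into one slot and also supplied a `rack.headers` entry listing the original names.

```ruby
# Standalone copy of the helper shown above.
def strip_underscore_smuggled_headers!(env)
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each { |name| env.delete("HTTP_#{name.upcase.tr("-", "_")}") }
  end
  env
end

env = {
  "rack.headers" => { "X-MCP-API-Key" => "trusted", "X_MCP_API_KEY" => "forged", "X-Request-Id" => "abc" },
  "HTTP_X_MCP_API_KEY" => "forged", # the collapsed slot now holds the attacker's value
  "HTTP_X_REQUEST_ID" => "abc",
}
strip_underscore_smuggled_headers!(env)
```

The helper deletes the whole `HTTP_X_MCP_API_KEY` slot rather than trying to restore the trusted value, so any downstream key check sees no key and rejects the request. Headers without underscores, such as `X-Request-Id`, are untouched.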
Instance Method Details
#call(env) ⇒ Array(Integer, Hash, #each)
Rack interface.
# File 'lib/parse/agent/mcp_rack_app.rb', line 416

def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0a. Liveness probe. When `health_path:` is configured, a GET to
  #     that exact path returns `{"status":"ok"}` without auth,
  #     rate-limiting, or factory invocation. Intentionally
  #     fingerprint-minimal: no version, no build, no counter —
  #     a load balancer needs "is it up?", not "what is it?".
  if @health_path && env["PATH_INFO"] == @health_path && env["REQUEST_METHOD"] == "GET"
    return [200, json_headers, ['{"status":"ok"}']]
  end

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 0c. DELETE / — MCP 2025-06-18 Streamable HTTP session
  #     termination. A client signals it is done with a session by
  #     sending DELETE with the same `Mcp-Session-Id` header it
  #     received from initialize. Per spec the server MAY support
  #     this; if it doesn't, it MUST return 405. We support it.
  #
  #     Stateless-agent reality: the factory builds a fresh agent
  #     per request, so there is no server-side session store to
  #     evict. What DELETE meaningfully does is cancel any
  #     in-flight requests still running under that correlation_id
  #     so worker threads exit instead of completing wasted work.
  #     The cancellation_registry returns 0 when nothing matches
  #     (also the "unknown session" case) — we don't probe-leak by
  #     differentiating known vs unknown ids in the response.
  #
  #     Sanitized through Parse::Agent#correlation_id= via a
  #     throwaway agent so a malicious header value (CRLF, shell
  #     metachars) is silently rejected rather than reaching the
  #     registry as a key.
  if env["REQUEST_METHOD"] == "DELETE"
    sid = env["HTTP_MCP_SESSION_ID"].to_s
    if sid.empty?
      return [400, json_headers, [json_rpc_error(-32_600, "Missing Mcp-Session-Id")]]
    end
    clean_sid = sanitize_session_id(sid)
    if clean_sid.nil?
      return [400, json_headers, [json_rpc_error(-32_600, "Invalid Mcp-Session-Id")]]
    end
    @cancellation_registry.cancel_all_for(clean_sid, reason: :session_terminated)
    # Wake any tool thread blocked on an elicitation reply for this
    # session (it returns `unavailable` → fail closed) and drop the
    # session's cached elicitation capability.
    @pending_elicitations.abort_all_for(clean_sid, :session_terminated)
    @elicitation_capabilities.forget(clean_sid)
    # Tear down any resource subscriptions and the listening stream
    # bound to this session so a terminated session leaves no LiveQuery
    # sockets behind.
    @subscription_manager&.detach_listener(clean_sid)
    # Drop the owner binding so the id can be reclaimed after explicit
    # termination (only here — not on mere stream close, so a reconnect
    # keeps its claim).
    @session_owners.forget(clean_sid)
    return [204, json_headers, [""]]
  end

  # 0d. GET listening stream — the MCP 2025-06-18 Streamable HTTP
  #     server→client channel that carries unsolicited
  #     `notifications/resources/updated`. Only when resource
  #     subscriptions are enabled, the client opted into SSE, and a
  #     valid Mcp-Session-Id is present. Authenticated via the same
  #     agent_factory as POST: the session id is a server-issued
  #     bearer capability (returned on initialize), so possession of
  #     it plus a valid agent gates the stream.
  if env["REQUEST_METHOD"] == "GET" && @subscription_manager &&
     env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    return serve_listening_stream(env)
  end

  # 1. Method check — only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405, json_headers.merge("Allow" => "POST"), [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check — must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers, [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless — native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers, [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers, [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit — read one byte beyond limit to detect oversized bodies
  #    without buffering the full stream. A Rack input's read(length)
  #    returns nil at EOF, so coerce an empty body to "".
  raw_body = env["rack.input"].read(@max_body_size + 1).to_s
  if raw_body.bytesize > @max_body_size
    return [413, json_headers, [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers, [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  #     A method-less JSON-RPC RESPONSE ({jsonrpc,id,result|error}) is
  #     NOT malformed: it is the client's reply to a server-issued
  #     elicitation/create request. Let it through here; it is routed
  #     (session-bound) after the agent_factory resolves the session.
  unless (body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?) ||
         elicitation_reply?(body)
    return [400, json_headers, [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 4c. MCP-Protocol-Version header validation (MCP 2025-06-18
  #     Streamable HTTP). The spec says:
  #       - The client MUST send `MCP-Protocol-Version: <ver>`
  #         on every request AFTER initialize.
  #       - If absent on a non-initialize request, the server
  #         SHOULD assume `2025-03-26` for backwards compatibility.
  #       - If present but not a version the server supports,
  #         the server MUST respond `400 Bad Request`.
  #     Initialize requests are exempt — initialize IS the
  #     negotiation, so the header is meaningless there.
  #     Cancellation notifications are also exempt because they
  #     may be sent by a client that has not (yet) completed
  #     initialize against this transport instance (e.g. a
  #     reconnecting client cancelling a pre-disconnect request).
  unless body["method"] == "initialize" || body["method"] == "notifications/cancelled" ||
         elicitation_reply?(body)
    requested = env["HTTP_MCP_PROTOCOL_VERSION"]
    if requested.is_a?(String) && !requested.empty? &&
       !Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS.include?(requested)
      return [400, json_headers, [json_rpc_error(-32_600, "Unsupported MCP-Protocol-Version: #{requested}", id: body["id"])]]
    end
  end

  # 5. Agent factory — auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, []]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers, [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5a-i. Surface the silent-ungated-writes footgun. A write/admin agent
  #       served over MCP with no approval tier configured runs every
  #       destructive tool without a human gate; warn once per process so
  #       the operator notices (mirrors the unrestricted-endpoints warning).
  if agent.respond_to?(:permissions) &&
     %i[write admin].include?(agent.permissions) &&
     Parse::Agent.require_approval_for.empty?
    Parse::Agent.warn_mcp_writes_unguarded!
  end

  # 5b. Thread the conversation correlation id through. Source
  #     header: the MCP 2025-06-18 Streamable HTTP spec-canonical
  #     `Mcp-Session-Id` (Rack env key `HTTP_MCP_SESSION_ID`).
  #
  #     Only fills it when the factory hasn't already assigned one
  #     — application code that needs to override the
  #     client-supplied id (e.g., bind to an internal session
  #     record) can do so in the factory and we don't stomp on it.
  #     The Parse::Agent#correlation_id= setter sanitizes the
  #     value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) && agent.correlation_id.nil? &&
     (sid = env["HTTP_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5b-i. Server-assigned Mcp-Session-Id on initialize. Per MCP
  #       2025-06-18 Streamable HTTP, the server SHOULD assign a
  #       fresh session id during initialize when the client did not
  #       supply one, and return it on the response so the client
  #       can echo it on subsequent requests. Stateless-agent
  #       reality: the SDK does not maintain a server-side session
  #       store — the id is best-effort correlation only (used for
  #       audit-log threading and cancellation routing). We do not
  #       refuse subsequent requests carrying an "unknown" id.
  if body.is_a?(Hash) && body["method"] == "initialize" && agent &&
     agent.respond_to?(:correlation_id=) && agent.correlation_id.nil?
    agent.correlation_id = SecureRandom.uuid
  end

  # 5b-ii. Capture the client's elicitation capability at initialize.
  #        The server reads (does not advertise) the client's
  #        `capabilities.elicitation`; the approval gate consults this
  #        per session before attempting a server→client prompt.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent.respond_to?(:correlation_id) && agent.correlation_id
    supported = !!(body.dig("params", "capabilities", "elicitation"))
    @elicitation_capabilities.set(agent.correlation_id, supported)
    # Authoritatively bind this session to the initializing principal so
    # only the same principal can later attach a listening stream for it
    # (owner-binding; see SessionOwnerRegistry).
    @session_owners.bind(agent.correlation_id, principal_fingerprint(agent, env))
  end

  # 5b-iii. Elicitation reply ingress. A method-less JSON-RPC
  #         RESPONSE is the client's answer to a server-issued
  #         elicitation/create. Route it into the pending registry,
  #         session-bound by the same `correlation_id` the cancellation
  #         path uses, so one session can never answer another's prompt.
  #         Failures (no correlation_id, no match) are silent 202 no-ops
  #         to avoid a probe oracle — exactly like notifications/cancelled.
  if elicitation_reply?(body)
    route_elicitation_reply(agent, body)
    return [202, json_headers, [""]]
  end

  # 5c. notifications/cancelled — special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same Mcp-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request — otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id,
        request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end
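Steps 4b and 4c of #call can be isolated as two pure checks. This sketch makes two loudly labeled assumptions: `SUPPORTED_VERSIONS` is a hypothetical stand-in for `Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS` (whose actual contents are not shown on this page), and `reply_envelope?` is a hypothetical classifier based on the `{jsonrpc,id,result|error}` shape described in the comments, not the library's `elicitation_reply?`.

```ruby
# Hypothetical version list; the real one lives in MCPDispatcher.
SUPPORTED_VERSIONS = %w[2025-06-18 2025-03-26].freeze

# Hypothetical classifier for a client's JSON-RPC response: it has an id,
# no method, and a result or error member.
def reply_envelope?(body)
  body.is_a?(Hash) && !body.key?("method") && body.key?("id") &&
    (body.key?("result") || body.key?("error"))
end

# Step 4b: a request must carry a non-empty String method, unless it is a reply.
def well_formed?(body)
  (body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?) ||
    reply_envelope?(body)
end

# Step 4c: initialize, notifications/cancelled and replies are exempt; otherwise
# an absent or empty header passes, and a present one must be supported.
def protocol_version_ok?(body, header)
  exempt = %w[initialize notifications/cancelled].include?(body["method"]) || reply_envelope?(body)
  return true if exempt

  !header.is_a?(String) || header.empty? || SUPPORTED_VERSIONS.include?(header)
end
```

Both checks run before the agent factory, so a flood of `{}` bodies or bogus version headers never reaches Parse Server.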
#notify(session_id, method:, params: nil) ⇒ Boolean
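Push a server-initiated JSON-RPC NOTIFICATION to a session's open
listening stream. This is the public entry point for application
code that delivers unsolicited notifications/* events. The session's
GET stream must already be open; a client opens it with a GET carrying
Accept: text/event-stream and Mcp-Session-Id. Returns false when no
listening bus is configured (no injected subscription_manager and
neither resource_subscriptions: nor notifications: enabled);
otherwise returns the truthiness of the manager's publish result.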
The envelope is built server-side as a notification — it never
carries an id, which is what distinguishes it from the
server-initiated request path (e.g. elicitation/create). A
caller wanting an id-bearing request uses the internal
subscription_manager.publish seam, not this method.
# File 'lib/parse/agent/mcp_rack_app.rb', line 373

def notify(session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless @subscription_manager

  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!@subscription_manager.publish(session_id, envelope)
end
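To see the envelope #notify builds without a running server, the same logic can be exercised against a recording stand-in. `RecordingManager` is hypothetical (the real Manager#publish delivers to the open GET stream), and `notifications/message` is used only as an example MCP notification method.

```ruby
# Hypothetical stand-in for the subscription manager: records what would be pushed.
class RecordingManager
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(session_id, envelope)
    @published << [session_id, envelope]
    true
  end
end

# Standalone copy of the #notify logic above, with the manager passed in.
def notify(manager, session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless manager

  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!manager.publish(session_id, envelope)
end

mgr = RecordingManager.new
notify(mgr, "sess-1", method: "notifications/message", params: { "level" => "info", "data" => "reindex done" })
```

The recorded envelope has no `"id"` key, which is what distinguishes it from the server-initiated request path such as `elicitation/create`.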