Class: Parse::Agent::MCPRackApp

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.

The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.

== SSE Streaming (MCP progress notifications)

When constructed with streaming: true, requests that include Accept: text/event-stream receive an SSE response instead of a single JSON body. The server holds the connection open and emits notifications/progress events from two sources:

  1. Time-based heartbeats every heartbeat_interval seconds while the dispatcher runs (progress field = elapsed seconds).
  2. Tool-internal progress reported by the tool itself via agent.report_progress(progress:, total:, message:). Works for both built-in tools and custom tools registered through Parse::Agent::Tools.register.

Heartbeats are automatically suppressed once a tool reports its own progress, so the progressToken carries a single coherent stream. A final response event carries the complete JSON-RPC response, after which the stream closes.

This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.

Streaming requires a Rack server that supports streaming response bodies (Puma, Falcon, Unicorn). WEBrick buffers the full body before writing, so SSE streaming has no effect on the standalone MCPServer — operators using MCPServer directly should leave streaming: false (the default).

To disable Nginx response buffering for SSE endpoints, set: proxy_buffering off; or rely on the X-Accel-Buffering: no header this class emits automatically on every SSE response.

When streaming: false (default), an Accept: text/event-stream request receives a plain JSON response — the adapter is permissive per the MCP spec, which does not require SSE support.

Examples:

Block form (most common)

app = Parse::Agent::MCPRackApp.new do |env|
  token = env["HTTP_AUTHORIZATION"].to_s.delete_prefix("Bearer ")
  agent = MyAuth.agent_for_token!(token)  # raises Unauthorized if invalid
  agent
end

Keyword argument form

factory = ->(env) { Parse::Agent.new(permissions: :readonly) }
app = Parse::Agent::MCPRackApp.new(agent_factory: factory)

With SSE streaming enabled

app = Parse::Agent::MCPRackApp.new(streaming: true) { |env| ... }

Mounted in Rails routes.rb

mount Parse::Agent::MCPRackApp.new { |env| ... }, at: "/mcp"

Defined Under Namespace

Classes: CancellationRegistry, ListeningStreamBody, SSEBody, SessionOwnerRegistry

Constant Summary collapse

DEFAULT_MAX_BODY_SIZE =

Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).

1_048_576
MAX_JSON_NESTING =

JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).

20
DEFAULT_HEARTBEAT_INTERVAL =

Default heartbeat interval in seconds when streaming is enabled.

2
DEFAULT_APPROVAL_TIMEOUT =

Seconds to wait for a human's elicitation reply before failing closed (refusing the destructive op). Generous by default — a human-in-the-loop approver needs time the tool timeout doesn't allow. Tune via approval_timeout:.

300
JSON_CONTENT_TYPE =

Standard Content-Type for all JSON responses. Frozen template — call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).

{ "Content-Type" => "application/json" }.freeze
SSE_HEADERS =

SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. Frozen template — call #sse_headers to obtain a per-response copy.

{
  "Content-Type"      => "text/event-stream",
  "Cache-Control"     => "no-cache",
  "Connection"        => "keep-alive",
  "X-Accel-Buffering" => "no",
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, resource_subscriptions: false, subscription_manager: nil, notifications: false, approval_timeout: DEFAULT_APPROVAL_TIMEOUT, principal_resolver: nil, health_path: nil, &block) ⇒ MCPRackApp

Returns a new instance of MCPRackApp.

Parameters:

  • agent_factory (Proc, nil) (defaults to: nil)

    callable invoked with the Rack env on every request. Must return a Parse::Agent or raise Parse::Agent::Unauthorized. Mutually exclusive with a block.

  • max_body_size (Integer) (defaults to: DEFAULT_MAX_BODY_SIZE)

    reject bodies larger than this many bytes. Defaults to DEFAULT_MAX_BODY_SIZE.

  • logger (#warn, nil) (defaults to: nil)

    optional logger. When set, auth failures are warned at class-name level, and internal errors include a backtrace.

  • streaming (Boolean) (defaults to: false)

    enable SSE streaming for clients that send Accept: text/event-stream. Defaults to false for backward compatibility. Has no effect on WEBrick-backed deployments (see class documentation).

  • heartbeat_interval (Numeric) (defaults to: DEFAULT_HEARTBEAT_INTERVAL)

    seconds between progress heartbeat events when streaming is active. Defaults to DEFAULT_HEARTBEAT_INTERVAL. Ignored when streaming: false.

  • max_concurrent_dispatchers (Integer, nil) (defaults to: nil)

    when set, limits the number of concurrently active dispatcher threads across all SSE connections served by this app instance. When the limit is reached a new SSE request immediately receives a 503 JSON-RPC error envelope (-32000 "server busy") rather than spawning another dispatcher. Defaults to nil (unlimited). Use active_dispatcher_count to monitor current concurrency from operator tooling.

  • pre_auth_rate_limiter (#check!, nil) (defaults to: nil)

    optional rate limiter consulted at the top of every request, BEFORE the agent_factory is invoked. Closes the factory-amplification DoS where each malformed request burns a Parse Server round-trip (factories typically validate session tokens by calling out). Must respond to #check! and raise an exception responding to #retry_after (such as Parse::Agent::RateLimiter::RateLimitExceeded) when exhausted. Defaults to nil (no pre-auth limiter). On exhaustion the request is rejected with HTTP 429 and a Retry-After header.

  • allowed_origins (Array<String>, nil) (defaults to: nil)

    when set, the Origin request header must match one of these entries (case-insensitive, exact host match — wildcard via leading . matches subdomains). nil (default) skips the check. Browsers always send Origin on cross-origin POST; native clients (curl, ruby HTTP client, SDK-to-SDK) typically don't, and an absent Origin is treated as allowed regardless of this setting. The default loopback bind makes this check optional in development; operators who bind MCP to a routable interface should configure it.

  • require_custom_header (String, nil) (defaults to: nil)

    when set (e.g. "X-MCP-Client"), requests must carry that header with any non-empty value. Custom headers can't be set by a <form> CSRF and force a CORS preflight on browser fetch(), so this gate closes the browser-driven attack surface entirely. Pair with allowed_origins for defense in depth.

  • health_path (String, nil) (defaults to: nil)

    when set (e.g. "/health"), GET requests to that exact path return 200 {"status":"ok"} without invoking the agent_factory, without authentication, without rate-limiting, and without applying the allowed_origins / require_custom_header CSRF gates. Intended as a liveness probe for load balancers and orchestrators (Kubernetes, ECS, Consul) that cannot present a matching Origin or custom header. Because the probe sits ahead of the pre-auth rate limiter, operators should front-edge rate-limit the path at the LB/Nginx layer if public-facing. The response intentionally contains no version, build, or counter information — fingerprint-minimal by design. nil (default) disables the endpoint entirely; empty-string values are coerced to nil. Any non-GET method on the path falls through to the standard 405 handler.

  • resource_subscriptions (Boolean) (defaults to: false)

    enable MCP resource subscriptions (resources/subscribe + notifications/resources/updated) bridged onto Parse LiveQuery. Defaults to false. When true, this app accepts a GET with Accept: text/event-stream and an Mcp-Session-Id header as a long-lived server→client listening stream, and advertises the resources.subscribe capability on initialize — but ONLY while LiveQuery is enabled (Parse.live_query_enabled = true) and available (a live_query_url is configured). Requires a streaming-capable Rack server (Puma, Falcon); WEBrick buffers responses and cannot hold the listening stream open. See docs/mcp_guide.md for the credential-scoping and single-process caveats.

  • subscription_manager (Parse::Agent::MCPSubscriptions::Manager, nil) (defaults to: nil)

    inject a pre-built manager (tests, or to share a clustered-notifier adapter). Takes precedence over resource_subscriptions:. When nil and resource_subscriptions: true, a default in-process manager is constructed.

Raises:

  • (ArgumentError)

    if both or neither of agent_factory/block are given.



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/parse/agent/mcp_rack_app.rb', line 238

def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE,
               logger: nil, streaming: false,
               heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
               max_concurrent_dispatchers: nil,
               pre_auth_rate_limiter: nil,
               allowed_origins: nil,
               require_custom_header: nil,
               resource_subscriptions: false,
               subscription_manager: nil,
               notifications: false,
               approval_timeout: DEFAULT_APPROVAL_TIMEOUT,
               principal_resolver: nil,
               health_path: nil, &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  @agent_factory              = agent_factory || block
  @max_body_size              = max_body_size
  @logger                     = logger
  @streaming                  = streaming
  @heartbeat_interval         = heartbeat_interval
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter      = pre_auth_rate_limiter
  @allowed_origins            = normalize_allowed_origins(allowed_origins)
  @required_custom_header     = normalize_required_custom_header(require_custom_header)
  @health_path                = health_path.is_a?(String) && !health_path.empty? ? health_path : nil
  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry      = CancellationRegistry.new

  # Elicitation (human-in-the-loop approval) state, shared across
  # this app's requests and its GET listening streams. The
  # capability registry records (per session) whether the client
  # advertised `elicitation` at initialize; the pending registry
  # holds server→client requests awaiting a reply. Both are cheap
  # and always present; they only do work when
  # Parse::Agent.require_approval_for opts a tier in.
  @elicitation_capabilities   = Parse::Agent::ClientCapabilityRegistry.new
  @pending_elicitations       = Parse::Agent::PendingElicitationRegistry.new
  @approval_timeout           = approval_timeout

  # Binds each MCP session id to the principal that established it so a
  # listening stream can't be hijacked by another authenticated caller.
  # Same per-instance / single-process scope as @cancellation_registry.
  @session_owners             = SessionOwnerRegistry.new
  if principal_resolver && !principal_resolver.respond_to?(:call)
    raise ArgumentError, "principal_resolver must respond to #call"
  end
  @principal_resolver         = principal_resolver

  # Listening-stream coordinator (the server→client broadcast bus
  # backing resource subscriptions, MCP elicitation, and
  # general-purpose server-initiated notifications). An injected
  # manager wins. Otherwise:
  #   - `resource_subscriptions: true` builds a LiveQuery-backed
  #     manager whose `supported?` resolves live (advertises
  #     `resources.subscribe` and serves subscribe POSTs).
  #   - `notifications: true` (without resource subscriptions) builds
  #     a manager in `supported: false` posture: the GET listening
  #     stream + `#notify` bus work, but `resources.subscribe` stays
  #     unadvertised and subscribe POSTs fail closed. This is the
  #     decoupling lever — a server can push arbitrary notifications
  #     without enabling LiveQuery resource subscriptions.
  # nil disables the GET listening stream entirely.
  @subscription_manager =
    if subscription_manager
      subscription_manager
    elsif resource_subscriptions
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger)
    elsif notifications
      Parse::Agent::MCPSubscriptions::Manager.new(logger: @logger, supported: false)
    end

  # Warn operators who enable a streaming surface without a concurrency
  # cap. Both request-scoped SSE (streaming:) and the long-lived GET
  # listening stream (resource_subscriptions:/notifications:, which set
  # @subscription_manager) spawn per-connection threads; an unbounded
  # endpoint is a practical DoS surface — a slow or hostile client opening
  # connections faster than they close can exhaust the host thread pool and
  # downstream Parse connection pool. The cap bounds each surface
  # SEPARATELY, so the effective ceiling is up to 2x max_concurrent_dispatchers
  # across both. Leaving the default `nil` (unlimited) preserves backward
  # compatibility, but we tell the operator once at construction.
  if (streaming || @subscription_manager) && @max_concurrent_dispatchers.nil?
    surface = streaming ? "streaming: true" : "resource_subscriptions/notifications"
    line = "[Parse::Agent::MCPRackApp] #{surface} with max_concurrent_dispatchers: nil (unlimited). " \
           "Set a finite cap (e.g. 100, or 2x your Puma max_threads) to bound the orphan-thread DoS surface. " \
           "See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end

Instance Attribute Details

#subscription_managerParse::Agent::MCPSubscriptions::Manager? (readonly)

The listening-stream coordinator backing this app's server→client bus, or nil when neither resource subscriptions nor notifications are enabled. Exposed so a clustered/Redis notifier adapter or an out-of-band publisher can reach the bus directly. Direct #publish accepts arbitrary messages (notifications OR id-bearing requests); prefer #notify for the validated notification path.



352
353
354
# File 'lib/parse/agent/mcp_rack_app.rb', line 352

def subscription_manager
  @subscription_manager
end

Class Method Details

.active_dispatcher_countObject

Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process. Threads are counted by the :parse_mcp_dispatcher thread-local tag set when each dispatcher_thread is started. Use this for operator dashboards or health checks; do NOT use it to make flow-control decisions at runtime (use the max_concurrent_dispatchers: constructor option for that).



389
390
391
# File 'lib/parse/agent/mcp_rack_app.rb', line 389

def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end

.active_listening_stream_countObject

Process-wide count of currently-open GET listening streams across all MCPRackApp instances. A listening stream is long-lived (the server→client notification channel) — each pins a server worker thread in #each plus a heartbeat thread — so it is bounded SEPARATELY from request-scoped SSE dispatchers (which #each, dispatch once, then close). Used as the soft cap in #serve_listening_stream. Maintained by ListeningStreamBody via adjust_listening_stream_count; unlike a Thread.list scan this is an explicit counter because the heartbeat thread is intentionally not tagged as a dispatcher.



402
403
404
# File 'lib/parse/agent/mcp_rack_app.rb', line 402

def self.active_listening_stream_count
  @listening_stream_mutex.synchronize { @listening_stream_count }
end

.adjust_listening_stream_count(delta) ⇒ Object



408
409
410
# File 'lib/parse/agent/mcp_rack_app.rb', line 408

def self.adjust_listening_stream_count(delta)
  @listening_stream_mutex.synchronize { @listening_stream_count += delta }
end

.strip_underscore_smuggled_headers!(env) ⇒ Hash

Drop env keys that would have come from underscore-form HTTP header names. The Rack-spec-compliant interpretation of HTTP headers maps X-MCP-API-Key and X_MCP_API_KEY to the same env key (HTTP_X_MCP_API_KEY); a misbehaving upstream server that forwards the underscore-form lets an attacker overwrite trusted reverse-proxy- injected headers.

This helper is invoked automatically at the top of #call, so any MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original header list via rack.headers) gets defense-in-depth scrubbing without operator opt-in. On Rack 2 / pre-3 servers rack.headers is not set and the helper is a no-op; operators on those stacks must configure their upstream (e.g. Nginx underscores_in_headers off) OR mount this helper as an explicit middleware.

The standalone MCPServer rewrites its own build_rack_env to drop underscore-form names before they reach this app, so the standalone path is covered regardless of Rack version.

Examples:

Explicit middleware (Rack 2 / pre-3 deployments)

class StripSmuggledHeaders
  def initialize(app); @app = app; end
  def call(env)
    Parse::Agent::MCPRackApp.strip_underscore_smuggled_headers!(env)
    @app.call(env)
  end
end

Parameters:

  • env (Hash)

    the Rack env, mutated in place

Returns:

  • (Hash)

    the same env, for chaining



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/parse/agent/mcp_rack_app.rb', line 143

def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end

Instance Method Details

#call(env) ⇒ Array(Integer, Hash, #each)

Rack interface.

Parameters:

  • env (Hash)

    Rack environment

Returns:

  • (Array(Integer, Hash, #each))

    Rack triple



416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
# File 'lib/parse/agent/mcp_rack_app.rb', line 416

def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0a. Liveness probe. When `health_path:` is configured, a GET to
  #     that exact path returns `{"status":"ok"}` without auth,
  #     rate-limiting, or factory invocation. Intentionally
  #     fingerprint-minimal: no version, no build, no counter —
  #     a load balancer needs "is it up?", not "what is it?".
  if @health_path && env["PATH_INFO"] == @health_path && env["REQUEST_METHOD"] == "GET"
    return [200, json_headers, ['{"status":"ok"}']]
  end

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 0c. DELETE /  — MCP 2025-06-18 Streamable HTTP session
  #     termination. A client signals it is done with a session by
  #     sending DELETE with the same `Mcp-Session-Id` header it
  #     received from initialize. Per spec the server MAY support
  #     this; if it doesn't, it MUST return 405. We support it.
  #
  #     Stateless-agent reality: the factory builds a fresh agent
  #     per request, so there is no server-side session store to
  #     evict. What DELETE meaningfully does is cancel any
  #     in-flight requests still running under that correlation_id
  #     so worker threads exit instead of completing wasted work.
  #     The cancellation_registry returns 0 when nothing matches
  #     (also the "unknown session" case) — we don't probe-leak by
  #     differentiating known vs unknown ids in the response.
  #
  #     Sanitized through Parse::Agent#correlation_id= via a
  #     throwaway agent so a malicious header value (CRLF, shell
  #     metachars) is silently rejected rather than reaching the
  #     registry as a key.
  if env["REQUEST_METHOD"] == "DELETE"
    sid = env["HTTP_MCP_SESSION_ID"].to_s
    if sid.empty?
      return [400, json_headers, [json_rpc_error(-32_600, "Missing Mcp-Session-Id")]]
    end
    clean_sid = sanitize_session_id(sid)
    if clean_sid.nil?
      return [400, json_headers, [json_rpc_error(-32_600, "Invalid Mcp-Session-Id")]]
    end
    @cancellation_registry.cancel_all_for(clean_sid, reason: :session_terminated)
    # Wake any tool thread blocked on an elicitation reply for this
    # session (it returns `unavailable` → fail closed) and drop the
    # session's cached elicitation capability.
    @pending_elicitations.abort_all_for(clean_sid, :session_terminated)
    @elicitation_capabilities.forget(clean_sid)
    # Tear down any resource subscriptions and the listening stream
    # bound to this session so a terminated session leaves no LiveQuery
    # sockets behind.
    @subscription_manager&.detach_listener(clean_sid)
    # Drop the owner binding so the id can be reclaimed after explicit
    # termination (only here — not on mere stream close, so a reconnect
    # keeps its claim).
    @session_owners.forget(clean_sid)
    return [204, json_headers, [""]]
  end

  # 0d. GET listening stream — the MCP 2025-06-18 Streamable HTTP
  #     server→client channel that carries unsolicited
  #     `notifications/resources/updated`. Only when resource
  #     subscriptions are enabled, the client opted into SSE, and a
  #     valid Mcp-Session-Id is present. Authenticated via the same
  #     agent_factory as POST: the session id is a server-issued
  #     bearer capability (returned on initialize), so possession of
  #     it plus a valid agent gates the stream.
  if env["REQUEST_METHOD"] == "GET" && @subscription_manager &&
     env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    return serve_listening_stream(env)
  end

  # 1. Method check — only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405,
            json_headers.merge("Allow" => "POST"),
            [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check — must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers, [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless — native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers, [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers, [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit — read one byte beyond limit to detect oversized bodies
  #    without buffering the full stream.
  raw_body = env["rack.input"].read(@max_body_size + 1)
  if raw_body.bytesize > @max_body_size
    return [413, json_headers, [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers, [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  #     A method-less JSON-RPC RESPONSE ({jsonrpc,id,result|error}) is
  #     NOT malformed: it is the client's reply to a server-issued
  #     elicitation/create request. Let it through here; it is routed
  #     (session-bound) after the agent_factory resolves the session.
  unless (body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?) ||
         elicitation_reply?(body)
    return [400, json_headers, [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 4c. MCP-Protocol-Version header validation (MCP 2025-06-18
  #     Streamable HTTP). The spec says:
  #       - The client MUST send `MCP-Protocol-Version: <ver>`
  #         on every request AFTER initialize.
  #       - If absent on a non-initialize request, the server
  #         SHOULD assume `2025-03-26` for backwards compatibility.
  #       - If present but not a version the server supports,
  #         the server MUST respond `400 Bad Request`.
  #     Initialize requests are exempt — initialize IS the
  #     negotiation, so the header is meaningless there.
  #     Cancellation notifications are also exempt because they
  #     may be sent by a client that has not (yet) completed
  #     initialize against this transport instance (e.g. a
  #     reconnecting client cancelling a pre-disconnect request).
  unless body["method"] == "initialize" ||
         body["method"] == "notifications/cancelled" ||
         elicitation_reply?(body)
    requested = env["HTTP_MCP_PROTOCOL_VERSION"]
    if requested.is_a?(String) && !requested.empty? &&
       !Parse::Agent::MCPDispatcher::SUPPORTED_PROTOCOL_VERSIONS.include?(requested)
      return [400, json_headers,
              [json_rpc_error(-32_600,
                              "Unsupported MCP-Protocol-Version: #{requested}",
                              id: body["id"])]]
    end
  end

  # 5. Agent factory — auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, [unauthorized_body]]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers, [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5a-i. Surface the silent-ungated-writes footgun. A write/admin agent
  #     served over MCP with no approval tier configured runs every
  #     destructive tool without a human gate; warn once per process so
  #     the operator notices (mirrors the unrestricted-endpoints warning).
  if agent.respond_to?(:permissions) &&
     %i[write admin].include?(agent.permissions) &&
     Parse::Agent.require_approval_for.empty?
    Parse::Agent.warn_mcp_writes_unguarded!
  end

  # 5b. Thread the conversation correlation id through. Source
  #     header: the MCP 2025-06-18 Streamable HTTP spec-canonical
  #     `Mcp-Session-Id` (Rack env key `HTTP_MCP_SESSION_ID`).
  #
  #     Only fills it when the factory hasn't already assigned one
  #     — application code that needs to override the
  #     client-supplied id (e.g., bind to an internal session
  #     record) can do so in the factory and we don't stomp on it.
  #     The Parse::Agent#correlation_id= setter sanitizes the
  #     value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) &&
     agent.correlation_id.nil? &&
     (sid = env["HTTP_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5b-i. Server-assigned Mcp-Session-Id on initialize. Per MCP
  #     2025-06-18 Streamable HTTP, the server SHOULD assign a
  #     fresh session id during initialize when the client did not
  #     supply one, and return it on the response so the client
  #     can echo it on subsequent requests. Stateless-agent
  #     reality: the SDK does not maintain a server-side session
  #     store — the id is best-effort correlation only (used for
  #     audit-log threading and cancellation routing). We do not
  #     refuse subsequent requests carrying an "unknown" id.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent && agent.respond_to?(:correlation_id=) &&
     agent.correlation_id.nil?
    agent.correlation_id = SecureRandom.uuid
  end

  # 5b-ii. Capture the client's elicitation capability at initialize.
  #     The server reads (does not advertise) the client's
  #     `capabilities.elicitation`; the approval gate consults this
  #     per session before attempting a server→client prompt.
  if body.is_a?(Hash) && body["method"] == "initialize" &&
     agent.respond_to?(:correlation_id) && agent.correlation_id
    supported = !!(body.dig("params", "capabilities", "elicitation"))
    @elicitation_capabilities.set(agent.correlation_id, supported)
    # Authoritatively bind this session to the initializing principal so
    # only the same principal can later attach a listening stream for it
    # (owner-binding; see SessionOwnerRegistry).
    @session_owners.bind(agent.correlation_id, principal_fingerprint(agent, env))
  end

  # 5b-iii. Elicitation reply ingress. A method-less JSON-RPC
  #     RESPONSE is the client's answer to a server-issued
  #     elicitation/create. Route it into the pending registry,
  #     session-bound by the same `correlation_id` the cancellation
  #     path uses, so one session can never answer another's prompt.
  #     Failures (no correlation_id, no match) are silent 202 no-ops
  #     to avoid a probe oracle — exactly like notifications/cancelled.
  if elicitation_reply?(body)
    route_elicitation_reply(agent, body)
    return [202, json_headers, [""]]
  end

  # 5c. notifications/cancelled — special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same Mcp-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request — otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id,
        request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end

#notify(session_id, method:, params: nil) ⇒ Boolean

Push a server-initiated JSON-RPC NOTIFICATION to a session's open listening stream. This is the public front door for application code to deliver unsolicited notifications/* events (the GET stream must be open for the session — open it client-side with a GET carrying Accept: text/event-stream + Mcp-Session-Id).

The envelope is built server-side as a notification — it never carries an id, which is what distinguishes it from the server-initiated request path (e.g. elicitation/create). A caller wanting an id-bearing request uses the internal subscription_manager.publish seam, not this method.

Parameters:

  • session_id (String)

    the target session (Mcp-Session-Id).

  • method (String)

    a non-empty JSON-RPC method, e.g. "notifications/custom".

  • params (Hash, nil) (defaults to: nil)

    optional params object.

Returns:

  • (Boolean)

    true if a listening stream received it; false when notifications are disabled or no stream is attached.

Raises:

  • (ArgumentError)

    when method is blank or not a String.



373
374
375
376
377
378
379
380
381
# File 'lib/parse/agent/mcp_rack_app.rb', line 373

def notify(session_id, method:, params: nil)
  unless method.is_a?(String) && !method.empty?
    raise ArgumentError, "notify: method must be a non-empty String"
  end
  return false unless @subscription_manager
  envelope = { "jsonrpc" => "2.0", "method" => method }
  envelope["params"] = params unless params.nil?
  !!@subscription_manager.publish(session_id, envelope)
end