Class: Parse::Agent::MCPRackApp
- Inherits: Object
- Inheritance chain: Object, Parse::Agent::MCPRackApp
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.
The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.
SSE Streaming (MCP progress notifications)
When constructed with `streaming: true`, requests that include `Accept: text/event-stream` receive an SSE response instead of a single JSON body. The server holds the connection open and emits `notifications/progress` events from two sources:
- Time-based heartbeats every `heartbeat_interval` seconds while the dispatcher runs (progress field = elapsed seconds).
- Tool-internal progress reported by the tool itself via `agent.report_progress(progress:, total:, message:)`. Works for both built-in tools and custom tools registered through `Parse::Agent::Tools.register`.
Heartbeats are automatically suppressed once a tool reports its own progress, so the `progressToken` carries a single coherent stream. A final `response` event carries the complete JSON-RPC response, after which the stream closes.
This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.
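On the client side, the stream described above can be consumed with a small parser for the standard SSE text format (blocks of `field: value` lines separated by a blank line). This is a sketch under assumptions: `parse_sse` is a hypothetical helper, and the sample payloads in `SAMPLE_STREAM` are invented for illustration; only the event names `notifications/progress` and `response` come from the description above, and their exact placement on the wire is assumed.

```ruby
require "json"

# Hypothetical helper: split a raw SSE stream into events. Each event is a
# block of "field: value" lines terminated by a blank line.
def parse_sse(raw)
  raw.split(/\r?\n\r?\n/).filter_map do |block|
    event = { "event" => "message", "data" => +"" }
    block.each_line(chomp: true) do |line|
      next if line.empty? || line.start_with?(":") # comment or keep-alive line
      field, value = line.split(":", 2)
      value = value.to_s.delete_prefix(" ")
      case field
      when "event"
        event["event"] = value
      when "data"
        # Multiple data lines in one event are joined with newlines.
        event["data"] << "\n" unless event["data"].empty?
        event["data"] << value
      end
    end
    event unless event["data"].empty?
  end
end

# Invented sample stream; the payload shapes are assumptions.
SAMPLE_STREAM = <<~'SSE'
  event: notifications/progress
  data: {"progressToken":"t1","progress":2}

  event: response
  data: {"jsonrpc":"2.0","id":1,"result":{}}

SSE

events = parse_sse(SAMPLE_STREAM)
final  = events.find { |e| e["event"] == "response" }
result = JSON.parse(final["data"])
```

A client would typically ignore or display the progress events and treat the `response` event as the terminal JSON-RPC reply.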
Streaming requires a Rack server that supports streaming response bodies (Puma, Falcon, Unicorn). WEBrick buffers the full body before writing, so SSE streaming has no effect on the standalone MCPServer; operators using MCPServer directly should leave `streaming: false` (the default).
To disable Nginx response buffering for SSE endpoints, set:
proxy_buffering off;
or rely on the `X-Accel-Buffering: no` header this class emits automatically on every SSE response.
When `streaming: false` (default), an `Accept: text/event-stream` request receives a plain JSON response; the adapter is permissive per the MCP spec, which does not require SSE support.
Defined Under Namespace
Classes: CancellationRegistry, SSEBody
Constant Summary
- DEFAULT_MAX_BODY_SIZE = 1_048_576
  Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).
- MAX_JSON_NESTING = 20
  JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).
- DEFAULT_HEARTBEAT_INTERVAL = 2
  Default heartbeat interval in seconds when streaming is enabled.
- JSON_CONTENT_TYPE = { "Content-Type" => "application/json" }.freeze
  Standard Content-Type for all JSON responses. Frozen template: call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).
- SSE_HEADERS = { "Content-Type" => "text/event-stream", "Cache-Control" => "no-cache", "Connection" => "keep-alive", "X-Accel-Buffering" => "no", }.freeze
  SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. Frozen template: call #sse_headers to obtain a per-response copy.
Class Method Summary
- .active_dispatcher_count ⇒ Object
  Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process.
- .strip_underscore_smuggled_headers!(env) ⇒ Hash
  Drop env keys that would have come from underscore-form HTTP header names.
Instance Method Summary
- #call(env) ⇒ Array(Integer, Hash, #each)
  Rack interface.
- #initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, &block) ⇒ MCPRackApp (constructor)
  A new instance of MCPRackApp.
Constructor Details
#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, &block) ⇒ MCPRackApp
Returns a new instance of MCPRackApp.
# File 'lib/parse/agent/mcp_rack_app.rb', line 192
def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil,
               streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
               max_concurrent_dispatchers: nil,
               pre_auth_rate_limiter: nil,
               allowed_origins: nil,
               require_custom_header: nil,
               &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  @agent_factory = agent_factory || block
  @max_body_size = max_body_size
  @logger = logger
  @streaming = streaming
  @heartbeat_interval = heartbeat_interval
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter = pre_auth_rate_limiter
  @allowed_origins = normalize_allowed_origins(allowed_origins)
  @required_custom_header = normalize_required_custom_header(require_custom_header)

  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry = CancellationRegistry.new

  # Warn operators who enable streaming without a concurrency cap.
  # An unbounded SSE endpoint with orphaned dispatcher threads is
  # a practical DoS surface: a slow or hostile client opening
  # connections faster than tools complete can exhaust the host's
  # thread pool and downstream Parse connection pool. Leaving the
  # default as `nil` (unlimited) preserves backward compatibility,
  # but we tell the operator once at construction.
  if streaming && @max_concurrent_dispatchers.nil?
    line = "[Parse::Agent::MCPRackApp] streaming: true with max_concurrent_dispatchers: nil (unlimited). " \
           "Set a finite cap (e.g. 100, or 2x your Puma max_threads) to bound the orphan-thread DoS surface. " \
           "See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end
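The constructor only requires that `pre_auth_rate_limiter` respond to `check!`, and the `#call` source shows that an error raised from `check!` becomes a 429, with `Retry-After` set when the error responds to `retry_after`. A minimal sketch of an object satisfying that contract follows. `RateLimited` and `FixedWindowLimiter` are hypothetical names, not part of the library, and the injectable `clock:` exists only to make the sketch testable. Because `check!` is called without arguments, this limiter is global rather than per-client.

```ruby
# Hypothetical error type. The adapter only checks respond_to?(:retry_after).
class RateLimited < StandardError
  attr_reader :retry_after

  def initialize(retry_after)
    super("rate limit exceeded")
    @retry_after = retry_after
  end
end

# Hypothetical fixed-window limiter: at most `limit` calls per `window` seconds.
class FixedWindowLimiter
  def initialize(limit:, window:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @limit = limit
    @window = window
    @clock = clock
    @mutex = Mutex.new
    @window_start = nil
    @count = 0
  end

  # Returns true when the call is allowed; raises RateLimited otherwise.
  def check!
    @mutex.synchronize do
      now = @clock.call
      # Start a fresh window on first use or once the current one expires.
      if @window_start.nil? || now - @window_start >= @window
        @window_start = now
        @count = 0
      end
      @count += 1
      # retry_after is the time remaining in the current window.
      raise RateLimited.new(@window_start + @window - now) if @count > @limit
    end
    true
  end
end
```

An instance could then be passed as `pre_auth_rate_limiter: FixedWindowLimiter.new(limit: 100, window: 1)`; the limit and window values here are placeholders, not sizing guidance.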
Class Method Details
.active_dispatcher_count ⇒ Object
Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process. Threads are counted by the `:parse_mcp_dispatcher` thread-local tag set when each dispatcher_thread is started. Use this for operator dashboards or health checks; do NOT use it to make flow-control decisions at runtime (use the `max_concurrent_dispatchers:` constructor option for that).
# File 'lib/parse/agent/mcp_rack_app.rb', line 251
def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end
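The counting rule can be observed without the library by tagging a few threads by hand. This is an illustrative sketch only: `tagged_thread_count` is a hypothetical stand-in that repeats the one-line body shown above, and the worker threads here merely imitate dispatcher threads by setting the same thread-local key.

```ruby
# Hypothetical stand-in repeating the counting rule shown above.
def tagged_thread_count(tag = :parse_mcp_dispatcher)
  Thread.list.count { |t| t[tag] }
end

ready   = Queue.new # workers signal once their tag is set
release = Queue.new # main thread signals workers to finish

workers = Array.new(3) do
  Thread.new do
    Thread.current[:parse_mcp_dispatcher] = true
    ready << :started
    release.pop # block until released, imitating a running tool call
  end
end

3.times { ready.pop }            # wait until all three tags are visible
count_during = tagged_thread_count

3.times { release << :done }
workers.each(&:join)
count_after = tagged_thread_count # finished threads no longer appear in Thread.list
```

Because the count is a snapshot of `Thread.list`, it can change between the read and any decision based on it, which is consistent with the advice above to use it for dashboards and not for flow control.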
.strip_underscore_smuggled_headers!(env) ⇒ Hash
Drop env keys that would have come from underscore-form HTTP header names. The Rack-spec-compliant interpretation of HTTP headers maps `X-MCP-API-Key` and `X_MCP_API_KEY` to the same env key (`HTTP_X_MCP_API_KEY`); a misbehaving upstream server that forwards the underscore-form lets an attacker overwrite trusted reverse-proxy-injected headers.
This helper is invoked automatically at the top of #call, so any MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original header list via `rack.headers`) gets defense-in-depth scrubbing without operator opt-in. On Rack 2 / pre-3 servers `rack.headers` is not set and the helper is a no-op; operators on those stacks must configure their upstream (e.g. Nginx `underscores_in_headers off`) OR mount this helper as an explicit middleware.
The standalone `MCPServer` rewrites its own `build_rack_env` to drop underscore-form names before they reach this app, so the standalone path is covered regardless of Rack version.
# File 'lib/parse/agent/mcp_rack_app.rb', line 129
def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end
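To see the effect on a concrete env, the same logic can be run standalone against a hand-built Hash. This is a sketch: `strip_underscore_headers!` is a hypothetical free-function copy of the helper above, and the env contents (header names and values) are invented for illustration.

```ruby
# Hypothetical standalone copy of the helper's logic.
def strip_underscore_headers!(env)
  original = env["rack.headers"]
  return env unless original.respond_to?(:each)

  original.each do |name, _value|
    next unless name.is_a?(String) && name.include?("_")
    # Delete the env slot the underscore-form header collapsed into.
    env.delete("HTTP_#{name.upcase.tr("-", "_")}")
  end
  env
end

# Invented env: one underscore-form header and one dashed-form header.
env = {
  "rack.headers" => {
    "X_MCP_API_Key"    => "attacker-supplied",
    "x-mcp-session-id" => "sess-1",
  },
  "HTTP_X_MCP_API_KEY"    => "attacker-supplied",
  "HTTP_X_MCP_SESSION_ID" => "sess-1",
}
strip_underscore_headers!(env)

# Without "rack.headers" the helper leaves the env untouched.
legacy_env = { "HTTP_X_MCP_API_KEY" => "value" }
strip_underscore_headers!(legacy_env)
```

Note that the slot is removed entirely, so a request that carried both forms of the same header ends up with neither value in the env; the factory then sees the key as missing.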
Instance Method Details
#call(env) ⇒ Array(Integer, Hash, #each)
Rack interface.
# File 'lib/parse/agent/mcp_rack_app.rb', line 259
def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 1. Method check: only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405, json_headers.merge("Allow" => "POST"),
            [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check: must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers,
            [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless: native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers,
              [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers,
              [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit: read one byte beyond limit to detect oversized bodies
  #    without buffering the full stream.
  raw_body = env["rack.input"].read(@max_body_size + 1)
  if raw_body.bytesize > @max_body_size
    return [413, json_headers,
            [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers,
            [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  unless body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?
    return [400, json_headers,
            [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 5. Agent factory: auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, []]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers,
            [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5b. Thread the conversation correlation id through. Source:
  #     X-MCP-Session-Id header. Only fills it when the factory
  #     hasn't already assigned one; application code that needs to
  #     override the client-supplied id (e.g., bind to an internal
  #     session record) can do so in the factory and we don't
  #     stomp on it. The Parse::Agent#correlation_id= setter
  #     sanitizes the value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) && agent.correlation_id.nil? &&
     (sid = env["HTTP_X_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5c. notifications/cancelled: special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same X-MCP-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request; otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id, request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end
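Step 5c implies a particular request shape from the client: a POST with `Content-Type: application/json`, the same `X-MCP-Session-Id` as the request being cancelled, and a body whose `method` is `notifications/cancelled` with `params.requestId` set and no `id`. A minimal sketch of building such a request with Ruby's standard library follows. `build_cancel_request`, the endpoint URL, the session id, and the request id are all hypothetical values chosen for illustration, and any authentication header the agent factory expects is deliberately left out.

```ruby
require "json"
require "net/http"
require "uri"

# Hypothetical helper: builds (but does not send) a cancellation POST.
def build_cancel_request(endpoint, session_id:, request_id:)
  payload = {
    "jsonrpc" => "2.0",
    "method"  => "notifications/cancelled", # a notification carries no "id"
    "params"  => { "requestId" => request_id },
  }

  req = Net::HTTP::Post.new(URI(endpoint))
  req["Content-Type"] = "application/json"
  req["X-MCP-Session-Id"] = session_id # must match the original request
  req.body = JSON.generate(payload)
  req
end

# Placeholder endpoint and ids; substitute the real mount path and values.
cancel = build_cancel_request(
  "http://localhost:9292/mcp",
  session_id: "sess-42",
  request_id: 7
)
```

Per the source comments above, the server answers 202 Accepted with an empty body whether or not a matching in-flight request was found, so the client cannot use the response to confirm that cancellation took effect.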