Class: Parse::Agent::MCPRackApp

Inherits:
Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

Rack adapter that exposes Parse::Agent::MCPDispatcher as a mountable Rack endpoint. Downstream applications can mount this inside Sinatra, Rails, or any Rack-compatible router at an arbitrary path and behind their own authentication gate.

The adapter enforces the same transport-level invariants as MCPServer (method, content-type, body-size, and JSON-parse checks) and then delegates to Parse::Agent::MCPDispatcher.call for all protocol handling.

SSE Streaming (MCP progress notifications)

When constructed with `streaming: true`, requests that include `Accept: text/event-stream` receive an SSE response instead of a single JSON body. The server holds the connection open and emits `notifications/progress` events from two sources:

  1. Time-based heartbeats every `heartbeat_interval` seconds while the dispatcher runs (progress field = elapsed seconds).

  2. Tool-internal progress reported by the tool itself via `agent.report_progress(progress:, total:, message:)`. Works for both built-in tools and custom tools registered through `Parse::Agent::Tools.register`.

Heartbeats are automatically suppressed once a tool reports its own progress, so the `progressToken` carries a single coherent stream. A final `response` event carries the complete JSON-RPC response, after which the stream closes.
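
As a rough illustration of what a client sees, the sketch below splits a raw SSE stream into events and separates progress notifications from the final `response` event. The helper name `parse_sse`, the sample payloads, and the assumption that progress events use the default SSE event name are all hypothetical; only the `notifications/progress` method name and the `response` event name come from the description above.

```ruby
require "json"

# Split a raw SSE stream into events. Per the SSE wire format, an event is a
# run of "field: value" lines terminated by a blank line.
def parse_sse(raw)
  raw.split(/\r?\n\r?\n/).filter_map do |block|
    event = { "event" => "message", "data" => [] }
    block.each_line(chomp: true) do |line|
      next if line.empty? || line.start_with?(":") # comment / keep-alive line
      field, value = line.split(":", 2)
      value = value.to_s.delete_prefix(" ")
      case field
      when "event" then event["event"] = value
      when "data"  then event["data"] << value
      end
    end
    next if event["data"].empty?
    event.merge("data" => event["data"].join("\n"))
  end
end

# Assumed wire layout (illustrative payloads, not captured from a real server).
sample = <<~SSE
  data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"t1","progress":2}}

  event: response
  data: {"jsonrpc":"2.0","id":1,"result":{"ok":true}}

SSE

parse_sse(sample).each do |event|
  payload = JSON.parse(event["data"])
  if event["event"] == "response"
    puts "final result: #{payload["result"]}"
  else
    puts "progress: #{payload.dig("params", "progress")}"
  end
end
```

A real client would feed chunks into the parser incrementally rather than waiting for the whole stream; the batch form is used here only to keep the sketch short.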

This lets LLM clients observe progress on long-running tool calls (such as aggregate pipelines) rather than timing out silently.

Streaming requires a Rack server that supports streaming response bodies (Puma, Falcon, Unicorn). WEBrick buffers the full body before writing, so SSE streaming has no effect on the standalone MCPServer; operators using MCPServer directly should leave `streaming: false` (the default).

To disable Nginx response buffering for SSE endpoints, set:

proxy_buffering off;

or rely on the `X-Accel-Buffering: no` header this class emits automatically on every SSE response.

When `streaming: false` (default), an `Accept: text/event-stream` request receives a plain JSON response. The adapter is permissive per the MCP spec, which does not require SSE support.

Examples:

Block form (most common)

app = Parse::Agent::MCPRackApp.new do |env|
  token = env["HTTP_AUTHORIZATION"].to_s.delete_prefix("Bearer ")
  agent = MyAuth.agent_for_token!(token)  # raises Unauthorized if invalid
  agent
end

Keyword argument form

factory = ->(env) { Parse::Agent.new(permissions: :readonly) }
app = Parse::Agent::MCPRackApp.new(agent_factory: factory)

With SSE streaming enabled

app = Parse::Agent::MCPRackApp.new(streaming: true) { |env| ... }

Mounted in Rails routes.rb

mount Parse::Agent::MCPRackApp.new { |env| ... }, at: "/mcp"

Defined Under Namespace

Classes: CancellationRegistry, SSEBody

Constant Summary

DEFAULT_MAX_BODY_SIZE = 1_048_576

Maximum allowed request body size in bytes (matches MCPServer::MAX_BODY_SIZE).

MAX_JSON_NESTING = 20

JSON nesting depth limit (matches MCPServer::MAX_JSON_NESTING).

DEFAULT_HEARTBEAT_INTERVAL = 2

Default heartbeat interval in seconds when streaming is enabled.

JSON_CONTENT_TYPE = { "Content-Type" => "application/json" }.freeze

Standard Content-Type for all JSON responses. This is a frozen template; call #json_headers to obtain a per-response mutable copy that composes with Rack middleware that decorates response headers (e.g. Sinatra's xss_header / json_csrf / common_logger).

SSE_HEADERS = {
  "Content-Type"      => "text/event-stream",
  "Cache-Control"     => "no-cache",
  "Connection"        => "keep-alive",
  "X-Accel-Buffering" => "no",
}.freeze

SSE response headers. X-Accel-Buffering disables Nginx proxy buffering. This is a frozen template; call #sse_headers to obtain a per-response copy.

Constructor Details

#initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE, logger: nil, streaming: false, heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL, max_concurrent_dispatchers: nil, pre_auth_rate_limiter: nil, allowed_origins: nil, require_custom_header: nil, &block) ⇒ MCPRackApp

Returns a new instance of MCPRackApp.

Parameters:

  • agent_factory (Proc, nil) (defaults to: nil)

    callable invoked with the Rack env on every request. Must return a Parse::Agent or raise Parse::Agent::Unauthorized. Mutually exclusive with a block.

  • max_body_size (Integer) (defaults to: DEFAULT_MAX_BODY_SIZE)

    reject bodies larger than this many bytes. Defaults to DEFAULT_MAX_BODY_SIZE.

  • logger (#warn, nil) (defaults to: nil)

    optional logger. When set, auth failures are logged as warnings that contain only the exception class name, and unexpected factory errors additionally log a backtrace.

  • streaming (Boolean) (defaults to: false)

    enable SSE streaming for clients that send `Accept: text/event-stream`. Defaults to false for backward compatibility. Has no effect on WEBrick-backed deployments (see class documentation).

  • heartbeat_interval (Numeric) (defaults to: DEFAULT_HEARTBEAT_INTERVAL)

    seconds between progress heartbeat events when streaming is active. Defaults to DEFAULT_HEARTBEAT_INTERVAL. Ignored when `streaming: false`.

  • max_concurrent_dispatchers (Integer, nil) (defaults to: nil)

    when set, limits the number of concurrently active dispatcher threads across all SSE connections served by this app instance. When the limit is reached, a new SSE request immediately receives a 503 JSON-RPC error envelope (`-32000` "server busy") rather than spawning another dispatcher. Defaults to `nil` (unlimited). Use `active_dispatcher_count` to monitor current concurrency from operator tooling.

  • pre_auth_rate_limiter (#check!, nil) (defaults to: nil)

    optional rate limiter consulted at the top of every request, BEFORE the agent_factory is invoked. Closes the factory-amplification DoS where each malformed request burns a Parse Server round-trip (factories typically validate session tokens by calling out). Must respond to `#check!` and raise an exception responding to `#retry_after` (such as `Parse::Agent::RateLimiter::RateLimitExceeded`) when exhausted. Defaults to `nil` (no pre-auth limiter). On exhaustion the request is rejected with HTTP 429 and a `Retry-After` header.

  • allowed_origins (Array<String>, nil) (defaults to: nil)

    when set, the `Origin` request header must match one of these entries (case-insensitive, exact host match; a leading `.` acts as a wildcard that matches subdomains). `nil` (default) skips the check. Browsers always send `Origin` on cross-origin POST; native clients (curl, Ruby HTTP clients, SDK-to-SDK) typically don't, and an absent `Origin` is treated as allowed regardless of this setting. The default loopback bind makes this check optional in development; operators who bind MCP to a routable interface should configure it.

  • require_custom_header (String, nil) (defaults to: nil)

    when set (e.g. `"X-MCP-Client"`), requests must carry that header with any non-empty value. Custom headers can't be set by a `<form>` CSRF and force a CORS preflight on browser `fetch()`, so this gate closes the browser-driven attack surface entirely. Pair with `allowed_origins` for defense in depth.
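
To make the `pre_auth_rate_limiter` contract concrete, here is a minimal sketch of an object that satisfies it: it responds to `check!` and raises an exception that responds to `retry_after`. The class names `SimpleWindowLimiter` and `Exceeded`, the fixed-window strategy, and the injectable `clock:` are hypothetical choices for illustration, not part of the library.

```ruby
# Hypothetical fixed-window limiter. The adapter calls check! with no
# arguments, so a limiter shaped like this one is global rather than per-client.
class SimpleWindowLimiter
  # Raised when the window is exhausted; exposes #retry_after in seconds.
  class Exceeded < StandardError
    attr_reader :retry_after

    def initialize(retry_after)
      @retry_after = retry_after
      super("rate limit exceeded")
    end
  end

  def initialize(limit:, window:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @limit = limit
    @window = window
    @clock = clock
    @mutex = Mutex.new
    @window_start = nil
    @count = 0
  end

  # Counts one request; raises Exceeded once more than `limit` requests
  # have arrived inside the current window.
  def check!
    @mutex.synchronize do
      now = @clock.call
      if @window_start.nil? || now - @window_start >= @window
        @window_start = now
        @count = 0
      end
      @count += 1
      raise Exceeded.new(@window - (now - @window_start)) if @count > @limit
    end
    true
  end
end
```

An instance would then be passed as `pre_auth_rate_limiter: SimpleWindowLimiter.new(limit: 100, window: 60)`. A production limiter would more likely key on client address or use a shared store, which this sketch deliberately leaves out.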

Raises:

  • (ArgumentError)

    if both or neither of agent_factory/block are given, or if pre_auth_rate_limiter does not respond to #check!.
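
The `allowed_origins` matching rule described in the parameter list might be sketched as follows. This is not the library's implementation: the helper name `origin_host_allowed?` is hypothetical, allowlist entries are assumed to be bare hostnames, and a leading-dot entry is assumed to match subdomains only (not the bare domain itself).

```ruby
require "uri"

# Hypothetical helper: true when the host of an Origin header value matches
# an allowlist entry. Entries starting with "." match any subdomain.
def origin_host_allowed?(origin, allowlist)
  host = begin
    URI.parse(origin).host
  rescue URI::InvalidURIError
    nil
  end
  return false if host.nil? # e.g. the opaque origin "null"

  host = host.downcase
  allowlist.any? do |entry|
    entry = entry.downcase
    if entry.start_with?(".")
      host.end_with?(entry) # ".example.com" matches "app.example.com"
    else
      host == entry
    end
  end
end

puts origin_host_allowed?("https://App.Example.com", ["app.example.com"])
puts origin_host_allowed?("https://evil-example.com", [".example.com"])
```

Keeping the dot in the suffix comparison is what stops `evil-example.com` from matching `.example.com`. Note that the adapter treats a missing or empty `Origin` as allowed before any such comparison runs.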



# File 'lib/parse/agent/mcp_rack_app.rb', line 192

def initialize(agent_factory: nil, max_body_size: DEFAULT_MAX_BODY_SIZE,
               logger: nil, streaming: false,
               heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
               max_concurrent_dispatchers: nil,
               pre_auth_rate_limiter: nil,
               allowed_origins: nil,
               require_custom_header: nil, &block)
  if agent_factory && block
    raise ArgumentError, "Provide agent_factory: OR a block, not both"
  end
  unless agent_factory || block
    raise ArgumentError, "Either agent_factory: keyword or a block is required"
  end
  if pre_auth_rate_limiter && !pre_auth_rate_limiter.respond_to?(:check!)
    raise ArgumentError, "pre_auth_rate_limiter must respond to #check!"
  end

  @agent_factory              = agent_factory || block
  @max_body_size              = max_body_size
  @logger                     = logger
  @streaming                  = streaming
  @heartbeat_interval         = heartbeat_interval
  @max_concurrent_dispatchers = max_concurrent_dispatchers
  @pre_auth_rate_limiter      = pre_auth_rate_limiter
  @allowed_origins            = normalize_allowed_origins(allowed_origins)
  @required_custom_header     = normalize_required_custom_header(require_custom_header)
  # Per-app registry of in-flight cancellable requests. Keyed by
  # [correlation_id, request_id]. A `notifications/cancelled` POST
  # whose `params.requestId` matches an entry trips the registered
  # CancellationToken. Scoped per-instance, not per-process: this
  # registry does not span multiple MCPRackApp mount points within
  # a process, nor multiple processes in a clustered deployment.
  @cancellation_registry      = CancellationRegistry.new

  # Warn operators who enable streaming without a concurrency cap.
  # An unbounded SSE endpoint with orphaned dispatcher threads is
  # a practical DoS surface — a slow or hostile client opening
  # connections faster than tools complete can exhaust the host's
  # thread pool and downstream Parse connection pool. Leaving the
  # default as `nil` (unlimited) preserves backward compatibility,
  # but we tell the operator once at construction.
  if streaming && @max_concurrent_dispatchers.nil?
    line = "[Parse::Agent::MCPRackApp] streaming: true with max_concurrent_dispatchers: nil (unlimited). " \
           "Set a finite cap (e.g. 100, or 2x your Puma max_threads) to bound the orphan-thread DoS surface. " \
           "See docs/mcp_guide.md for sizing guidance."
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  end
end

Class Method Details

.active_dispatcher_count ⇒ Object

Returns the number of currently live dispatcher threads spawned by any SSEBody across all MCPRackApp instances in this process. Threads are counted by the `:parse_mcp_dispatcher` thread-local tag set when each dispatcher_thread is started. Use this for operator dashboards or health checks; do NOT use it to make flow-control decisions at runtime (use the `max_concurrent_dispatchers:` constructor option for that).



# File 'lib/parse/agent/mcp_rack_app.rb', line 251

def self.active_dispatcher_count
  Thread.list.count { |t| t[:parse_mcp_dispatcher] }
end
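
The counting technique can be tried in isolation. The sketch below tags a few stand-in threads the same way and counts them while they are alive and again after they finish. `count_tagged_threads` and `demo_dispatcher_count` are hypothetical names, and the worker threads only simulate dispatchers.

```ruby
# Count live threads carrying the given tag, mirroring the listing above.
def count_tagged_threads(tag = :parse_mcp_dispatcher)
  Thread.list.count { |t| t[tag] }
end

# Start n tagged stand-in threads and report the count while they run and
# after they have all exited.
def demo_dispatcher_count(n)
  ready = Queue.new
  release = Queue.new
  workers = Array.new(n) do
    Thread.new do
      Thread.current[:parse_mcp_dispatcher] = true
      ready << :up # signal that the tag has been set
      release.pop  # block until the simulated tool call may finish
    end
  end
  n.times { ready.pop }
  during = count_tagged_threads
  n.times { release << :go }
  workers.each(&:join)
  [during, count_tagged_threads]
end

during, after = demo_dispatcher_count(3)
puts "while running: #{during}, after join: #{after}"
```

Because `Thread.list` only includes live threads, finished dispatchers drop out of the count without any explicit bookkeeping. The value is a snapshot, which is one reason it suits dashboards better than admission control.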

.strip_underscore_smuggled_headers!(env) ⇒ Hash

Drop env keys that would have come from underscore-form HTTP header names. The Rack-spec-compliant interpretation of HTTP headers maps `X-MCP-API-Key` and `X_MCP_API_KEY` to the same env key (`HTTP_X_MCP_API_KEY`); a misbehaving upstream server that forwards the underscore form lets an attacker overwrite trusted reverse-proxy-injected headers.

This helper is invoked automatically at the top of #call, so any MCPRackApp mounted in a Rack 3+ pipeline (which exposes the original header list via `rack.headers`) gets defense-in-depth scrubbing without operator opt-in. On Rack 2 / pre-3 servers `rack.headers` is not set and the helper is a no-op; operators on those stacks must configure their upstream (e.g. Nginx `underscores_in_headers off`) OR mount this helper as an explicit middleware.

The standalone `MCPServer` rewrites its own `build_rack_env` to drop underscore-form names before they reach this app, so the standalone path is covered regardless of Rack version.

Examples:

Explicit middleware (Rack 2 / pre-3 deployments)

class StripSmuggledHeaders
  def initialize(app); @app = app; end
  def call(env)
    Parse::Agent::MCPRackApp.strip_underscore_smuggled_headers!(env)
    @app.call(env)
  end
end

Parameters:

  • env (Hash)

    the Rack env, mutated in place

Returns:

  • (Hash)

    the same env, for chaining



# File 'lib/parse/agent/mcp_rack_app.rb', line 129

def self.strip_underscore_smuggled_headers!(env)
  # Rack 3+ preserves the original header list in env["rack.headers"]
  # (a Rack::Headers instance or Hash). When present, we can identify
  # which env keys came from an underscore-form header and delete
  # them, even if a dashed-form sibling arrived too.
  if env["rack.headers"].respond_to?(:each)
    suspect = []
    env["rack.headers"].each do |name, _|
      suspect << name if name.is_a?(String) && name.include?("_")
    end
    suspect.each do |name|
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end
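
To see the effect on a concrete env, the sketch below uses a standalone copy of the same logic (named `strip_underscore_headers!` here so it runs without the gem) against a hand-built hash. The sample header values are made up, and the env is assumed to carry the original header list under `rack.headers` as described above.

```ruby
# Standalone copy of the stripping logic, for experimentation only.
def strip_underscore_headers!(env)
  headers = env["rack.headers"]
  if headers.respond_to?(:each)
    headers.each do |name, _|
      next unless name.is_a?(String) && name.include?("_")
      env.delete("HTTP_#{name.upcase.tr("-", "_")}")
    end
  end
  env
end

env = {
  "rack.headers"       => { "X_MCP_API_Key" => "attacker", "Accept" => "application/json" },
  "HTTP_X_MCP_API_KEY" => "attacker",
  "HTTP_ACCEPT"        => "application/json",
}

strip_underscore_headers!(env)
puts env.key?("HTTP_X_MCP_API_KEY") # the smuggled slot is gone
puts env.key?("HTTP_ACCEPT")        # dashed-form headers are untouched
```

Note that the env slot is deleted outright, so if a dashed-form sibling arrived in the same request its value is dropped as well and downstream lookups see the header as missing.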

Instance Method Details

#call(env) ⇒ Array(Integer, Hash, #each)

Rack interface.

Parameters:

  • env (Hash)

    Rack environment

Returns:

  • (Array(Integer, Hash, #each))

    Rack triple



# File 'lib/parse/agent/mcp_rack_app.rb', line 259

def call(env)
  # 0. Defense-in-depth: strip underscore-form HTTP headers from env
  #    before any subsequent lookup reads HTTP_X_MCP_API_KEY / etc.
  #    No-op on Rack < 3 (where env["rack.headers"] is absent); on
  #    Rack 3+ this removes any HTTP_* env key whose original header
  #    name contained an underscore. Closes the smuggling path where
  #    a hostile client sends `X_MCP_API_Key: ...` alongside a
  #    trusted reverse-proxy-injected `X-MCP-API-Key: ...` and the
  #    underscored form collapses-and-overwrites the trusted slot.
  self.class.strip_underscore_smuggled_headers!(env)

  # 0b. NEW-MCP-6: pre-auth rate limit. Runs BEFORE the agent_factory
  #     so a malformed body / missing key / empty `{}` cannot force
  #     the operator-supplied factory to round-trip to Parse Server
  #     on every request. Off by default (constructor kwarg).
  if @pre_auth_rate_limiter
    begin
      @pre_auth_rate_limiter.check!
    rescue StandardError => e
      retry_after = e.respond_to?(:retry_after) ? e.retry_after : nil
      headers = json_headers.dup
      headers["Retry-After"] = retry_after.ceil.to_s if retry_after && retry_after > 0
      return [429, headers, [json_rpc_error(-32_000, "Too Many Requests")]]
    end
  end

  # 1. Method check — only POST is accepted.
  unless env["REQUEST_METHOD"] == "POST"
    return [405,
            json_headers.merge("Allow" => "POST"),
            [json_rpc_error(-32_700, "method_not_allowed")]]
  end

  # 2. Content-type check — must be application/json (charset ignored).
  content_type = env["CONTENT_TYPE"].to_s.split(";").first.to_s.strip.downcase
  unless content_type == "application/json"
    return [415, json_headers, [json_rpc_error(-32_700, "Unsupported Media Type: Content-Type must be application/json")]]
  end

  # 2b. Origin allowlist. Browsers always send an `Origin` header
  #     on cross-origin POST; native clients typically don't.
  #     When configured, a non-empty `Origin` must match the
  #     allowlist or the request is rejected with 403.
  #     Missing/empty `Origin` is allowed regardless — native
  #     clients (curl, SDK-to-SDK) shouldn't be broken by a
  #     CSRF defense aimed at browsers.
  if @allowed_origins
    origin = env["HTTP_ORIGIN"].to_s.strip
    unless origin.empty? || origin_allowed?(origin)
      @logger&.warn("[Parse::Agent::MCPRackApp] Origin refused: #{origin.inspect}")
      return [403, json_headers, [json_rpc_error(-32_700, "Origin not allowed")]]
    end
  end

  # 2c. Required custom header (CSRF defense-in-depth). A header
  #     like `X-MCP-Client` cannot be set by a `<form>` CSRF and
  #     forces a CORS preflight on browser `fetch()`. When
  #     configured, the header must be present and (if a value
  #     was supplied to the constructor) match.
  if @required_custom_header
    header_env_key, expected_value = @required_custom_header
    actual = env[header_env_key].to_s
    if actual.empty? || (expected_value && actual != expected_value)
      return [403, json_headers, [json_rpc_error(-32_700, "Required custom header missing or invalid")]]
    end
  end

  # 3. Body size limit — read one byte beyond limit to detect oversized bodies
  #    without buffering the full stream.
  raw_body = env["rack.input"].read(@max_body_size + 1)
  if raw_body.bytesize > @max_body_size
    return [413, json_headers, [json_rpc_error(-32_700, "Payload Too Large: body exceeds #{@max_body_size} bytes")]]
  end

  # 4. JSON parse.
  begin
    body = JSON.parse(raw_body.empty? ? "{}" : raw_body, max_nesting: MAX_JSON_NESTING)
  rescue JSON::ParserError, JSON::NestingError
    return [400, json_headers, [json_rpc_error(-32_700, "Parse error: invalid JSON")]]
  end

  # 4b. NEW-MCP-6: refuse obviously-malformed JSON-RPC envelopes
  #     BEFORE invoking the agent_factory. The factory typically
  #     hits Parse Server (token validation, audit logging), so a
  #     barrage of empty `{}` or missing-method bodies otherwise
  #     amplifies into a Parse Server load problem. Empty-object
  #     and missing-method requests cannot possibly be valid
  #     JSON-RPC, so we shortcut to -32600 (Invalid Request).
  unless body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?
    return [400, json_headers, [json_rpc_error(-32_600, "Invalid Request")]]
  end

  # 5. Agent factory — auth gate. Rescue Unauthorized first, then catch-all
  #    for unexpected factory errors.
  begin
    agent = @agent_factory.call(env)
  rescue Parse::Agent::Unauthorized => e
    @logger.warn("[Parse::Agent::MCPRackApp] Unauthorized: #{e.class.name}") if @logger
    return [401, json_headers, [unauthorized_body]]
  rescue StandardError => e
    if @logger
      @logger.warn("[Parse::Agent::MCPRackApp] Factory error: #{e.class.name}")
      @logger.warn(e.backtrace.join("\n")) if e.backtrace
    end
    return [500, json_headers, [json_rpc_error(-32_603, "Internal error")]]
  end

  # 5b. Thread the conversation correlation id through. Source:
  #     X-MCP-Session-Id header. Only fills it when the factory
  #     hasn't already assigned one — application code that needs to
  #     override the client-supplied id (e.g., bind to an internal
  #     session record) can do so in the factory and we don't
  #     stomp on it. The Parse::Agent#correlation_id= setter
  #     sanitizes the value; an invalid header is silently dropped.
  if agent && agent.respond_to?(:correlation_id=) &&
     agent.correlation_id.nil? &&
     (sid = env["HTTP_X_MCP_SESSION_ID"])
    agent.correlation_id = sid
  end

  # 5c. notifications/cancelled — special-cased BEFORE the dispatcher.
  #     A JSON-RPC notification has no `id`, expects no response
  #     body, and must trip the in-flight request whose
  #     `(correlation_id, request_id)` matches. We require the
  #     cancelling request to carry the same X-MCP-Session-Id
  #     (sanitized into agent.correlation_id above) as the original
  #     request — otherwise an attacker who guesses sequential
  #     JSON-RPC ids could cancel arbitrary in-flight requests.
  #
  #     Failures (no correlation_id, no requestId, no match) are
  #     silent no-ops to avoid a probe oracle. The response is
  #     always 202 Accepted with an empty body.
  if body.is_a?(Hash) && body["method"] == "notifications/cancelled"
    request_id = body.dig("params", "requestId")
    if agent.respond_to?(:correlation_id) && agent.correlation_id && request_id
      @cancellation_registry.cancel(
        agent.correlation_id,
        request_id,
        reason: :notifications_cancelled,
      )
    end
    return [202, json_headers, [""]]
  end

  # 6. Branch on streaming preference. Transport-level errors (steps 1-5)
  #    always return plain JSON regardless of the Accept header.
  if @streaming && env["HTTP_ACCEPT"].to_s.include?("text/event-stream")
    serve_sse(body, agent)
  else
    serve_json(body, agent)
  end
end
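
Steps 3, 4, and 4b of the listing above (bounded read, JSON parse with a nesting cap, and envelope check) can be exercised on their own. The sketch below is a simplified stand-in, not the adapter's code: `precheck_body` and `SKETCH_MAX_JSON_NESTING` are hypothetical names, it returns only a status and the parsed body, and it coerces a `nil` read result to an empty string, which is a choice made for this sketch.

```ruby
require "json"
require "stringio"

SKETCH_MAX_JSON_NESTING = 20

# Returns [http_status, parsed_body_or_nil].
def precheck_body(input, max_body_size)
  # Read one byte past the limit so an oversized body is detected without
  # buffering the whole stream. IO#read(n) returns nil at EOF, hence to_s.
  raw = input.read(max_body_size + 1).to_s
  return [413, nil] if raw.bytesize > max_body_size

  begin
    body = JSON.parse(raw.empty? ? "{}" : raw, max_nesting: SKETCH_MAX_JSON_NESTING)
  rescue JSON::ParserError # JSON::NestingError is a subclass
    return [400, nil]
  end

  # A JSON-RPC request needs a non-empty string "method".
  unless body.is_a?(Hash) && body["method"].is_a?(String) && !body["method"].empty?
    return [400, nil]
  end

  [200, body]
end

status, body = precheck_body(StringIO.new('{"jsonrpc":"2.0","id":1,"method":"tools/list"}'), 1024)
puts "#{status} #{body["method"]}"
puts precheck_body(StringIO.new("x" * 11), 10).first
```

Running these checks before the agent factory is what keeps malformed traffic from reaching the auth backend, as the comments in step 4b explain.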