Class: Parse::Agent::MCPRackApp::SSEBody Private

Inherits:
Object
  • Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Rack body object that emits MCP progress notifications over SSE.

#each is the only public interface (besides #close). It is driven by the Rack server on whatever thread/fiber handles response writing. The heartbeat timer runs on a dedicated worker thread, and the dispatcher call runs on a thread spawned inside that worker, so neither blocks the calling fiber.

Two sources of progress events

SSEBody emits notifications/progress events from two sources:

  1. Time-based heartbeats. The worker thread emits a heartbeat every @interval seconds while the dispatcher is running. The progress field is elapsed seconds; total is omitted. The heartbeat uses a dedicated server-generated progressToken distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).

  2. Tool-internal progress. Tools call agent.report_progress(...) which invokes the callback exposed by #progress_callback. The callback pushes an event using the client-supplied or server-generated progressToken with the tool-supplied progress, optional total, and optional message.

Once a tool starts reporting its own progress, the heartbeat loop suppresses further time-based events to reduce stream noise — the tool's reports already carry liveness signal. When the tool never calls report_progress, heartbeats continue firing for the lifetime of the dispatcher.
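The interaction between the two sources can be modelled in a few lines. This is a minimal sketch, not the real SSEBody: the class name ProgressModel, its method names, and the "heartbeat:" token prefix are all hypothetical, and events are collected in an Array instead of being pushed to an SSE queue.

```ruby
require "securerandom"

# Hypothetical model of the two progress sources; not the real SSEBody.
class ProgressModel
  attr_reader :events

  def initialize(client_token)
    @client_token    = client_token
    # Separate token so elapsed seconds never share a scale with work units.
    @heartbeat_token = "heartbeat:#{SecureRandom.uuid}"
    @tool_reported   = false
    @events          = []
  end

  # Time-based source: skipped once the tool has reported on its own.
  def heartbeat(elapsed_seconds)
    return if @tool_reported

    @events << { token: @heartbeat_token, progress: elapsed_seconds }
  end

  # Tool-internal source: total and message are optional and dropped when nil.
  def report(progress:, total: nil, message: nil)
    @tool_reported = true
    @events << { token: @client_token, progress: progress,
                 total: total, message: message }.compact
  end
end

model = ProgressModel.new("client-token-1")
model.heartbeat(15)                  # emitted on the heartbeat token
model.heartbeat(30)                  # emitted on the heartbeat token
model.report(progress: 1, total: 4)  # emitted on the client token
model.heartbeat(45)                  # suppressed: the tool is now reporting
```

Because each token only ever carries one scale (elapsed seconds or work units), progress stays monotonic per token even at the moment the tool first reports.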

Wire format for each SSE event (note: trailing blank line is required by the SSE spec):

  event: progress\n
  data: <JSON payload>\n
  \n
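As an illustration of that framing, the sketch below builds one event string. The helper name format_progress_event is hypothetical, and the JSON-RPC payload shape is an assumption based on MCP's notifications/progress message (progressToken, progress, optional total and message), not a copy of what SSEBody emits.

```ruby
require "json"

# Hypothetical helper: frame one MCP progress notification as an SSE event.
def format_progress_event(token:, progress:, total: nil, message: nil)
  params = { "progressToken" => token, "progress" => progress }
  params["total"]   = total   unless total.nil?
  params["message"] = message unless message.nil?

  payload = {
    "jsonrpc" => "2.0",
    "method"  => "notifications/progress",
    "params"  => params
  }

  # JSON.generate emits a single line, so one data: field is enough.
  # The final "\n\n" is the blank line that terminates the event.
  "event: progress\ndata: #{JSON.generate(payload)}\n\n"
end

event = format_progress_event(token: "tok-1", progress: 3, total: 10)
```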

Constant Summary

DONE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Sentinel pushed to the queue when the worker is done.

:__sse_done__

Constructor Details

#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of SSEBody.

Parameters:

  • progress_token (String)

    MCP progressToken value.

  • req_id (Object)

    JSON-RPC request id (may be nil).

  • interval (Numeric)

    heartbeat period in seconds.

  • logger (#warn, nil)

    optional logger.

  • cancellation_token (Parse::Agent::CancellationToken, nil) (defaults to: nil)

    token tripped by #close (client disconnect) and by notifications/cancelled lookups. Tools cooperate by checking agent.cancelled?.

  • on_close (Proc, nil) (defaults to: nil)

    callback invoked from #close after the worker has been terminated. Used by MCPRackApp to deregister the cancellation token from the per-app registry.

  • dispatcher_blk (Proc)

    called with one argument (the #progress_callback Proc); must return the same { status:, body: } hash that MCPDispatcher.call returns.

  • heartbeat_waiter (Proc, nil) (defaults to: nil)

    test hook. Called as waiter.call(dispatcher_thread, interval) once per heartbeat iteration; must block until either the dispatcher finishes or interval elapses. Default delegates to dispatcher_thread.join(interval). Tests inject a queue-driven waiter so heartbeat cadence is deterministic and not subject to OS scheduler jitter.
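A queue-driven waiter of the kind described for heartbeat_waiter might look like the sketch below. It assumes the heartbeat loop treats a nil return as "interval elapsed" and a truthy return as "dispatcher finished", mirroring Thread#join(timeout); the real loop is not shown here, so that contract is an assumption. The queue names ticks and release are hypothetical.

```ruby
ticks   = Queue.new   # the test pushes :tick or :done to drive the waiter
release = Queue.new   # lets the test decide when the dispatcher finishes

# Deterministic stand-in for ->(t, i) { t.join(i) }.
waiter = lambda do |dispatcher_thread, _interval|
  case ticks.pop                        # blocks until the test pushes a signal
  when :tick then nil                   # behave like a join that timed out
  when :done then dispatcher_thread.join # returns the thread (truthy)
  end
end

dispatcher = Thread.new { release.pop } # blocks until released
heartbeats = 0

# Simplified heartbeat loop: one "heartbeat" per simulated timeout.
loop_thread = Thread.new do
  heartbeats += 1 until waiter.call(dispatcher, 15)
end

3.times { ticks << :tick } # exactly three simulated intervals
release << :go             # dispatcher finishes
ticks << :done             # waiter reports completion
loop_thread.join
```

The 15-second interval is never actually waited on, so the heartbeat count depends only on the signals the test pushes, not on scheduler timing.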



# File 'lib/parse/agent/mcp_rack_app.rb', line 1219

def initialize(progress_token, req_id, interval, logger,
               cancellation_token: nil, on_close: nil,
               heartbeat_waiter: nil, &dispatcher_blk)
  @progress_token         = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token        = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id                 = req_id
  @interval               = interval
  @logger                 = logger
  @dispatcher_blk         = dispatcher_blk
  @cancellation_token     = cancellation_token
  @on_close               = on_close
  @heartbeat_waiter       = heartbeat_waiter ||
                            Thread.current[:parse_mcp_sse_heartbeat_waiter] ||
                            ->(t, i) { t.join(i) }
  @queue                  = Queue.new
  @worker                 = nil
  # The dispatcher thread spawned inside @worker. Published under
  # @close_mutex once started so {#close} can snapshot its liveness for
  # the abandonment signal. Never force-killed (see #close).
  @dispatcher_thread      = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally     = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization — once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback      = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools      = nil
  @unsubscribe_prompts    = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex            = Mutex.new
  @closed                 = false
end

Instance Attribute Details

#progress_callback ⇒ Proc (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Callback exposed to the dispatcher block. Calling this with keyword args progress:, total:, message: pushes a tool-progress notifications/progress event to the SSE queue and marks the worker as "tool is reporting" so subsequent time-based heartbeats are suppressed.

Returns:

  • (Proc)


# File 'lib/parse/agent/mcp_rack_app.rb', line 1196

def progress_callback
  @progress_callback
end
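From the dispatcher block's side, the callback is just a Proc taking keyword arguments. The sketch below uses a stand-in lambda instead of the real callback, and assumes total: and message: may be omitted; the block body and the response body hash are hypothetical.

```ruby
events = Queue.new

# Stand-in for the Proc returned by #progress_callback.
progress_callback = lambda do |progress:, total: nil, message: nil|
  events << { progress: progress, total: total, message: message }.compact
end

# A dispatcher block receives the callback as its one argument and
# returns a { status:, body: } hash.
dispatcher_blk = lambda do |report|
  3.times do |i|
    report.call(progress: i + 1, total: 3, message: "step #{i + 1}")
  end
  { status: 200, body: { "result" => "ok" } }
end

response = dispatcher_blk.call(progress_callback)
```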

Instance Method Details

#close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Terminate the stream and clean up.

When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect and:

  1. The cancellation token (if any) is tripped, so a tool that observes agent.cancelled? at a checkpoint exits cooperatively. The orphaned dispatcher is NOT force-killed (see below); its lifetime is bounded by the per-tool Timeout and the clean MongoDB/REST I/O deadlines.
  2. The abandonment is recorded — a parse.agent.mcp_dispatcher_abandoned notification is emitted for every premature close, and the process-wide Parse::Agent::MCPRackApp.abandoned_dispatcher_count counter is bumped when the dispatcher was still running (a genuine orphan) — so operators can see disconnect-against-slow-tool pressure even though each orphan is individually bounded.

When called AFTER normal completion, neither happens — the request finished on its own; cancellation would only confuse a tool that races to check the flag, and there is nothing to report.

Either path:

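  • Kills the WORKER thread (the heartbeat loop) if still alive.
  • Deregisters the Tools/Prompts listChanged subscribers, so a later registry mutation cannot push events into the queue after the stream has ended.
  • Invokes the on_close hook so MCPRackApp can deregister the token from its per-app registry. Failures in the hook are logged and swallowed; close must always succeed.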

Why the dispatcher is not force-killed: a Thread#kill (or a foreign Thread#raise) skips the DB driver's rescue-based connection-invalidation, so connection_pool's ensure could return a half-used connection to the pool and corrupt a later request that reuses it. Blocking I/O calls do not observe the cancellation token, but they ARE bounded by the per-tool Timeout.timeout (Tools::TOOL_TIMEOUTS, 5–60s) and the clean MongoDB socket_timeout (10s) / REST timeout (30s) deadlines, which reclaim the connection-pool slot through the driver's clean error path. Cooperative cancellation reduces wasted work; the bounded timeouts cap it; a forcible kill is intentionally avoided.
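Cooperative cancellation, as opposed to a forced kill, can be sketched as follows. SketchToken is a hypothetical stand-in; the real Parse::Agent::CancellationToken may have a different interface beyond the cancel!(reason:) call visible in #close. The tool loop polls the token at a checkpoint and exits on its own.

```ruby
# Hypothetical token; only cancel!(reason:) is taken from the source.
class SketchToken
  attr_reader :reason

  def initialize
    @mutex     = Mutex.new
    @cancelled = false
    @reason    = nil
  end

  # First caller wins; later calls keep the original reason.
  def cancel!(reason: nil)
    @mutex.synchronize do
      return if @cancelled

      @cancelled = true
      @reason    = reason
    end
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

token     = SketchToken.new
started   = Queue.new
processed = 0

tool = Thread.new do
  1_000.times do
    break if token.cancelled?   # checkpoint: exit cooperatively

    processed += 1
    started << true if processed == 1
    sleep 0.001                 # stand-in for one unit of work
  end
end

started.pop                               # wait until the tool is running
token.cancel!(reason: :client_disconnect) # what #close does on disconnect
tool.join                                 # the thread ends by itself; it is never killed
```

Because the thread unwinds through its own control flow, any ensure or rescue blocks in the work it wraps run normally, which is the property the explanation above relies on.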



# File 'lib/parse/agent/mcp_rack_app.rb', line 1334

def close
  # Idempotent — concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  dispatcher_alive   = false
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
    dispatcher_alive   = @dispatcher_thread&.alive? || false
  end
  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
    record_abandonment(dispatcher_alive)
  end
  @worker&.kill if @worker&.alive?
  @worker = nil
  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.message}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools   = nil
    @unsubscribe_prompts = nil
  end
  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.message}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end
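The idempotency guard at the top of #close can be isolated into a small sketch. OnceCloser is a hypothetical class that keeps only the mutex-and-flag pattern and the "log and swallow" treatment of hook failures; it omits cancellation, abandonment recording, and subscriber cleanup.

```ruby
# Hypothetical reduction of the close-once pattern.
class OnceCloser
  def initialize(&hook)
    @mutex  = Mutex.new
    @closed = false
    @hook   = hook
  end

  def close
    @mutex.synchronize do
      return if @closed   # later callers short-circuit here

      @closed = true
    end
    # Run the hook outside the lock so a slow hook cannot block other callers.
    @hook&.call
  rescue StandardError => e
    # Close must always succeed: report and swallow.
    warn "close hook failed: #{e.class}: #{e.message}"
  end
end

calls  = Queue.new
closer = OnceCloser.new { calls << :closed }

# Eight concurrent callers, as if several threads raced to close.
8.times.map { Thread.new { closer.close } }.each(&:join)
```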

#each {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rack body interface — called once by the Rack server.

Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.

Yields:

  • (String)

    SSE-formatted event strings.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1279

def each
  start_worker
  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end
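The queue-plus-sentinel shape of #each can be tried in isolation. QueueBody below is a hypothetical, much-reduced body: the producer block stands in for the worker thread's dispatcher and heartbeat logic, and the ensure clause only records that cleanup ran.

```ruby
# Hypothetical reduced body; shows only the queue and sentinel handling.
class QueueBody
  DONE = :__done__

  attr_reader :closed

  def initialize(&producer)
    @queue    = Queue.new
    @producer = producer
    @closed   = false
  end

  def each
    worker = Thread.new do
      begin
        @producer.call(@queue)
      ensure
        @queue << DONE   # always signal the end of the stream
      end
    end

    loop do
      msg = @queue.pop   # blocks until the worker pushes something
      break if msg == DONE

      yield msg
    end
  ensure
    worker&.join
    @closed = true       # stand-in for the real #close
  end
end

body = QueueBody.new do |queue|
  queue << "event: progress\ndata: 1\n\n"
  queue << "event: progress\ndata: 2\n\n"
end

chunks = []
body.each { |chunk| chunks << chunk }
```

Pushing the sentinel from an ensure clause means the consumer loop terminates even if the producer fails, so the server thread driving each is never left blocked on an empty queue.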