Class: Parse::Agent::MCPRackApp::SSEBody Private
- Inherits: Object
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Rack body object that emits MCP progress notifications over SSE.
#each is the only public interface (besides #close). It is driven
by the Rack server on whatever thread/fiber handles response writing.
The dispatcher call and heartbeat timer both run on a dedicated worker
thread so they do not block the calling fiber.
== Two sources of progress events
SSEBody emits notifications/progress events from two sources:
- Time-based heartbeats. The worker thread emits a heartbeat every @interval seconds while the dispatcher is running. The progress field is elapsed seconds; total is omitted. The heartbeat uses a dedicated server-generated progressToken distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).
- Tool-internal progress. Tools call agent.report_progress(...) which invokes the callback exposed by #progress_callback. The callback pushes an event using the client-supplied or server-generated progressToken with the tool-supplied progress, optional total, and optional message.
Once a tool starts reporting its own progress, the heartbeat
loop suppresses further time-based events to reduce stream
noise — the tool's reports already carry liveness signal. When
the tool never calls report_progress, heartbeats continue
firing for the lifetime of the dispatcher.
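The interaction between heartbeats and tool progress can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the library's worker: `run_with_heartbeats`, the event hashes, the `:done` marker, and the `"client-token"` string are hypothetical names. Only the `join(interval)` wait mirrors the default waiter shown in the constructor.

```ruby
# Simplified sketch of a heartbeat loop with tool-progress suppression.
# All names here are illustrative and not part of Parse::Agent.
def run_with_heartbeats(queue, interval, heartbeat_token, &work)
  tool_reported = false

  # Callback handed to the tool; the first call flips the suppression flag.
  report = lambda do |progress:, total: nil|
    tool_reported = true
    queue << { token: "client-token", progress: progress, total: total }
  end

  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  dispatcher = Thread.new { work.call(report) }

  # Thread#join(timeout) returns nil on timeout and the thread once it ends.
  until dispatcher.join(interval)
    next if tool_reported # the tool's own reports already signal liveness
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    queue << { token: heartbeat_token, progress: elapsed }
  end

  queue << :done
end
```

A silent block such as `{ |_report| sleep 0.1 }` run with a 0.02 second interval would produce several heartbeat events before `:done`, while a block that reports once at the start should produce none.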
Wire format for each SSE event (note: trailing blank line is required by the SSE spec):
event: progress\n
data:
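As an illustration of the framing only, the sketch below builds one SSE frame. The `data:` payload is not shown on this page, so the JSON-RPC `notifications/progress` shape used here is an assumption based on the MCP progress notification, and `format_progress_event` is a hypothetical helper rather than a method of this class.

```ruby
require "json"

# Hypothetical formatter for a single SSE frame. The payload layout is an
# assumption (MCP-style notifications/progress), not copied from the library.
def format_progress_event(token:, progress:, total: nil, message: nil)
  params = { "progressToken" => token, "progress" => progress }
  params["total"] = total unless total.nil?       # omitted for heartbeats
  params["message"] = message unless message.nil?

  payload = {
    "jsonrpc" => "2.0",
    "method" => "notifications/progress",
    "params" => params
  }

  # JSON.generate emits a single line, so one data: field is sufficient.
  # The trailing blank line terminates the event.
  "event: progress\ndata: #{JSON.generate(payload)}\n\n"
end
```

Calling `format_progress_event(token: "abc", progress: 3, total: 10)` returns a string that starts with `event: progress` and ends with the blank line that terminates the event.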
Constant Summary
- DONE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Sentinel pushed to the queue when the worker is done.
:__sse_done__
Instance Attribute Summary
-
#progress_callback ⇒ Proc
readonly
private
Callback exposed to the dispatcher block.
Instance Method Summary
-
#close ⇒ Object
private
Terminate the stream and clean up.
-
#each {|String| ... } ⇒ Object
private
Rack body interface — called once by the Rack server.
-
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody
constructor
private
A new instance of SSEBody.
Constructor Details
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of SSEBody.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1219

def initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk)
  @progress_token = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id = req_id
  @interval = interval
  @logger = logger
  @dispatcher_blk = dispatcher_blk
  @cancellation_token = cancellation_token
  @on_close = on_close
  @heartbeat_waiter = heartbeat_waiter ||
                      Thread.current[:parse_mcp_sse_heartbeat_waiter] ||
                      ->(t, i) { t.join(i) }
  @queue = Queue.new
  @worker = nil
  # The dispatcher thread spawned inside @worker. Published under
  # @close_mutex once started so {#close} can snapshot its liveness for
  # the abandonment signal. Never force-killed (see #close).
  @dispatcher_thread = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization: once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools = nil
  @unsubscribe_prompts = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex = Mutex.new
  @closed = false
end
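The comment on @heartbeat_token explains why heartbeats get their own token. A small check makes the reasoning concrete. This is a sketch only: `monotonic_per_token?` and the event hashes are hypothetical, and "monotonic" is taken here to mean strictly increasing.

```ruby
# Returns true when `progress` strictly increases within every token.
# Events are plain hashes; the shape is illustrative only.
def monotonic_per_token?(events)
  last_seen = {}
  events.all? do |event|
    previous = last_seen[event[:token]]
    last_seen[event[:token]] = event[:progress]
    previous.nil? || event[:progress] > previous
  end
end

# Heartbeats (elapsed seconds) and tool reports (work units) on ONE token:
# the first tool report of 1 arrives after a heartbeat of 10.0.
shared_token = [
  { token: "client", progress: 5.0 },
  { token: "client", progress: 10.0 },
  { token: "client", progress: 1 }
]

# The same events with heartbeats moved to a dedicated token.
split_tokens = [
  { token: "heartbeat", progress: 5.0 },
  { token: "heartbeat", progress: 10.0 },
  { token: "client", progress: 1 },
  { token: "client", progress: 2 }
]

puts monotonic_per_token?(shared_token)  # false
puts monotonic_per_token?(split_tokens)  # true
```

Keeping the two scales on separate tokens removes the boundary case where the first tool report would otherwise appear to go backwards.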
Instance Attribute Details
#progress_callback ⇒ Proc (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Callback exposed to the dispatcher block. Calling this with
keyword args progress:, total:, message: pushes a
tool-progress notifications/progress event to the SSE queue
and marks the worker as "tool is reporting" so subsequent
time-based heartbeats are suppressed.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1196

def progress_callback
  @progress_callback
end
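From the tool's side, reporting might look like the sketch below. `FakeAgent` and `export_rows` are hypothetical stand-ins. This page names `agent.report_progress(...)` but elides its arguments, so the keyword signature shown is an assumption modelled on the callback's `progress:`, `total:`, `message:` keywords.

```ruby
# Hypothetical stand-in for the agent object a tool receives. It forwards
# report_progress to a callback shaped like #progress_callback.
class FakeAgent
  def initialize(callback)
    @callback = callback
  end

  # Assumed signature; a nil callback makes reporting a no-op.
  def report_progress(progress:, total: nil, message: nil)
    @callback&.call(progress: progress, total: total, message: message)
  end
end

# An illustrative tool that reports after each unit of work.
def export_rows(agent, rows)
  rows.each_with_index.map do |row, index|
    result = row.to_s.upcase
    done = index + 1
    agent.report_progress(progress: done, total: rows.size,
                          message: "exported #{done}/#{rows.size}")
    result
  end
end
```

With a callback that appends its keyword arguments to an array, `export_rows(agent, %w[a b c])` yields three events whose `progress` values are 1, 2, and 3.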
Instance Method Details
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Terminate the stream and clean up.
When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect and:
- The cancellation token (if any) is tripped, so a tool that observes agent.cancelled? at a checkpoint exits cooperatively. The orphaned dispatcher is NOT force-killed (see below); its lifetime is bounded by the per-tool Timeout and the clean MongoDB/REST I/O deadlines.
- The abandonment is recorded: a parse.agent.mcp_dispatcher_abandoned notification is emitted for every premature close, and the process-wide Parse::Agent::MCPRackApp.abandoned_dispatcher_count counter is bumped when the dispatcher was still running (a genuine orphan), so operators can see disconnect-against-slow-tool pressure even though each orphan is individually bounded.
When called AFTER normal completion, neither happens — the request finished on its own; cancellation would only confuse a tool that races to check the flag, and there is nothing to report.
Either path:
- Kills the WORKER thread (the heartbeat loop) if still alive.
- Invokes the on_close hook so MCPRackApp can deregister the token from its per-app registry. Failures in the hook are logged and swallowed — close must always succeed.
Why the dispatcher is not force-killed: a Thread#kill (or a
foreign Thread#raise) skips the DB driver's rescue-based
connection-invalidation, so connection_pool's ensure could
return a half-used connection to the pool and corrupt a later
request that reuses it. Blocking I/O calls do not observe the
cancellation token, but they ARE bounded by the per-tool
Timeout.timeout (Tools::TOOL_TIMEOUTS, 5–60s) and the clean
MongoDB socket_timeout (10s) / REST timeout (30s) deadlines,
which reclaim the connection-pool slot through the driver's
clean error path. Cooperative cancellation reduces wasted work;
the bounded timeouts cap it; a forcible kill is intentionally
avoided.
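Cooperative cancellation, as opposed to a forcible kill, can be sketched as follows. `SketchToken` and `process_batches` are hypothetical; the real token class is not shown on this page, only its `cancel!(reason:)` call and the `cancelled?` check a tool performs.

```ruby
# Minimal stand-in for a cancellation token (illustrative only).
class SketchToken
  attr_reader :reason

  def initialize
    @mutex = Mutex.new
    @cancelled = false
    @reason = nil
  end

  # The first caller wins; later calls leave the recorded reason untouched.
  def cancel!(reason: nil)
    @mutex.synchronize do
      return if @cancelled
      @cancelled = true
      @reason = reason
    end
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

# A tool loop that checks the token between units of work. A unit that
# has already started always runs to completion; nothing is interrupted.
def process_batches(token, batches)
  finished = []
  batches.each do |batch|
    break if token.cancelled? # cooperative checkpoint
    finished << yield(batch)
  end
  finished
end
```

Because the loop only stops at a checkpoint, any connection the current unit holds is released through its normal code path. That is the property the explanation above relies on.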
# File 'lib/parse/agent/mcp_rack_app.rb', line 1334

def close
  # Idempotent: concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  dispatcher_alive = false
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
    dispatcher_alive = @dispatcher_thread&.alive? || false
  end

  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
    record_abandonment(dispatcher_alive)
  end

  @worker&.kill if @worker&.alive?
  @worker = nil

  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.message}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools = nil
    @unsubscribe_prompts = nil
  end

  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.message}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end
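The idempotency guard at the top of #close is a general pattern. A stripped-down sketch, using the hypothetical class `OnceCloser`, shows the two properties the method depends on: cleanup runs at most once under concurrent callers, and a failing hook never escapes.

```ruby
# Sketch of a close that runs its cleanup at most once, even when several
# threads call it concurrently. Names are illustrative.
class OnceCloser
  def initialize(&cleanup)
    @mutex = Mutex.new
    @closed = false
    @cleanup = cleanup
  end

  # Returns true for the caller that performed the close, false otherwise.
  def close
    @mutex.synchronize do
      return false if @closed
      @closed = true
    end

    # Cleanup runs outside the lock. Failures are logged and swallowed
    # so that close itself never raises.
    begin
      @cleanup&.call
    rescue StandardError => e
      warn "cleanup error: #{e.class}: #{e.message}"
    end
    true
  end
end
```

Running the hook outside the mutex keeps the critical section short: a slow or blocking hook cannot stall other callers, which return immediately once the flag is set.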
#each {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Rack body interface — called once by the Rack server.
Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1279

def each
  start_worker

  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end
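The queue-and-sentinel shape of #each can be reproduced in a few lines. The sketch below is a minimal illustration with the hypothetical class `QueueBody`. It omits heartbeats, cancellation, and the completed-normally bookkeeping, and the producer block stands in for the dispatcher.

```ruby
# Minimal queue-backed streaming body: a producer thread pushes strings,
# then a sentinel; #each yields until the sentinel arrives.
class QueueBody
  DONE = :__done__

  def initialize(&producer)
    @queue = Queue.new
    @producer = producer
    @closed = false
  end

  def each
    Thread.new do
      begin
        @producer.call(@queue)
      ensure
        @queue << DONE # always unblock the consumer, even on error
      end
    end

    loop do
      msg = @queue.pop # blocks until the producer pushes something
      break if msg == DONE
      yield msg
    end
  ensure
    close # runs on normal completion and on early exit alike
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end
```

Because `close` sits in the method-level `ensure`, it also runs when the consumer breaks out of the block or raises. In the real class, that early-exit path is what gets treated as a client disconnect.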