Class: Parse::Agent::MCPRackApp::SSEBody Private
- Inherits: Object
  - Object
  - Parse::Agent::MCPRackApp::SSEBody
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Rack body object that emits MCP progress notifications over SSE.
#each is the only public interface (besides #close). It is driven
by the Rack server on whatever thread/fiber handles response writing.
The dispatcher call and heartbeat timer both run on a dedicated worker
thread so they do not block the calling fiber.
Two sources of progress events
SSEBody emits notifications/progress events from two sources:
- Time-based heartbeats. The worker thread emits a heartbeat every @interval seconds while the dispatcher is running. The progress field is elapsed seconds; total is omitted. The heartbeat uses a dedicated server-generated progressToken distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).
- Tool-internal progress. Tools call agent.report_progress(...) which invokes the callback exposed by #progress_callback. The callback pushes an event using the client-supplied or server-generated progressToken with the tool-supplied progress, optional total, and optional message.
Once a tool starts reporting its own progress, the heartbeat
loop suppresses further time-based events to reduce stream
noise — the tool's reports already carry liveness signal. When
the tool never calls report_progress, heartbeats continue
firing for the lifetime of the dispatcher.
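The interplay between the two sources can be sketched roughly as follows. This is a simplified illustration, not the library's implementation: `HeartbeatSketch`, `run`, `report`, and the `:done` marker are hypothetical names, plain hashes stand in for the formatted SSE strings, and the heartbeat loop runs on the calling thread here rather than on a dedicated worker.

```ruby
require "securerandom"

# Hypothetical stand-in for the heartbeat loop described above.
class HeartbeatSketch
  attr_reader :queue

  def initialize(interval:, client_token:)
    @interval = interval
    @client_token = client_token
    # Separate token so elapsed seconds never share a scale with work units.
    @heartbeat_token = "sketch:heartbeat:#{SecureRandom.uuid}"
    @tool_reported = false
    @queue = Queue.new
  end

  # Tool-internal progress: emitted on the client-supplied token.
  def report(progress:, total: nil, message: nil)
    @tool_reported = true
    event = { token: @client_token, progress: progress }
    event[:total] = total unless total.nil?
    event[:message] = message unless message.nil?
    @queue << event
  end

  # Runs the dispatcher block on a thread and emits heartbeats while it is alive.
  def run(&dispatcher)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    thread = Thread.new { dispatcher.call(self) }
    # Thread#join(limit) returns nil on timeout, meaning the dispatcher is still running.
    while thread.join(@interval).nil?
      next if @tool_reported # the tool's own reports already signal liveness
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
      @queue << { token: @heartbeat_token, progress: elapsed.round(2) }
    end
    @queue << :done
  end
end
```

Calling `HeartbeatSketch.new(interval: 1, client_token: "abc").run { |body| ... }` with a block that never calls `body.report` yields only heartbeat events; once the block reports, later heartbeats are skipped.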
Wire format for each SSE event (note: trailing blank line is required by the SSE spec):
event: progress\n
data:
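As an illustration of the framing only, the sketch below serializes one progress event. The helper name `format_progress_event` is hypothetical, and the JSON payload is an assumption modeled on the general shape of an MCP `notifications/progress` JSON-RPC notification; the exact payload SSEBody writes is not shown on this page.

```ruby
require "json"

# Hypothetical helper: serializes one progress notification as an SSE event.
def format_progress_event(progress_token:, progress:, total: nil, message: nil)
  params = { "progressToken" => progress_token, "progress" => progress }
  params["total"] = total unless total.nil?
  params["message"] = message unless message.nil?
  # Assumed envelope: a JSON-RPC 2.0 notification (no "id" field).
  payload = { "jsonrpc" => "2.0", "method" => "notifications/progress", "params" => params }
  # The final "\n\n" ends the data line and adds the blank line that terminates the event.
  "event: progress\ndata: #{JSON.generate(payload)}\n\n"
end
```

Keeping the payload on a single `data:` line avoids the multi-line `data:` handling that SSE clients would otherwise have to apply.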
Constant Summary
- DONE = :__sse_done__
  This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
  Sentinel pushed to the queue when the worker is done.
Instance Attribute Summary
- #progress_callback ⇒ Proc (readonly, private)
  Callback exposed to the dispatcher block.
Instance Method Summary
- #close ⇒ Object (private)
  Terminate the stream and clean up.
- #each {|String| ... } ⇒ Object (private)
  Rack body interface, called once by the Rack server.
- #initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody (constructor, private)
  A new instance of SSEBody.
Constructor Details
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of SSEBody.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1085

def initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk)
  @progress_token = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id = req_id
  @interval = interval
  @logger = logger
  @dispatcher_blk = dispatcher_blk
  @cancellation_token = cancellation_token
  @on_close = on_close
  @heartbeat_waiter = heartbeat_waiter || Thread.current[:parse_mcp_sse_heartbeat_waiter] || ->(t, i) { t.join(i) }
  @queue = Queue.new
  @worker = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization: once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools = nil
  @unsubscribe_prompts = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex = Mutex.new
  @closed = false
end
Instance Attribute Details
#progress_callback ⇒ Proc (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Callback exposed to the dispatcher block. Calling this with
keyword args progress:, total:, message: pushes a
tool-progress notifications/progress event to the SSE queue
and marks the worker as "tool is reporting" so subsequent
time-based heartbeats are suppressed.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1062

def progress_callback
  @progress_callback
end
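A callback with this contract might be built along the following lines. This is a sketch under stated assumptions: `CallbackSketch` and `tool_progress_reported?` are hypothetical names, and the event is kept as a plain hash rather than a formatted SSE string. The source only names a `build_progress_callback` method; its body is not shown on this page.

```ruby
# Hypothetical sketch of a keyword-argument progress callback.
class CallbackSketch
  attr_reader :queue, :progress_callback

  def initialize(progress_token)
    @progress_token = progress_token
    @queue = Queue.new
    @tool_progress_reported = false
    @progress_callback = build_progress_callback
  end

  def tool_progress_reported?
    @tool_progress_reported
  end

  private

  def build_progress_callback
    # The lambda closes over the instance, so it can be handed to the dispatcher block.
    lambda do |progress:, total: nil, message: nil|
      @tool_progress_reported = true # heartbeats can be suppressed from now on
      event = { progressToken: @progress_token, progress: progress }
      event[:total] = total unless total.nil?
      event[:message] = message unless message.nil?
      @queue << event
      nil
    end
  end
end

sketch = CallbackSketch.new("client-token-1")
sketch.progress_callback.call(progress: 5, total: 20, message: "indexing")
```

Using a lambda rather than a plain proc means a call that omits the required `progress:` keyword raises an ArgumentError instead of silently pushing an incomplete event.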
Instance Method Details
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Terminate the stream and clean up.
When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect and:
- The cancellation token (if any) is tripped BEFORE the worker is killed, so tools that observe agent.cancelled? at a checkpoint can exit cooperatively. The kill becomes the fallback for tools stuck inside a blocking I/O call.
When called AFTER normal completion, the token is NOT tripped — the request finished on its own; cancellation would only confuse a tool that races to check the flag.
Either path:
- Kills the worker thread if still alive.
- Invokes the on_close hook so MCPRackApp can deregister the token from its per-app registry. Failures in the hook are logged and swallowed — close must always succeed.
Cancellation note: blocking I/O calls (MongoDB query, Parse
REST roundtrip) do not observe the token until they return.
The Ruby-level Timeout.timeout that already wraps each tool is
the hard upper bound on wasted work; cancellation reduces that
waste but does not eliminate it.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1181

def close
  # Idempotent: concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
  end

  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
  end

  @worker&.kill if @worker&.alive?
  @worker = nil

  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.message}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools = nil
    @unsubscribe_prompts = nil
  end

  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.message}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end
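The "trip the token first, kill as a fallback" ordering can be illustrated with a cooperative tool that checks a token between units of work. Everything here is hypothetical: `SketchToken` and `cooperative_tool` are illustrative names (the real token class is not shown on this page), and the short grace `join` before the kill is an addition of this sketch so the cooperative exit is observable.

```ruby
# Hypothetical cancellation token with the cancel!/cancelled? shape used above.
class SketchToken
  attr_reader :reason

  def initialize
    @mutex = Mutex.new
    @cancelled = false
    @reason = nil
  end

  def cancel!(reason:)
    @mutex.synchronize do
      @cancelled = true
      @reason = reason
    end
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

# A cooperative tool: checks the token at a checkpoint between units of work.
def cooperative_tool(token, steps:)
  done = 0
  steps.times do
    break if token.cancelled?
    done += 1
    sleep 0.01 # stands in for one unit of work
  end
  done
end

token = SketchToken.new
worker = Thread.new { cooperative_tool(token, steps: 1_000) }
sleep 0.03
token.cancel!(reason: :client_disconnect) # trip first so the tool can exit on its own
worker.kill unless worker.join(1)         # kill only if it did not stop in time
```

A tool blocked inside a single long I/O call would never reach the `cancelled?` checkpoint, which is the case the kill covers.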
#each {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Rack body interface — called once by the Rack server.
Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1141

def each
  start_worker
  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end
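The queue-and-sentinel pattern behind #each can be reduced to a small self-contained body object. This is a sketch, not SSEBody itself: `QueueBodySketch`, its `DONE` value, and the producer block are hypothetical, and heartbeats, cancellation, and logging are left out.

```ruby
# Hypothetical minimal body: a worker fills a queue, #each drains it until a sentinel.
class QueueBodySketch
  DONE = :__sketch_done__

  attr_reader :closed

  def initialize(&producer)
    @producer = producer
    @queue = Queue.new
    @closed = false
  end

  def each
    worker = Thread.new do
      begin
        @producer.call(@queue)
      ensure
        @queue << DONE # always unblock the consumer, even if the producer raises
      end
    end
    loop do
      msg = @queue.pop # blocks until the worker pushes something
      break if msg == DONE
      yield msg
    end
    worker.join
  ensure
    close
  end

  def close
    @closed = true
  end
end

body = QueueBodySketch.new do |q|
  q << "event: progress\ndata: 1\n\n"
  q << "data: final\n\n"
end
chunks = []
body.each { |chunk| chunks << chunk }
```

Because the consumer blocks on `Queue#pop`, the thread that drives `each` does no polling; it wakes only when the worker has something to write or has finished.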