Class: Parse::Agent::MCPRackApp::SSEBody Private
- Inherits:
-
Object
- Object
- Parse::Agent::MCPRackApp::SSEBody
- Defined in:
- lib/parse/agent/mcp_rack_app.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Rack body object that emits MCP progress notifications over SSE.
‘#each` is the only public interface (besides `#close`). It is driven by the Rack server on whatever thread/fiber handles response writing. The dispatcher call and heartbeat timer both run on a dedicated worker thread so they do not block the calling fiber.
Two sources of progress events
SSEBody emits ‘notifications/progress` events from two sources:
-
**Time-based heartbeats.** The worker thread emits a heartbeat every ‘@interval` seconds while the dispatcher is running. The `progress` field is elapsed seconds; `total` is omitted. The heartbeat uses a dedicated server-generated `progressToken` distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).
-
**Tool-internal progress.** Tools call ‘agent.report_progress(…)` which invokes the callback exposed by `#progress_callback`. The callback pushes an event using the client-supplied or server-generated `progressToken` with the tool-supplied `progress`, optional `total`, and optional `message`.
Once a tool starts reporting its own progress, the heartbeat loop suppresses further time-based events to reduce stream noise — the tool’s reports already carry liveness signal. When the tool never calls ‘report_progress`, heartbeats continue firing for the lifetime of the dispatcher.
Wire format for each SSE event (note: trailing blank line is required by the SSE spec):
event: progress\n
data: <json>\n
\n
Constant Summary collapse
- DONE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Sentinel pushed to the queue when the worker is done.
:__sse_done__
Instance Attribute Summary collapse
-
#progress_callback ⇒ Proc
readonly
private
Callback exposed to the dispatcher block.
Instance Method Summary collapse
-
#close ⇒ Object
private
Terminate the stream and clean up.
-
#each {|String| ... } ⇒ Object
private
Rack body interface — called once by the Rack server.
-
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, &dispatcher_blk) ⇒ SSEBody
constructor
private
A new instance of SSEBody.
Constructor Details
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, &dispatcher_blk) ⇒ SSEBody
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of SSEBody.
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 580 def initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, &dispatcher_blk) @progress_token = progress_token # Heartbeats use a dedicated server-generated progressToken so # the elapsed-seconds scale of heartbeats never appears on the # same MCP progressToken as work-unit values reported by tools. # The MCP spec requires `progress` to increase monotonically # per progressToken; mixing the two scales would violate it # at the boundary where a tool first reports. @heartbeat_token = "parse-stack:heartbeat:#{SecureRandom.uuid}" @req_id = req_id @interval = interval @logger = logger @dispatcher_blk = dispatcher_blk @cancellation_token = cancellation_token @on_close = on_close @queue = Queue.new @worker = nil # Flipped to true by #each when the DONE sentinel is consumed. # #close uses this to decide whether to trip the cancellation # token (false = client disconnect) or skip the trip (true = # the request finished on its own). Reads and writes happen # under @close_mutex below. @completed_normally = false # Volatile flag flipped by the progress_callback the first time a # tool reports. Heartbeats now use a separate progressToken so # the flag is no longer a spec-correctness gate, but we keep # it as a small bandwidth optimization — once a tool is # actively reporting, time-based heartbeats are noise. @tool_progress_reported = false @progress_callback = build_progress_callback # Deregistration callbacks for the Tools/Prompts subscribe # bindings. Set when the worker starts (so a request that is # never driven via #each does not register a stale entry) and # cleared in #close. @unsubscribe_tools = nil @unsubscribe_prompts = nil # Guards concurrent invocations of #close. Rack servers # sometimes call close from both the I/O fiber's ensure and a # separate disconnect-handler thread; without a mutex the # subscriber-deregister and on_close paths can run twice. @close_mutex = Mutex.new @closed = false end |
Instance Attribute Details
#progress_callback ⇒ Proc (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Callback exposed to the dispatcher block. Calling this with keyword args ‘progress:`, `total:`, `message:` pushes a tool-progress `notifications/progress` event to the SSE queue and marks the worker as “tool is reporting” so subsequent time-based heartbeats are suppressed.
564 565 566 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 564 def progress_callback @progress_callback end |
Instance Method Details
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Terminate the stream and clean up.
When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect and:
-
The cancellation token (if any) is tripped BEFORE the worker is killed, so tools that observe ‘agent.cancelled?` at a checkpoint can exit cooperatively. The kill becomes the fallback for tools stuck inside a blocking I/O call.
When called AFTER normal completion, the token is NOT tripped — the request finished on its own; cancellation would only confuse a tool that races to check the flag.
Either path:
- Kills the worker thread if still alive.
- Invokes the on_close hook so MCPRackApp can deregister
the token from its per-app registry. Failures in the hook
are logged and swallowed — close must always succeed.
Cancellation note: blocking I/O calls (MongoDB query, Parse REST roundtrip) do not observe the token until they return. The Ruby-level ‘Timeout.timeout` already wrapping each tool is the hard upper bound on wasted work; cancellation reduces it, not eliminates it.
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 672 def close # Idempotent — concurrent invocations from the I/O fiber and # a disconnect-handler thread short-circuit after the first # caller wins the mutex. completed_normally = nil @close_mutex.synchronize do return if @closed @closed = true completed_normally = @completed_normally end unless completed_normally @cancellation_token&.cancel!(reason: :client_disconnect) end @worker&.kill if @worker&.alive? @worker = nil # Deregister listChanged subscribers BEFORE the on_close hook # so a subsequent registry mutation cannot push events into # the queue after the stream has ended. begin @unsubscribe_tools&.call @unsubscribe_prompts&.call rescue StandardError => e line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.}" if @logger @logger.warn(line) else warn line end ensure @unsubscribe_tools = nil @unsubscribe_prompts = nil end if @on_close begin @on_close.call rescue StandardError => e line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.}" if @logger @logger.warn(line) else warn line end end end @on_close = nil end |
#each {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Rack body interface — called once by the Rack server.
Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.
632 633 634 635 636 637 638 639 640 641 642 643 644 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 632 def each start_worker loop do msg = @queue.pop if msg == DONE @close_mutex.synchronize { @completed_normally = true } break end yield msg end ensure close end |