Class: Parse::Agent::MCPRackApp::SSEBody Private
- Inherits: Object
  - Object
    - Parse::Agent::MCPRackApp::SSEBody
- Defined in: lib/parse/agent/mcp_rack_app.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Rack body object that emits MCP progress notifications over SSE.
`#each` is the only public interface (besides `#close`). It is driven by the Rack server on whatever thread/fiber handles response writing. The dispatcher call and heartbeat timer both run on a dedicated worker thread so they do not block the calling fiber.
Two sources of progress events
SSEBody emits `notifications/progress` events from two sources:
- **Time-based heartbeats.** The worker thread emits a heartbeat every `@interval` seconds while the dispatcher is running. The `progress` field is elapsed seconds; `total` is omitted. The heartbeat uses a dedicated server-generated `progressToken` distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).
- **Tool-internal progress.** Tools call `agent.report_progress(…)`, which invokes the callback exposed by `#progress_callback`. The callback pushes an event using the client-supplied or server-generated `progressToken` with the tool-supplied `progress`, optional `total`, and optional `message`.
Once a tool starts reporting its own progress, the heartbeat loop suppresses further time-based events to reduce stream noise, because the tool's reports already carry a liveness signal. When the tool never calls `report_progress`, heartbeats continue firing for the lifetime of the dispatcher.
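The interaction between the two sources can be sketched as follows. This is a simplified, single-threaded illustration, not the library's code: the `ProgressSources` class, its method names, and the `"heartbeat:"` token prefix are hypothetical stand-ins for the behavior described above.

```ruby
require "securerandom"

# Hypothetical model of the two progress sources (not the real SSEBody).
class ProgressSources
  attr_reader :events

  def initialize(client_token)
    @client_token = client_token
    # Separate token so elapsed seconds never share a scale with work units.
    @heartbeat_token = "heartbeat:#{SecureRandom.uuid}"
    @tool_reported = false
    @events = []
  end

  # Time-based source: progress is elapsed seconds, total is omitted.
  def heartbeat(elapsed_seconds)
    return if @tool_reported # suppressed once the tool reports on its own

    @events << { progressToken: @heartbeat_token, progress: elapsed_seconds }
  end

  # Tool source: progress in tool-defined units, optional total and message.
  def report(progress:, total: nil, message: nil)
    @tool_reported = true
    event = { progressToken: @client_token, progress: progress }
    event[:total] = total unless total.nil?
    event[:message] = message unless message.nil?
    @events << event
  end
end

sources = ProgressSources.new("client-tok")
sources.heartbeat(5)
sources.heartbeat(10)
sources.report(progress: 1, total: 4)
sources.heartbeat(15) # dropped: the tool is already reporting
```

Because each token only ever carries one scale, `progress` stays monotonic per token even at the moment the tool takes over from the heartbeat.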
Wire format for each SSE event (note: trailing blank line is required by the SSE spec):
event: progress\n
data: <json>\n
\n
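As an illustration of the wire format, the sketch below builds one frame from a notification hash. The helper name `format_sse_event` is hypothetical, and the JSON-RPC envelope shown is an assumption about what `<json>` contains; the page itself only specifies the three-line framing.

```ruby
require "json"

# Hypothetical helper: frames one payload as an SSE "progress" event.
# The trailing blank line ("\n\n") terminates the event.
def format_sse_event(payload, event: "progress")
  "event: #{event}\ndata: #{JSON.generate(payload)}\n\n"
end

# Assumed notification shape, for illustration only.
notification = {
  "jsonrpc" => "2.0",
  "method" => "notifications/progress",
  "params" => { "progressToken" => "tok-1", "progress" => 3 }
}

frame = format_sse_event(notification)
```

`JSON.generate` emits the payload on a single line, which matters here because a raw newline inside the `data:` field would split the event.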
Constant Summary
- DONE = :__sse_done__
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Sentinel pushed to the queue when the worker is done.
Instance Attribute Summary
- #progress_callback ⇒ Proc (readonly, private)
  Callback exposed to the dispatcher block.
Instance Method Summary
- #close ⇒ Object (private)
  Terminate the stream and clean up.
- #each {|String| ... } ⇒ Object (private)
  Rack body interface, called once by the Rack server.
- #initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody (constructor, private)
  A new instance of SSEBody.
Constructor Details
#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of SSEBody.
# File 'lib/parse/agent/mcp_rack_app.rb', line 693
def initialize(progress_token, req_id, interval, logger,
               cancellation_token: nil, on_close: nil,
               heartbeat_waiter: nil, &dispatcher_blk)
  @progress_token = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id = req_id
  @interval = interval
  @logger = logger
  @dispatcher_blk = dispatcher_blk
  @cancellation_token = cancellation_token
  @on_close = on_close
  @heartbeat_waiter = heartbeat_waiter ||
                      Thread.current[:parse_mcp_sse_heartbeat_waiter] ||
                      ->(t, i) { t.join(i) }
  @queue = Queue.new
  @worker = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization — once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools = nil
  @unsubscribe_prompts = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex = Mutex.new
  @closed = false
end
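The default `heartbeat_waiter`, `->(t, i) { t.join(i) }`, relies on `Thread#join` with a limit returning `nil` when the limit expires and the thread itself once it has finished. The sketch below shows how a waiter of that shape could pace a heartbeat loop, and why making it injectable is convenient in tests. `run_heartbeats` and `fake_waiter` are hypothetical names, and the real heartbeat loop in the library may be structured differently.

```ruby
# Hypothetical heartbeat loop: one beat per expired interval, stopping
# as soon as the waiter reports that the worker has finished.
def run_heartbeats(worker, interval, waiter)
  beats = 0
  beats += 1 until waiter.call(worker, interval)
  beats
end

worker = Thread.new { :result }
worker.join # the dispatcher has already finished in this sketch

# Same shape as the constructor's default waiter.
default_waiter = ->(t, i) { t.join(i) }
idle_beats = run_heartbeats(worker, 0.01, default_waiter)

# A fake waiter lets a test simulate three elapsed intervals without sleeping.
calls = 0
fake_waiter = ->(_thread, _interval) { (calls += 1) > 3 }
beats = run_heartbeats(worker, 5, fake_waiter)
```

With a finished worker the default waiter returns immediately, so no heartbeat is emitted; the fake waiter reports "not finished" three times and therefore yields three beats.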
Instance Attribute Details
#progress_callback ⇒ Proc (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Callback exposed to the dispatcher block. Calling this with keyword args `progress:`, `total:`, `message:` pushes a tool-progress `notifications/progress` event to the SSE queue and marks the worker as “tool is reporting” so subsequent time-based heartbeats are suppressed.
# File 'lib/parse/agent/mcp_rack_app.rb', line 670
def progress_callback
  @progress_callback
end
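For illustration, a callback with the keyword signature described above might look like the sketch below. The lambda, the `"client-tok"` token, and the shape of the queued hash are assumptions; the actual callback is built by a private method whose body is not shown on this page.

```ruby
queue = Queue.new
tool_reported = false

# Hypothetical stand-in for the Proc returned by #progress_callback.
progress_callback = lambda do |progress:, total: nil, message: nil|
  tool_reported = true # later time-based heartbeats would be skipped
  params = { progressToken: "client-tok", progress: progress }
  params[:total] = total unless total.nil?
  params[:message] = message unless message.nil?
  queue << params
end

# A tool reporting two steps, the second without total or message.
progress_callback.call(progress: 2, total: 10, message: "indexing")
progress_callback.call(progress: 3)
```

Keeping `total` and `message` optional lets a tool report open-ended work, where the final count is unknown, without inventing a denominator.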
Instance Method Details
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Terminate the stream and clean up.
When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect and:
- The cancellation token (if any) is tripped BEFORE the worker is killed, so tools that observe `agent.cancelled?` at a checkpoint can exit cooperatively. The kill becomes the fallback for tools stuck inside a blocking I/O call.
When called AFTER normal completion, the token is NOT tripped: the request finished on its own, and cancellation would only confuse a tool that races to check the flag.
Either path:
- Kills the worker thread if still alive.
- Invokes the on_close hook so MCPRackApp can deregister the token from its per-app registry. Failures in the hook are logged and swallowed; close must always succeed.
Cancellation note: blocking I/O calls (MongoDB query, Parse REST roundtrip) do not observe the token until they return. The Ruby-level `Timeout.timeout` already wrapping each tool is the hard upper bound on wasted work; cancellation reduces that waste but does not eliminate it.
# File 'lib/parse/agent/mcp_rack_app.rb', line 789
def close
  # Idempotent — concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
  end
  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
  end
  @worker&.kill if @worker&.alive?
  @worker = nil
  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools = nil
    @unsubscribe_prompts = nil
  end
  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end
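The cooperative half of cancellation, where a tool checks the token between units of work, can be sketched as follows. `SketchToken` and `process_batches` are hypothetical; only the `cancel!(reason:)` call and a `cancelled?`-style check are taken from this page, and the real token class may expose more than this.

```ruby
# Hypothetical minimal token exposing the two calls used on this page.
class SketchToken
  attr_reader :reason

  def initialize
    @cancelled = false
    @reason = nil
  end

  def cancel!(reason: nil)
    @reason = reason
    @cancelled = true
  end

  def cancelled?
    @cancelled
  end
end

# Hypothetical tool body: checks the token at a checkpoint before each batch.
def process_batches(batches, token)
  done = []
  batches.each do |batch|
    break if token.cancelled? # checkpoint: exit cooperatively

    done << batch
    yield batch if block_given?
  end
  done
end

token = SketchToken.new
# Simulate a client disconnect arriving while batch 2 is being processed.
processed = process_batches([1, 2, 3, 4], token) do |batch|
  token.cancel!(reason: :client_disconnect) if batch == 2
end
```

Batch 2 still completes because the token is only observed at the next checkpoint, which mirrors the note above: work already inside a blocking call is not interrupted by the token alone.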
#each {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Rack body interface, called once by the Rack server.
Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.
# File 'lib/parse/agent/mcp_rack_app.rb', line 749
def each
  start_worker
  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end
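The consumer loop in `#each` pairs with a producer that pushes frames and then a sentinel. A minimal sketch of that queue-and-sentinel pattern is shown below; the `done` symbol and the frame strings are placeholders, and the real worker is started by a private `start_worker` method that is not shown on this page.

```ruby
done = :__sketch_done__ # placeholder sentinel, analogous to DONE
queue = Queue.new

# Producer: stands in for the worker thread that runs the dispatcher.
worker = Thread.new do
  queue << "event: progress\ndata: {}\n\n"
  queue << "data: final-response\n\n"
  queue << done # signals that nothing else will follow
end

# Consumer: stands in for the Rack server driving #each.
received = []
loop do
  msg = queue.pop # blocks until the producer pushes something
  break if msg == done

  received << msg
end
worker.join
```

Using a sentinel rather than closing the queue lets the consumer distinguish "finished normally" from "stopped early", which is the distinction `#close` depends on.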