Class: Parse::Agent::MCPRackApp::SSEBody Private

Inherits:
Object
  • Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Rack body object that emits MCP progress notifications over SSE.

#each is the only public interface (besides #close). It is driven by the Rack server on whatever thread/fiber handles response writing. The dispatcher call and heartbeat timer both run on a dedicated worker thread so they do not block the calling fiber.

Two sources of progress events

SSEBody emits notifications/progress events from two sources:

  1. Time-based heartbeats. The worker thread emits a heartbeat every @interval seconds while the dispatcher is running. The progress field is elapsed seconds; total is omitted. The heartbeat uses a dedicated server-generated progressToken distinct from any client-supplied token so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).

  2. Tool-internal progress. Tools call agent.report_progress(...) which invokes the callback exposed by #progress_callback. The callback pushes an event using the client-supplied or server-generated progressToken with the tool-supplied progress, optional total, and optional message.

Once a tool starts reporting its own progress, the heartbeat loop suppresses further time-based events to reduce stream noise — the tool's reports already carry liveness signal. When the tool never calls report_progress, heartbeats continue firing for the lifetime of the dispatcher.
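The interplay between the two sources can be sketched without threads. This is an illustrative model only, not the SSEBody implementation: ProgressStreamSketch, its method names, and the "sketch:heartbeat:" token prefix are hypothetical. It shows heartbeats and tool reports travelling on different tokens, and heartbeats stopping after the first tool report.

```ruby
require "securerandom"

# Hypothetical model of the two progress sources (not the real SSEBody).
class ProgressStreamSketch
  attr_reader :events

  def initialize(client_token)
    @client_token    = client_token
    # Separate token so elapsed seconds never share a scale with work units.
    @heartbeat_token = "sketch:heartbeat:#{SecureRandom.uuid}"
    @tool_reported   = false
    @events          = []
  end

  # Time-based source: progress is elapsed seconds, total is omitted.
  def heartbeat(elapsed_seconds)
    return if @tool_reported # suppressed once the tool reports on its own
    @events << { progressToken: @heartbeat_token, progress: elapsed_seconds }
  end

  # Tool-internal source: progress in the tool's own work units.
  def report(progress:, total: nil, message: nil)
    @tool_reported = true
    event = { progressToken: @client_token, progress: progress }
    event[:total]   = total unless total.nil?
    event[:message] = message unless message.nil?
    @events << event
  end
end

stream = ProgressStreamSketch.new("client-token-1")
stream.heartbeat(15)
stream.heartbeat(30)
stream.report(progress: 1, total: 4, message: "first batch")
stream.heartbeat(45) # dropped: the tool is already reporting
```

Because each token only ever carries one scale, progress stays monotonic per token even at the point where the tool takes over from the heartbeat.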

Wire format for each SSE event (note: trailing blank line is required by the SSE spec):

  event: progress\n
  data: <JSON payload>\n
  \n
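As a rough illustration of that wire format, the helper below builds one event string. The name format_progress_event is hypothetical, and the payload shape (a JSON-RPC 2.0 notification with method notifications/progress) is an assumption based on the MCP progress notification fields named above, not a copy of what SSEBody emits.

```ruby
require "json"

# Hypothetical helper: serialize one progress notification as an SSE event.
def format_progress_event(token, progress, total: nil, message: nil)
  params = { progressToken: token, progress: progress }
  params[:total]   = total unless total.nil?
  params[:message] = message unless message.nil?
  payload = { jsonrpc: "2.0", method: "notifications/progress", params: params }
  # One "event:" line, one "data:" line, then the blank line that ends the event.
  "event: progress\ndata: #{JSON.generate(payload)}\n\n"
end

event = format_progress_event("client-token-1", 3, total: 10)
```

JSON.generate produces single-line output, which matters here because a newline inside the data field would split the payload across several SSE lines.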

Constant Summary

DONE = :__sse_done__

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Sentinel pushed to the queue when the worker is done.

Constructor Details

#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, heartbeat_waiter: nil, &dispatcher_blk) ⇒ SSEBody

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of SSEBody.

Parameters:

  • progress_token (String)

    MCP progressToken value.

  • req_id (Object)

    JSON-RPC request id (may be nil).

  • interval (Numeric)

    heartbeat period in seconds.

  • logger (#warn, nil)

    optional logger.

  • cancellation_token (Parse::Agent::CancellationToken, nil) (defaults to: nil)

    token tripped by #close (client disconnect) and by notifications/cancelled lookups. Tools cooperate by checking agent.cancelled?.

  • on_close (Proc, nil) (defaults to: nil)

    callback invoked from #close after the worker has been terminated. Used by MCPRackApp to deregister the cancellation token from the per-app registry.

  • dispatcher_blk (Proc)

    called with one argument (the #progress_callback Proc); must return the same { status:, body: } hash that MCPDispatcher.call returns.

  • heartbeat_waiter (Proc, nil) (defaults to: nil)

    test hook. Called as waiter.call(dispatcher_thread, interval) once per heartbeat iteration; must block until either the dispatcher finishes or interval elapses. When not given, the constructor falls back to Thread.current[:parse_mcp_sse_heartbeat_waiter] if that is set, and otherwise to a lambda that delegates to dispatcher_thread.join(interval). Tests inject a queue-driven waiter so heartbeat cadence is deterministic and not subject to OS scheduler jitter.
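A queue-driven waiter of the kind described for heartbeat_waiter might look like the sketch below. The heartbeat loop here is a stand-in, not the real worker: it assumes the loop treats a truthy return value as "dispatcher finished", mirroring what Thread#join(limit) returns. The names ticks, gate, and emitted are hypothetical.

```ruby
ticks   = Queue.new # the test pushes one item per heartbeat iteration it allows
gate    = Queue.new # holds the stand-in dispatcher open until released
emitted = Queue.new # lets the test observe each heartbeat as it happens

# Waiter with the documented call shape: (dispatcher_thread, interval).
waiter = lambda do |thread, _interval|
  ticks.pop      # block until the test releases this iteration
  thread.join(0) # non-blocking: the thread if finished, nil otherwise
end

dispatcher = Thread.new { gate.pop; :done }

# Stand-in heartbeat loop (assumed shape, see the note above).
heartbeat_loop = Thread.new do
  count = 0
  until waiter.call(dispatcher, 15)
    count += 1
    emitted << count
  end
  count
end

ticks << :tick
first = emitted.pop  # first heartbeat observed
ticks << :tick
second = emitted.pop # second heartbeat observed
gate << :go          # let the dispatcher finish
dispatcher.join
ticks << :tick       # final wake-up: the waiter now sees a finished thread
heartbeat_count = heartbeat_loop.value
```

No call in the sketch sleeps or depends on wall-clock time, so the number of heartbeats is decided entirely by how many ticks the test pushes before releasing the dispatcher.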



# File 'lib/parse/agent/mcp_rack_app.rb', line 1085

def initialize(progress_token, req_id, interval, logger,
               cancellation_token: nil, on_close: nil,
               heartbeat_waiter: nil, &dispatcher_blk)
  @progress_token         = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token        = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id                 = req_id
  @interval               = interval
  @logger                 = logger
  @dispatcher_blk         = dispatcher_blk
  @cancellation_token     = cancellation_token
  @on_close               = on_close
  @heartbeat_waiter       = heartbeat_waiter ||
                            Thread.current[:parse_mcp_sse_heartbeat_waiter] ||
                            ->(t, i) { t.join(i) }
  @queue                  = Queue.new
  @worker                 = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally     = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization — once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback      = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools      = nil
  @unsubscribe_prompts    = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex            = Mutex.new
  @closed                 = false
end

Instance Attribute Details

#progress_callback ⇒ Proc (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Callback exposed to the dispatcher block. Calling this with keyword args progress:, total:, message: pushes a tool-progress notifications/progress event to the SSE queue and marks the worker as "tool is reporting" so subsequent time-based heartbeats are suppressed.

Returns:

  • (Proc)
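From the dispatcher block's side, the callback is just a Proc taking keyword arguments. The sketch below uses a stand-in lambda rather than the real callback (which pushes onto the SSE queue), and the returned body value is a placeholder, not the actual MCPDispatcher.call output.

```ruby
events = []

# Stand-in for the Proc returned by #progress_callback (hypothetical behavior:
# it records events in an array instead of pushing them to the SSE queue).
progress_callback = lambda do |progress:, total: nil, message: nil|
  events << { progress: progress, total: total, message: message }.compact
end

# A dispatcher block receives the callback as its single argument.
dispatcher_blk = lambda do |report|
  items = %w[a b c]
  items.each_with_index do |_item, index|
    report.call(progress: index + 1, total: items.size,
                message: "processed #{index + 1}/#{items.size}")
  end
  { status: 200, body: "placeholder" } # placeholder for the dispatcher result
end

result = dispatcher_blk.call(progress_callback)
```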


# File 'lib/parse/agent/mcp_rack_app.rb', line 1062

def progress_callback
  @progress_callback
end

Instance Method Details

#close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Terminate the stream and clean up.

When called before the stream has completed normally (the DONE sentinel was not consumed by #each), the call is interpreted as a client disconnect: the cancellation token (if any) is tripped before the worker is killed, so tools that observe agent.cancelled? at a checkpoint can exit cooperatively. The kill becomes the fallback for tools stuck inside a blocking I/O call.

When called after normal completion, the token is not tripped: the request finished on its own, and cancellation would only confuse a tool that races to check the flag.

Either path:

  • Kills the worker thread if still alive.
  • Invokes the on_close hook so MCPRackApp can deregister the token from its per-app registry. Failures in the hook are logged and swallowed — close must always succeed.

Cancellation note: blocking I/O calls (MongoDB query, Parse REST roundtrip) do not observe the token until they return. The Ruby-level Timeout.timeout already wrapping each tool is the hard upper bound on wasted work; cancellation reduces that waste but does not eliminate it.
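Cooperative cancellation of this kind can be illustrated with a small stand-in token. SketchToken and process_batches are hypothetical; only the cancel!(reason:) and cancelled? call shapes are taken from the code and text on this page. The tool checks the token between units of work, so it stops at the next checkpoint rather than mid-operation.

```ruby
# Hypothetical stand-in for Parse::Agent::CancellationToken.
class SketchToken
  def initialize
    @mutex  = Mutex.new
    @reason = nil
  end

  def cancel!(reason:)
    @mutex.synchronize { @reason ||= reason } # first reason wins
  end

  def cancelled?
    @mutex.synchronize { !@reason.nil? }
  end

  def reason
    @mutex.synchronize { @reason }
  end
end

# A tool loop that checks the token at each checkpoint between batches.
def process_batches(batches, token)
  sums = []
  batches.each do |batch|
    break if token.cancelled? # cooperative exit point
    sums << batch.sum         # one unit of (uninterruptible) work
    yield sums.size if block_given?
  end
  sums
end

token = SketchToken.new
sums = process_batches([[1, 2], [3, 4], [5, 6]], token) do |done|
  # Simulate a client disconnect arriving after the second batch.
  token.cancel!(reason: :client_disconnect) if done == 2
end
```

The third batch is never processed, but the second one runs to completion even though the disconnect arrives during it, which is the "reduces but does not eliminate" behavior described above.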



# File 'lib/parse/agent/mcp_rack_app.rb', line 1181

def close
  # Idempotent — concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
  end
  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
  end
  @worker&.kill if @worker&.alive?
  @worker = nil
  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.message}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools   = nil
    @unsubscribe_prompts = nil
  end
  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.message}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end

#each {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rack body interface — called once by the Rack server.

Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.

Yields:

  • (String)

    SSE-formatted event strings.
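The queue-plus-sentinel pattern described above can be reduced to a minimal sketch. SketchBody is hypothetical and leaves out heartbeats, cancellation, and logging; it only shows a worker thread feeding a Queue, #each draining it until a sentinel arrives, and #close running from ensure.

```ruby
# Hypothetical, stripped-down streaming body (not the real SSEBody).
class SketchBody
  DONE = :__sketch_done__

  def initialize(&producer)
    @producer = producer
    @queue    = Queue.new
    @worker   = nil
    @closed   = false
  end

  def each
    @worker = Thread.new do
      begin
        @producer.call(@queue)
      ensure
        @queue << DONE # always unblock the reader, even if the producer raises
      end
    end
    loop do
      msg = @queue.pop
      break if msg == DONE
      yield msg
    end
  ensure
    close
  end

  def close
    return if @closed
    @closed = true
    @worker&.kill if @worker&.alive?
  end

  def closed?
    @closed
  end
end

body = SketchBody.new do |queue|
  queue << "chunk-1"
  queue << "chunk-2"
end

chunks = []
body.each { |chunk| chunks << chunk }
```

Calling close from ensure means cleanup also runs when the server stops iterating early, for example because the client went away and the yield raised.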



# File 'lib/parse/agent/mcp_rack_app.rb', line 1141

def each
  start_worker
  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end