Class: Parse::Agent::MCPRackApp::SSEBody Private

Inherits:
Object
  • Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Rack body object that emits MCP progress notifications over SSE.

`#each` is the only public interface (besides `#close`). It is driven by the Rack server on whatever thread/fiber handles response writing. The dispatcher call and heartbeat timer both run on a dedicated worker thread so they do not block the calling fiber.

Two sources of progress events

SSEBody emits `notifications/progress` events from two sources:

  1. **Time-based heartbeats.** The worker thread emits a heartbeat every `@interval` seconds while the dispatcher is running. The `progress` field is elapsed seconds; `total` is omitted. The heartbeat uses a dedicated server-generated `progressToken`, distinct from any client-supplied token, so the elapsed-seconds scale never appears alongside tool-reported work units on the same token (the MCP spec requires per-token monotonicity).

  2. **Tool-internal progress.** Tools call `agent.report_progress(…)`, which invokes the callback exposed by `#progress_callback`. The callback pushes an event using the client-supplied or server-generated `progressToken` with the tool-supplied `progress`, optional `total`, and optional `message`.

Once a tool starts reporting its own progress, the heartbeat loop suppresses further time-based events to reduce stream noise, because the tool's reports already carry a liveness signal. If the tool never calls `report_progress`, heartbeats continue firing for the lifetime of the dispatcher.
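The interplay between the two sources can be illustrated with a small stand-alone model. This is only a sketch: `HeartbeatSketch`, `tick`, and the event hashes are hypothetical names made up for illustration, and the real worker is timer-driven rather than called by hand.

```ruby
require "securerandom"

# Hypothetical model of the two progress sources; not the library's code.
class HeartbeatSketch
  attr_reader :events

  def initialize(client_token)
    @client_token           = client_token
    # Separate token so elapsed seconds never share a scale with work units.
    @heartbeat_token        = "heartbeat:#{SecureRandom.uuid}"
    @tool_progress_reported = false
    @events                 = []
  end

  # Stands in for one firing of the heartbeat timer.
  def tick(elapsed_seconds)
    return if @tool_progress_reported # suppressed once the tool reports

    @events << { progressToken: @heartbeat_token, progress: elapsed_seconds }
  end

  # Stands in for the callback behind agent.report_progress.
  def report_progress(progress:, total: nil, message: nil)
    @tool_progress_reported = true
    event = { progressToken: @client_token, progress: progress }
    event[:total]   = total   unless total.nil?
    event[:message] = message unless message.nil?
    @events << event
  end
end

sketch = HeartbeatSketch.new("client-token")
sketch.tick(5)                                # heartbeat is emitted
sketch.report_progress(progress: 1, total: 4) # tool takes over
sketch.tick(10)                               # heartbeat is suppressed
```

After these three calls `sketch.events` holds one heartbeat on the server-generated token followed by one tool report on the client token.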

Wire format for each SSE event (note: trailing blank line is required by the SSE spec):

event: progress\n
data: <json>\n
\n
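A frame in this format could be assembled roughly as follows. `format_sse_progress` is a hypothetical helper, and the JSON-RPC envelope shown is the general MCP shape for `notifications/progress`; the exact payload SSEBody builds is an assumption here.

```ruby
require "json"

# Hypothetical helper: wrap one progress notification in an SSE frame.
def format_sse_progress(progress_token:, progress:, total: nil, message: nil)
  params = { progressToken: progress_token, progress: progress }
  params[:total]   = total   unless total.nil?
  params[:message] = message unless message.nil?

  payload = { jsonrpc: "2.0", method: "notifications/progress", params: params }

  # The trailing blank line ("\n\n") terminates the event.
  "event: progress\ndata: #{JSON.generate(payload)}\n\n"
end

frame = format_sse_progress(progress_token: "tok-1", progress: 3, total: 10)
```

`JSON.generate` emits a single line, which matters because a newline inside the `data:` field would split the event.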

Constant Summary

DONE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Sentinel pushed to the queue when the worker is done.

:__sse_done__

Constructor Details

#initialize(progress_token, req_id, interval, logger, cancellation_token: nil, on_close: nil, &dispatcher_blk) ⇒ SSEBody

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of SSEBody.

Parameters:

  • progress_token (String)

    MCP progressToken value.

  • req_id (Object)

    JSON-RPC request id (may be nil).

  • interval (Numeric)

    heartbeat period in seconds.

  • logger (#warn, nil)

    optional logger.

  • cancellation_token (Parse::Agent::CancellationToken, nil) (defaults to: nil)

    token tripped by #close (client disconnect) and by `notifications/cancelled` lookups. Tools cooperate by checking `agent.cancelled?`.

  • on_close (Proc, nil) (defaults to: nil)

    callback invoked from #close after the worker has been terminated. Used by MCPRackApp to deregister the cancellation token from the per-app registry.

  • dispatcher_blk (Proc)

    called with one argument (the #progress_callback Proc); must return the same `{ status:, body: }` hash that MCPDispatcher.call returns.
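The contract for `dispatcher_blk` can be sketched without the library itself. Everything below is illustrative: the `dispatcher` lambda, the `recorder` stand-in for the progress callback, and the contents of `body` are assumptions, since the source only states that the block receives the callback and returns a `{ status:, body: }` hash.

```ruby
# Hypothetical dispatcher block: reports three steps, then returns a result.
dispatcher = lambda do |progress_callback|
  3.times do |i|
    progress_callback.call(progress: i + 1, total: 3, message: "step #{i + 1}")
  end
  # The body contents are an assumption; only the hash keys come from the docs.
  { status: 200, body: { "jsonrpc" => "2.0", "id" => 1, "result" => {} } }
end

# Stand-in for #progress_callback that records what the block reported.
reports  = []
recorder = ->(progress:, total: nil, message: nil) { reports << [progress, total, message] }

result = dispatcher.call(recorder)
```

Passing a recording lambda in place of the real callback is one way to exercise a dispatcher block in isolation.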



# File 'lib/parse/agent/mcp_rack_app.rb', line 580

def initialize(progress_token, req_id, interval, logger,
               cancellation_token: nil, on_close: nil, &dispatcher_blk)
  @progress_token         = progress_token
  # Heartbeats use a dedicated server-generated progressToken so
  # the elapsed-seconds scale of heartbeats never appears on the
  # same MCP progressToken as work-unit values reported by tools.
  # The MCP spec requires `progress` to increase monotonically
  # per progressToken; mixing the two scales would violate it
  # at the boundary where a tool first reports.
  @heartbeat_token        = "parse-stack:heartbeat:#{SecureRandom.uuid}"
  @req_id                 = req_id
  @interval               = interval
  @logger                 = logger
  @dispatcher_blk         = dispatcher_blk
  @cancellation_token     = cancellation_token
  @on_close               = on_close
  @queue                  = Queue.new
  @worker                 = nil
  # Flipped to true by #each when the DONE sentinel is consumed.
  # #close uses this to decide whether to trip the cancellation
  # token (false = client disconnect) or skip the trip (true =
  # the request finished on its own). Reads and writes happen
  # under @close_mutex below.
  @completed_normally     = false
  # Volatile flag flipped by the progress_callback the first time a
  # tool reports. Heartbeats now use a separate progressToken so
  # the flag is no longer a spec-correctness gate, but we keep
  # it as a small bandwidth optimization — once a tool is
  # actively reporting, time-based heartbeats are noise.
  @tool_progress_reported = false
  @progress_callback      = build_progress_callback
  # Deregistration callbacks for the Tools/Prompts subscribe
  # bindings. Set when the worker starts (so a request that is
  # never driven via #each does not register a stale entry) and
  # cleared in #close.
  @unsubscribe_tools      = nil
  @unsubscribe_prompts    = nil
  # Guards concurrent invocations of #close. Rack servers
  # sometimes call close from both the I/O fiber's ensure and a
  # separate disconnect-handler thread; without a mutex the
  # subscriber-deregister and on_close paths can run twice.
  @close_mutex            = Mutex.new
  @closed                 = false
end

Instance Attribute Details

#progress_callback ⇒ Proc (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Callback exposed to the dispatcher block. Calling this with keyword args `progress:`, `total:`, `message:` pushes a tool-progress `notifications/progress` event to the SSE queue and marks the worker as “tool is reporting” so subsequent time-based heartbeats are suppressed.

Returns:

  • (Proc)


# File 'lib/parse/agent/mcp_rack_app.rb', line 564

def progress_callback
  @progress_callback
end

Instance Method Details

#close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Terminate the stream and clean up.

When called BEFORE the stream completed normally (the DONE sentinel was not consumed by #each), this is interpreted as a client disconnect: the cancellation token (if any) is tripped BEFORE the worker is killed, so tools that observe `agent.cancelled?` at a checkpoint can exit cooperatively. The kill is the fallback for tools stuck inside a blocking I/O call.

When called AFTER normal completion, the token is NOT tripped. The request finished on its own, and cancellation would only confuse a tool that races to check the flag.

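Either path:

- Kills the worker thread if still alive.
- Deregisters the listChanged subscribers so a later registry
  mutation cannot push events into the finished stream.
- Invokes the on_close hook so MCPRackApp can deregister
  the token from its per-app registry. Failures in the hook
  are logged and swallowed; close must always succeed.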

Cancellation note: blocking I/O calls (MongoDB query, Parse REST roundtrip) do not observe the token until they return. The Ruby-level `Timeout.timeout` already wrapping each tool is the hard upper bound on wasted work; cancellation reduces that waste but does not eliminate it.
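Cooperative cancellation of this kind might look like the sketch below. `SketchToken` and `process_batches` are hypothetical stand-ins; only the `cancel!(reason:)` and `cancelled?` method names are taken from this page, and the real `Parse::Agent::CancellationToken` may differ.

```ruby
# Hypothetical token with the two methods this page mentions.
class SketchToken
  attr_reader :reason

  def initialize
    @mutex     = Mutex.new
    @cancelled = false
    @reason    = nil
  end

  def cancel!(reason: nil)
    @mutex.synchronize do
      @cancelled = true
      @reason  ||= reason # keep the first reason if tripped twice
    end
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

# A tool-like loop that checks the token at each checkpoint.
def process_batches(batches, token)
  done = []
  batches.each do |batch|
    break if token.cancelled? # checkpoint between units of work

    done << batch
    yield batch if block_given?
  end
  done
end

token = SketchToken.new
processed = process_batches([1, 2, 3, 4], token) do |batch|
  # Simulate a client disconnect arriving while batch 2 is in flight.
  token.cancel!(reason: :client_disconnect) if batch == 2
end
```

Batch 2 still completes because the token is only consulted between batches, which mirrors the note above: work already in flight is not interrupted.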



# File 'lib/parse/agent/mcp_rack_app.rb', line 672

def close
  # Idempotent — concurrent invocations from the I/O fiber and
  # a disconnect-handler thread short-circuit after the first
  # caller wins the mutex.
  completed_normally = nil
  @close_mutex.synchronize do
    return if @closed
    @closed = true
    completed_normally = @completed_normally
  end
  unless completed_normally
    @cancellation_token&.cancel!(reason: :client_disconnect)
  end
  @worker&.kill if @worker&.alive?
  @worker = nil
  # Deregister listChanged subscribers BEFORE the on_close hook
  # so a subsequent registry mutation cannot push events into
  # the queue after the stream has ended.
  begin
    @unsubscribe_tools&.call
    @unsubscribe_prompts&.call
  rescue StandardError => e
    line = "[Parse::Agent::MCPRackApp::SSEBody] unsubscribe error: #{e.class}: #{e.message}"
    if @logger
      @logger.warn(line)
    else
      warn line
    end
  ensure
    @unsubscribe_tools   = nil
    @unsubscribe_prompts = nil
  end
  if @on_close
    begin
      @on_close.call
    rescue StandardError => e
      line = "[Parse::Agent::MCPRackApp::SSEBody] on_close error: #{e.class}: #{e.message}"
      if @logger
        @logger.warn(line)
      else
        warn line
      end
    end
  end
  @on_close = nil
end
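The idempotency guard at the top of `#close` can be isolated into a minimal sketch. `CloseOnceSketch` and its `hook_calls` counter are hypothetical and exist only to show that the cleanup path runs once when several threads call `close` concurrently.

```ruby
# Hypothetical reduction of the close-once guard; not the library's class.
class CloseOnceSketch
  attr_reader :hook_calls

  def initialize
    @mutex      = Mutex.new
    @closed     = false
    @hook_calls = 0
  end

  def close
    @mutex.synchronize do
      return if @closed # later callers leave here; the mutex is released

      @closed = true
    end
    # Only the caller that flipped @closed reaches the cleanup below.
    @hook_calls += 1
  end
end

body    = CloseOnceSketch.new
threads = 4.times.map { Thread.new { body.close } }
threads.each(&:join)
```

Keeping the cleanup outside the `synchronize` block, as `#close` does, means a slow hook does not hold the mutex while other callers wait to learn that the body is already closed.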

#each {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rack body interface — called once by the Rack server.

Starts a worker thread that runs the dispatcher and emits periodic heartbeats via the queue, then loops reading from the queue and yielding formatted SSE strings until the final response is sent.

Yields:

  • (String)

    SSE-formatted event strings.



# File 'lib/parse/agent/mcp_rack_app.rb', line 632

def each
  start_worker
  loop do
    msg = @queue.pop
    if msg == DONE
      @close_mutex.synchronize { @completed_normally = true }
      break
    end
    yield msg
  end
ensure
  close
end
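The queue-and-sentinel handoff that `#each` relies on can be sketched in isolation. `DONE_SKETCH`, `drain`, and the frame strings are hypothetical; the real worker also runs the dispatcher and the heartbeat timer.

```ruby
# Hypothetical sentinel, analogous in role to DONE.
DONE_SKETCH = :__done__

# Pop frames until the sentinel arrives, yielding each one to the caller.
def drain(queue)
  loop do
    msg = queue.pop # blocks until the producer pushes something
    break if msg == DONE_SKETCH

    yield msg
  end
end

queue  = Queue.new
worker = Thread.new do
  queue << "event: progress\ndata: {}\n\n"
  queue << "data: final\n\n"
  queue << DONE_SKETCH # signals that no further frames will follow
end

frames = []
drain(queue) { |frame| frames << frame }
worker.join
```

A symbol sentinel cannot collide with a frame because every real frame is a String, so the `==` check is unambiguous.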