Class: Parse::Agent::MCPRackApp::CancellationRegistry Private

Inherits:
Object
Defined in:
lib/parse/agent/mcp_rack_app.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Per-app store of in-flight cancellable requests. Lookups for cancellation are keyed by `[correlation_id, request_id]`, but every #register returns an opaque entry-id token that uniquely identifies the registration. #deregister requires that entry-id and removes the slot only when that entry-id still owns it, so when a second registration lands on the same `(correlation_id, request_id)` key, the first registration’s `on_close` cannot evict the second registration’s token.

SSEBody registers an entry before spawning its dispatcher_thread and deregisters via the MCPRackApp-supplied on_close hook. A `notifications/cancelled` POST calls #cancel to trip the matching CancellationToken.

Identity binding: cancellation requires the cancelling request’s `Mcp-Session-Id` (sanitized into `agent.correlation_id`) to match the original request’s. This prevents an attacker who guesses sequential JSON-RPC request ids from cancelling other clients’ in-flight requests. A registration with a nil or empty correlation_id is dropped silently (cancellation is disabled for the request).

Scope: per MCPRackApp instance. Cancellation does NOT span multiple mount points within a process, nor multiple processes in a clustered deployment.
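The lifecycle and identity binding described above can be sketched as follows. This is a minimal illustration rather than the library's own usage: `SketchRegistry` is a trimmed copy of the methods listed on this page, and `StubToken` is a hypothetical stand-in for CancellationToken in which only `cancel!(reason:)` comes from the listing (`cancelled?` and `reason` are assumptions made for the example).

```ruby
require "securerandom"

# Hypothetical stand-in for CancellationToken. Only cancel!(reason:) is
# taken from the listing; cancelled? and reason are assumed for the demo.
class StubToken
  attr_reader :reason

  def initialize
    @reason = nil
  end

  def cancel!(reason:)
    @reason = reason
  end

  def cancelled?
    !@reason.nil?
  end
end

# Trimmed copy of the registry methods shown on this page.
class SketchRegistry
  def initialize
    @entries = {}
    @mutex   = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel(correlation_id, request_id, reason: :notifications_cancelled)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
    return false unless entry
    entry[1].cancel!(reason: reason)
    true
  end

  def deregister(correlation_id, request_id, entry_id)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    return false if entry_id.nil?
    @mutex.synchronize do
      current = @entries[[correlation_id, request_id]]
      if current && current[0] == entry_id
        @entries.delete([correlation_id, request_id])
        true
      else
        false
      end
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = SketchRegistry.new
token    = StubToken.new
entry_id = registry.register("session-a", 7, token)

# A cancel from a different session does not match the key.
registry.cancel("session-b", 7)                # => false
token.cancelled?                               # => false

# The owning session can cancel; the entry stays until deregistered.
registry.cancel("session-a", 7)                # => true
token.reason                                   # => :notifications_cancelled

registry.deregister("session-a", 7, entry_id)  # => true
registry.size                                  # => 0
```

Note that a mismatched session and a missing entry both produce the same `false`, which is what keeps #cancel from acting as a probe oracle.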


Constructor Details

#initialize ⇒ CancellationRegistry

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of CancellationRegistry.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1078

def initialize
  @entries = {}
  @mutex   = Mutex.new
end

Instance Method Details

#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trip the matching token. Silent no-op when the entry is missing — by design, to avoid a probe oracle.

Returns:

  • (Boolean)

    true if a matching token was tripped.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1133

def cancel(correlation_id, request_id, reason: :notifications_cancelled)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  # Look up under the lock; the entry is not removed here.
  entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
  return false unless entry
  # entry is [entry_id, token]; trip the token outside the lock.
  entry[1].cancel!(reason: reason)
  true
end

#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trip every token registered under the given correlation_id. Used by `DELETE /` session termination: when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled so worker threads exit promptly instead of carrying a doomed result to completion.

Silent no-op when no entries match (or correlation_id is blank). Returns the number of tokens tripped.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1149

def cancel_all_for(correlation_id, reason: :session_terminated)
  return 0 if correlation_id.nil? || correlation_id.to_s.empty?
  # Remove the session's entries under the lock and collect their tokens.
  tokens = @mutex.synchronize do
    keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
    keys.map { |k| @entries.delete(k)[1] }
  end
  # Trip the tokens after the lock has been released.
  tokens.each { |t| t.cancel!(reason: reason) }
  tokens.size
end
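A small sketch of session teardown, assuming a trimmed registry copied from the listings on this page (`SessionSketchRegistry`) and a hypothetical `RecordingToken` that only records the reason passed to `cancel!`. It shows that only entries under the terminated correlation_id are tripped and removed.

```ruby
require "securerandom"

# Hypothetical token: records the reason it was cancelled with.
class RecordingToken
  attr_reader :reason

  def cancel!(reason:)
    @reason = reason
  end
end

# Trimmed copy of the registry, limited to the methods this demo needs.
class SessionSketchRegistry
  def initialize
    @entries = {}
    @mutex   = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel_all_for(correlation_id, reason: :session_terminated)
    return 0 if correlation_id.nil? || correlation_id.to_s.empty?
    tokens = @mutex.synchronize do
      keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
      keys.map { |k| @entries.delete(k)[1] }
    end
    tokens.each { |t| t.cancel!(reason: reason) }
    tokens.size
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = SessionSketchRegistry.new
a1 = RecordingToken.new
a2 = RecordingToken.new
b1 = RecordingToken.new

registry.register("session-a", 1, a1)
registry.register("session-a", 2, a2)
registry.register("session-b", 1, b1)

registry.cancel_all_for("session-a")  # => 2
registry.size                         # => 1 (session-b is untouched)
a1.reason                             # => :session_terminated
b1.reason                             # => nil
```

Unlike #cancel, this path deletes the entries itself, so a later #deregister from the affected requests' on_close hooks returns false.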

#deregister(correlation_id, request_id, entry_id) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Release a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling’s token. Idempotent.

Returns:

  • (Boolean)

    true if this call removed the entry.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1115

def deregister(correlation_id, request_id, entry_id)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  return false if entry_id.nil?
  @mutex.synchronize do
    current = @entries[[correlation_id, request_id]]
    if current && current[0] == entry_id
      @entries.delete([correlation_id, request_id])
      true
    else
      false
    end
  end
end
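The ownership check can be illustrated with two registrations on the same key. This is a sketch under stated assumptions: `OwnershipSketchRegistry` is a trimmed copy of #register, #deregister and #size from this page, and plain `Object` instances stand in for tokens because neither method calls into the token.

```ruby
require "securerandom"

# Trimmed copy of the registry, limited to the methods this demo needs.
class OwnershipSketchRegistry
  def initialize
    @entries = {}
    @mutex   = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def deregister(correlation_id, request_id, entry_id)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    return false if entry_id.nil?
    @mutex.synchronize do
      current = @entries[[correlation_id, request_id]]
      if current && current[0] == entry_id
        @entries.delete([correlation_id, request_id])
        true
      else
        false
      end
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry  = OwnershipSketchRegistry.new
first_id  = registry.register("session-a", 42, Object.new)
second_id = registry.register("session-a", 42, Object.new) # overwrites the slot

# The first request's on_close fires late: its entry-id no longer owns the slot.
registry.deregister("session-a", 42, first_id)   # => false
registry.size                                    # => 1

# The current owner releases the slot; repeating the call is harmless.
registry.deregister("session-a", 42, second_id)  # => true
registry.deregister("session-a", 42, second_id)  # => false
registry.size                                    # => 0
```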

#register(correlation_id, request_id, token) ⇒ String?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel. Calling #deregister with an older entry-id remains safe: it returns false and leaves the current owner’s slot untouched.

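Parameters:

  • correlation_id

    session identifier derived from the request’s Mcp-Session-Id; a nil or empty value refuses the registration.

  • request_id

    JSON-RPC request id of the in-flight request.

  • token

    the CancellationToken that #cancel trips.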

Returns:

  • (String, nil)

    opaque entry-id, or nil when registration was refused (no correlation_id).



# File 'lib/parse/agent/mcp_rack_app.rb', line 1099

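def register(correlation_id, request_id, token)
  # Without a session id the registration is refused, which disables cancellation.
  return nil if correlation_id.nil? || correlation_id.to_s.empty?
  # SecureRandom is provided by the stdlib 'securerandom' library.
  entry_id = SecureRandom.uuid
  @mutex.synchronize do
    # Overwrites any earlier registration under the same key.
    @entries[[correlation_id, request_id]] = [entry_id, token]
  end
  entry_id
end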
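The refusal path can be checked in isolation. A minimal sketch, assuming a registry reduced to #register and #size as listed on this page (`RefusalSketchRegistry` is a name chosen for the example) and plain `Object` instances as placeholder tokens:

```ruby
require "securerandom"

# Registry reduced to the two methods this demo needs.
class RefusalSketchRegistry
  def initialize
    @entries = {}
    @mutex   = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = RefusalSketchRegistry.new

# No session id: nothing is stored and the caller gets nil back.
registry.register(nil, 1, Object.new)  # => nil
registry.register("", 1, Object.new)   # => nil
registry.size                          # => 0

# With a session id the caller receives a String entry-id.
entry_id = registry.register("session-a", 1, Object.new)
entry_id.is_a?(String)                 # => true
registry.size                          # => 1
```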

#size ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of currently-registered tokens. Used by tests and operator dashboards.

Returns:

  • (Integer)

    number of currently-registered tokens. Used by tests and operator dashboards.



# File 'lib/parse/agent/mcp_rack_app.rb', line 1161

def size
  @mutex.synchronize { @entries.size }
end