Class: Parse::Agent::MCPRackApp::CancellationRegistry Private
- Inherits:
-
Object
- Object
- Parse::Agent::MCPRackApp::CancellationRegistry
- Defined in:
- lib/parse/agent/mcp_rack_app.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Per-app store of in-flight cancellable requests. Lookups for cancellation are keyed by ‘[correlation_id, request_id]`, but every #register returns an opaque entry-id token that uniquely identifies the registration. #deregister requires that entry-id and removes the matching token only when it still owns the slot — so a second registration under the same `(correlation_id, request_id)` key cannot cause the first registration’s ‘on_close` to evict the wrong token.
SSEBody registers an entry before spawning its dispatcher_thread and deregisters via the MCPRackApp-supplied on_close hook. A ‘notifications/cancelled` POST calls #cancel to trip the matching CancellationToken.
Identity binding: cancellation requires the cancelling request’s ‘Mcp-Session-Id` (sanitized into `agent.correlation_id`) to match the original request’s. This prevents an attacker who guesses sequential JSON-RPC request ids from cancelling other clients’ in-flight requests. A registration with a nil correlation_id is dropped silently (cancellation is disabled for the request).
Scope: per MCPRackApp instance. Cancellation does NOT span multiple mount points within a process, nor multiple processes in a clustered deployment.
Instance Method Summary collapse
-
#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
private
Trip the matching token.
-
#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object
private
Trip every token registered under the given correlation_id.
-
#deregister(correlation_id, request_id, entry_id) ⇒ Boolean
private
Release a previously-registered entry.
-
#initialize ⇒ CancellationRegistry
constructor
private
A new instance of CancellationRegistry.
-
#register(correlation_id, request_id, token) ⇒ String?
private
Register a cancellation token for the given session and request id pair.
-
#size ⇒ Integer
private
Number of currently-registered tokens.
Constructor Details
#initialize ⇒ CancellationRegistry
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of CancellationRegistry.
1078 1079 1080 1081 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1078 def initialize @entries = {} @mutex = Mutex.new end |
Instance Method Details
#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Trip the matching token. Silent no-op when the entry is missing — by design, to avoid a probe oracle.
1133 1134 1135 1136 1137 1138 1139 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1133 def cancel(correlation_id, request_id, reason: :notifications_cancelled) return false if correlation_id.nil? || correlation_id.to_s.empty? entry = @mutex.synchronize { @entries[[correlation_id, request_id]] } return false unless entry entry[1].cancel!(reason: reason) true end |
#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Trip every token registered under the given correlation_id. Used by ‘DELETE /` session termination — when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled so worker threads exit promptly instead of carrying a doomed result to completion.
Silent no-op when no entries match (or correlation_id is blank). Returns the number of tokens tripped.
1149 1150 1151 1152 1153 1154 1155 1156 1157 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1149 def cancel_all_for(correlation_id, reason: :session_terminated) return 0 if correlation_id.nil? || correlation_id.to_s.empty? tokens = @mutex.synchronize do keys = @entries.keys.select { |(cid, _)| cid == correlation_id } keys.map { |k| @entries.delete(k)[1] } end tokens.each { |t| t.cancel!(reason: reason) } tokens.size end |
#deregister(correlation_id, request_id, entry_id) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Release a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling’s token. Idempotent.
1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1115 def deregister(correlation_id, request_id, entry_id) return false if correlation_id.nil? || correlation_id.to_s.empty? return false if entry_id.nil? @mutex.synchronize do current = @entries[[correlation_id, request_id]] if current && current[0] == entry_id @entries.delete([correlation_id, request_id]) true else false end end end |
#register(correlation_id, request_id, token) ⇒ String?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel; older entries can still be safely released via their entry-id even though they no longer “own” the slot.
1099 1100 1101 1102 1103 1104 1105 1106 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1099 def register(correlation_id, request_id, token) return nil if correlation_id.nil? || correlation_id.to_s.empty? entry_id = SecureRandom.uuid @mutex.synchronize do @entries[[correlation_id, request_id]] = [entry_id, token] end entry_id end |
#size ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns number of currently-registered tokens. Used by tests and operator dashboards.
1161 1162 1163 |
# File 'lib/parse/agent/mcp_rack_app.rb', line 1161 def size @mutex.synchronize { @entries.size } end |