Class: Parse::Agent::MCPRackApp::CancellationRegistry
- Inherits:
- Object
- Defined in:
- lib/parse/agent/mcp_rack_app.rb
Instance Method Summary
- #cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
  Trip the matching token.
- #cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Integer
  Trip every token registered under the given correlation_id.
- #deregister(correlation_id, request_id, entry_id) ⇒ Boolean
  Release a previously-registered entry.
- #initialize ⇒ CancellationRegistry (constructor)
  A new instance of CancellationRegistry.
- #register(correlation_id, request_id, token) ⇒ String?
  Register a cancellation token for the given session and request id pair.
- #size ⇒ Integer
  Number of currently-registered tokens.
Constructor Details
#initialize ⇒ CancellationRegistry
Returns a new instance of CancellationRegistry.
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1890
    def initialize
      @entries = {}
      @mutex = Mutex.new
    end
Instance Method Details
#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
Trips the matching token and returns true. When no entry matches, or correlation_id is blank, the call is a silent no-op that returns false; this is by design, to avoid a probe oracle.
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1945
    def cancel(correlation_id, request_id, reason: :notifications_cancelled)
      return false if correlation_id.nil? || correlation_id.to_s.empty?
      entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
      return false unless entry
      entry[1].cancel!(reason: reason)
      true
    end
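The hit and miss paths of #cancel can be exercised with a small sketch. DemoToken and DemoRegistry are hypothetical names introduced here: DemoRegistry is a trimmed copy of #register and #cancel from the listings on this page so the snippet runs without the gem loaded, and DemoToken is an assumed stand-in that only mirrors the `cancel!(reason:)` call seen above. The real token class may behave differently.

```ruby
require "securerandom"

# Hypothetical stand-in for the real token class. Only #cancel!(reason:)
# appears in the source listing; the readers are assumptions for this demo.
class DemoToken
  attr_reader :reason

  def initialize
    @reason = nil
  end

  def cancel!(reason:)
    @reason = reason
  end

  def cancelled?
    !@reason.nil?
  end
end

# Trimmed copy of #register and #cancel, for illustration only.
class DemoRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel(correlation_id, request_id, reason: :notifications_cancelled)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
    return false unless entry
    entry[1].cancel!(reason: reason)
    true
  end
end

registry = DemoRegistry.new
token = DemoToken.new
registry.register("session-a", 7, token)

registry.cancel("session-a", 7)   # => true, the token is tripped
registry.cancel("session-a", 99)  # => false, unknown request id
registry.cancel(nil, 7)           # => false, blank correlation id
token.reason                      # => :notifications_cancelled
```

Note that, as listed, #cancel only reads the entry: the slot stays registered until #deregister or #cancel_all_for removes it.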
#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Integer
Trips every token registered under the given correlation_id and removes those entries from the registry. Used by DELETE / session termination: when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled so worker threads exit promptly instead of carrying a doomed result to completion.
Silent no-op when no entries match or correlation_id is blank; both cases return 0. Returns the number of tokens tripped.
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1961
    def cancel_all_for(correlation_id, reason: :session_terminated)
      return 0 if correlation_id.nil? || correlation_id.to_s.empty?
      tokens = @mutex.synchronize do
        keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
        keys.map { |k| @entries.delete(k)[1] }
      end
      tokens.each { |t| t.cancel!(reason: reason) }
      tokens.size
    end
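A session teardown can be sketched as follows. DemoToken and DemoSessionRegistry are hypothetical names: the registry is a trimmed copy of #register, #cancel_all_for and #size from the listings on this page, and the token is an assumed stand-in for the real class. The sketch shows that only entries under the matching correlation_id are removed and tripped, and that a second call finds nothing left to cancel.

```ruby
require "securerandom"

# Hypothetical stand-in for the real token class (assumption for this demo).
class DemoToken
  attr_reader :reason

  def initialize
    @reason = nil
  end

  def cancel!(reason:)
    @reason = reason
  end

  def cancelled?
    !@reason.nil?
  end
end

# Trimmed copy of #register, #cancel_all_for and #size, for illustration only.
class DemoSessionRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel_all_for(correlation_id, reason: :session_terminated)
    return 0 if correlation_id.nil? || correlation_id.to_s.empty?
    # Entries are removed while holding the lock ...
    tokens = @mutex.synchronize do
      keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
      keys.map { |k| @entries.delete(k)[1] }
    end
    # ... and the tokens are tripped after the lock is released.
    tokens.each { |t| t.cancel!(reason: reason) }
    tokens.size
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = DemoSessionRegistry.new
a1 = DemoToken.new
a2 = DemoToken.new
b1 = DemoToken.new
registry.register("session-a", 1, a1)
registry.register("session-a", 2, a2)
registry.register("session-b", 1, b1)

registry.cancel_all_for("session-a")  # => 2
a1.reason                             # => :session_terminated
b1.cancelled?                         # => false, other sessions are untouched
registry.size                         # => 1
registry.cancel_all_for("session-a")  # => 0, nothing left under that id
```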
#deregister(correlation_id, request_id, entry_id) ⇒ Boolean
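Releases a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling's token. Returns true when the slot was removed and false otherwise. Idempotent: a repeated call with the same entry-id returns false and changes nothing.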
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1927
    def deregister(correlation_id, request_id, entry_id)
      return false if correlation_id.nil? || correlation_id.to_s.empty?
      return false if entry_id.nil?
      @mutex.synchronize do
        current = @entries[[correlation_id, request_id]]
        if current && current[0] == entry_id
          @entries.delete([correlation_id, request_id])
          true
        else
          false
        end
      end
    end
#register(correlation_id, request_id, token) ⇒ String?
Registers a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot, or nil when correlation_id is blank. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel. Calling #deregister with an older entry-id remains safe: because that entry no longer "owns" the slot, the call returns false and leaves the newer registration in place.
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1911
    def register(correlation_id, request_id, token)
      return nil if correlation_id.nil? || correlation_id.to_s.empty?
      entry_id = SecureRandom.uuid
      @mutex.synchronize do
        @entries[[correlation_id, request_id]] = [entry_id, token]
      end
      entry_id
    end
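The id-reuse case described above can be illustrated with a minimal sketch. DemoSlotRegistry is a hypothetical name for a trimmed copy of #register, #deregister and #size from the listings on this page, and plain symbols stand in for tokens since neither method calls anything on them. It shows that a stale entry-id cannot evict the registration that replaced it.

```ruby
require "securerandom"

# Trimmed copy of #register, #deregister and #size, for illustration only.
class DemoSlotRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def deregister(correlation_id, request_id, entry_id)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    return false if entry_id.nil?
    @mutex.synchronize do
      current = @entries[[correlation_id, request_id]]
      # Remove the slot only when the caller still owns it.
      if current && current[0] == entry_id
        @entries.delete([correlation_id, request_id])
        true
      else
        false
      end
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = DemoSlotRegistry.new
first_id  = registry.register("session-a", 7, :first_token)
second_id = registry.register("session-a", 7, :second_token)  # overwrites the slot

registry.deregister("session-a", 7, first_id)   # => false, stale owner
registry.size                                   # => 1, the sibling is still registered
registry.deregister("session-a", 7, second_id)  # => true
registry.deregister("session-a", 7, second_id)  # => false, already released
registry.register("", 7, :token)                # => nil, blank correlation id
```

A caller would typically hold on to the returned entry-id for the lifetime of the request and pass it back in its cleanup path, for example from an `ensure` block; that placement is a suggestion, not something the source prescribes.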
#size ⇒ Integer
Returns the number of currently-registered tokens. Used by tests and operator dashboards.
    # File 'lib/parse/agent/mcp_rack_app.rb', line 1973
    def size
      @mutex.synchronize { @entries.size }
    end