Class: Parse::Agent::MCPRackApp::CancellationRegistry
- Inherits: Object
  - Object
  - Parse::Agent::MCPRackApp::CancellationRegistry
- Defined in: lib/parse/agent/mcp_rack_app.rb
Instance Method Summary
- #cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
  Trip the matching token.
- #cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object
  Trip every token registered under the given correlation_id.
- #deregister(correlation_id, request_id, entry_id) ⇒ Boolean
  Release a previously-registered entry.
- #initialize ⇒ CancellationRegistry (constructor)
  A new instance of CancellationRegistry.
- #register(correlation_id, request_id, token) ⇒ String?
  Register a cancellation token for the given session and request id pair.
- #size ⇒ Integer
  Number of currently-registered tokens.
Constructor Details
#initialize ⇒ CancellationRegistry
Returns a new instance of CancellationRegistry.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1682
def initialize
  @entries = {}
  @mutex = Mutex.new
end
Instance Method Details
#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean
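Trip the matching token. Silent no-op when the entry is missing: the method returns false without raising. This is by design, so that a cancellation call cannot serve as a probe oracle for which requests are in flight.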
# File 'lib/parse/agent/mcp_rack_app.rb', line 1737
def cancel(correlation_id, request_id, reason: :notifications_cancelled)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
  return false unless entry
  entry[1].cancel!(reason: reason)
  true
end
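The hit and miss behaviour of #cancel can be sketched as follows. This is a minimal illustration, not the gem's own test code: SketchRegistry is a stand-in that copies #register and #cancel from the listings on this page so the example runs without loading the gem, and DemoToken is a hypothetical token class (the real token type is not shown in this section; the registry only calls cancel!(reason:) on it).

```ruby
require "securerandom"

# Hypothetical token. The only method the registry calls on it is
# cancel!(reason:); cancelled? and reason exist for the demo only.
class DemoToken
  attr_reader :reason

  def cancel!(reason:)
    @reason ||= reason
  end

  def cancelled?
    !@reason.nil?
  end
end

# Stand-in mirroring #register and #cancel from the source listings.
class SketchRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel(correlation_id, request_id, reason: :notifications_cancelled)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
    return false unless entry
    entry[1].cancel!(reason: reason)
    true
  end
end

registry = SketchRegistry.new
token = DemoToken.new
registry.register("session-a", 7, token)

puts registry.cancel("session-a", 99) # unknown request id: false, nothing raised
puts registry.cancel("session-b", 7)  # unknown session: false, nothing raised
puts registry.cancel("session-a", 7)  # match: trips the token, true
puts token.reason.inspect
```

Note that, per the listing above, #cancel leaves the entry in place; the slot is released later through #deregister or #cancel_all_for.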
#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object
Trip every token registered under the given correlation_id. Used by DELETE / session termination: when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled, so worker threads exit promptly instead of carrying a doomed result to completion. The matching entries are also removed from the registry. Silent no-op when no entries match or when correlation_id is blank. Returns the number of tokens tripped (0 in the no-op case).
# File 'lib/parse/agent/mcp_rack_app.rb', line 1753
def cancel_all_for(correlation_id, reason: :session_terminated)
  return 0 if correlation_id.nil? || correlation_id.to_s.empty?
  tokens = @mutex.synchronize do
    keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
    keys.map { |k| @entries.delete(k)[1] }
  end
  tokens.each { |t| t.cancel!(reason: reason) }
  tokens.size
end
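A session teardown along these lines might look like the sketch below. It assumes workers poll their token between units of work, which this page does not confirm. SketchRegistry is a stand-in copying #register, #cancel_all_for and #size from the listings so the example runs without the gem, and DemoToken is a hypothetical token class.

```ruby
require "securerandom"

# Hypothetical token; the registry itself only needs cancel!(reason:).
class DemoToken
  attr_reader :reason

  def cancel!(reason:)
    @reason ||= reason
  end

  def cancelled?
    !@reason.nil?
  end
end

# Stand-in mirroring the source listings on this page.
class SketchRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def cancel_all_for(correlation_id, reason: :session_terminated)
    return 0 if correlation_id.nil? || correlation_id.to_s.empty?
    tokens = @mutex.synchronize do
      keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
      keys.map { |k| @entries.delete(k)[1] }
    end
    tokens.each { |t| t.cancel!(reason: reason) }
    tokens.size
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = SketchRegistry.new
tokens = [1, 2].map do |request_id|
  DemoToken.new.tap { |t| registry.register("session-a", request_id, t) }
end
other = DemoToken.new
registry.register("session-b", 1, other)

# Each worker checks its token between (simulated) units of work.
workers = tokens.map do |t|
  Thread.new do
    sleep 0.01 until t.cancelled?
    t.reason
  end
end

tripped = registry.cancel_all_for("session-a") # session-a is torn down
reasons = workers.map(&:value)                 # joins both workers
puts "tripped=#{tripped} reasons=#{reasons.inspect}"
puts "session-b cancelled? #{other.cancelled?}; entries left: #{registry.size}"
```

Tokens are tripped after the mutex is released, so a slow cancel! callback would not block concurrent #register or #deregister calls.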
#deregister(correlation_id, request_id, entry_id) ⇒ Boolean
Release a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling's token. Idempotent.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1719
def deregister(correlation_id, request_id, entry_id)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  return false if entry_id.nil?
  @mutex.synchronize do
    current = @entries[[correlation_id, request_id]]
    if current && current[0] == entry_id
      @entries.delete([correlation_id, request_id])
      true
    else
      false
    end
  end
end
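The ownership check can be illustrated with two registrations on the same key. In this sketch SketchRegistry is a stand-in copying #register, #deregister and #size from the listings on this page, and plain symbols take the place of tokens, since #deregister never calls the token.

```ruby
require "securerandom"

# Stand-in mirroring the source listings on this page.
class SketchRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def deregister(correlation_id, request_id, entry_id)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    return false if entry_id.nil?
    @mutex.synchronize do
      current = @entries[[correlation_id, request_id]]
      if current && current[0] == entry_id
        @entries.delete([correlation_id, request_id])
        true
      else
        false
      end
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

registry = SketchRegistry.new
first_id  = registry.register("session-a", 7, :first_token)
second_id = registry.register("session-a", 7, :second_token) # overwrites the slot

puts registry.deregister("session-a", 7, first_id)  # false: stale owner, slot kept
puts registry.size                                  # 1
puts registry.deregister("session-a", 7, second_id) # true: current owner releases
puts registry.deregister("session-a", 7, second_id) # false: repeat call, no error
```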
#register(correlation_id, request_id, token) ⇒ String?
Register a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot, or nil (registering nothing) when correlation_id is blank. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel. Releasing an older entry via its entry-id is still safe: because it no longer "owns" the slot, the call leaves the newer registration in place.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1703
def register(correlation_id, request_id, token)
  return nil if correlation_id.nil? || correlation_id.to_s.empty?
  entry_id = SecureRandom.uuid
  @mutex.synchronize do
    @entries[[correlation_id, request_id]] = [entry_id, token]
  end
  entry_id
end
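One way a caller might pair #register with #deregister is to release the slot in an ensure clause. The helper with_cancellation below is hypothetical and not part of the gem. SketchRegistry is a stand-in copying the listings on this page, and DemoToken is a hypothetical token class.

```ruby
require "securerandom"

# Hypothetical token; the registry itself only needs cancel!(reason:).
class DemoToken
  attr_reader :reason

  def cancel!(reason:)
    @reason ||= reason
  end
end

# Stand-in mirroring the source listings on this page.
class SketchRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def register(correlation_id, request_id, token)
    return nil if correlation_id.nil? || correlation_id.to_s.empty?
    entry_id = SecureRandom.uuid
    @mutex.synchronize { @entries[[correlation_id, request_id]] = [entry_id, token] }
    entry_id
  end

  def deregister(correlation_id, request_id, entry_id)
    return false if correlation_id.nil? || correlation_id.to_s.empty?
    return false if entry_id.nil?
    @mutex.synchronize do
      current = @entries[[correlation_id, request_id]]
      if current && current[0] == entry_id
        @entries.delete([correlation_id, request_id])
        true
      else
        false
      end
    end
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

# Hypothetical helper: registers a token for the duration of the block and
# always releases the slot afterwards, even if the block raises.
def with_cancellation(registry, correlation_id, request_id)
  token = DemoToken.new
  entry_id = registry.register(correlation_id, request_id, token)
  yield token
ensure
  # entry_id is nil when registration was skipped (blank correlation_id);
  # deregister then simply returns false.
  registry.deregister(correlation_id, request_id, entry_id)
end

registry = SketchRegistry.new
with_cancellation(registry, "session-a", 7) do |_token|
  puts "during request: #{registry.size} entry"
end
puts "after request: #{registry.size} entries"
```

Passing the entry-id back, rather than just the key, is what keeps this pattern safe under id reuse: a late ensure from an older request cannot remove a newer registration on the same key.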
#size ⇒ Integer
Returns the number of currently-registered tokens. Used by tests and operator dashboards.
# File 'lib/parse/agent/mcp_rack_app.rb', line 1765
def size
  @mutex.synchronize { @entries.size }
end