Class: Parse::Agent::MCPRackApp::CancellationRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/agent/mcp_rack_app.rb

Instance Method Summary collapse

Constructor Details

#initializeCancellationRegistry

Returns a new instance of CancellationRegistry.



1890
1891
1892
1893
# File 'lib/parse/agent/mcp_rack_app.rb', line 1890

def initialize
  @entries = {}
  @mutex   = Mutex.new
end

Instance Method Details

#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean

Trip the matching token. Silent no-op when the entry is missing — by design, to avoid a probe oracle.

Returns:

  • (Boolean)

    true if a matching token was tripped.



1945
1946
1947
1948
1949
1950
1951
# File 'lib/parse/agent/mcp_rack_app.rb', line 1945

def cancel(correlation_id, request_id, reason: :notifications_cancelled)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
  return false unless entry
  entry[1].cancel!(reason: reason)
  true
end

#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object

Trip every token registered under the given correlation_id. Used by DELETE / session termination — when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled so worker threads exit promptly instead of carrying a doomed result to completion.

Silent no-op when no entries match (or correlation_id is blank). Returns the number of tokens tripped.



1961
1962
1963
1964
1965
1966
1967
1968
1969
# File 'lib/parse/agent/mcp_rack_app.rb', line 1961

def cancel_all_for(correlation_id, reason: :session_terminated)
  return 0 if correlation_id.nil? || correlation_id.to_s.empty?
  tokens = @mutex.synchronize do
    keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
    keys.map { |k| @entries.delete(k)[1] }
  end
  tokens.each { |t| t.cancel!(reason: reason) }
  tokens.size
end

#deregister(correlation_id, request_id, entry_id) ⇒ Boolean

Release a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling's token. Idempotent.

Returns:

  • (Boolean)

    true if this call removed the entry.



1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
# File 'lib/parse/agent/mcp_rack_app.rb', line 1927

def deregister(correlation_id, request_id, entry_id)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  return false if entry_id.nil?
  @mutex.synchronize do
    current = @entries[[correlation_id, request_id]]
    if current && current[0] == entry_id
      @entries.delete([correlation_id, request_id])
      true
    else
      false
    end
  end
end

#register(correlation_id, request_id, token) ⇒ String?

Register a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel; older entries can still be safely released via their entry-id even though they no longer "own" the slot.

Parameters:

Returns:

  • (String, nil)

    opaque entry-id, or nil when registration was refused (no correlation_id).



1911
1912
1913
1914
1915
1916
1917
1918
# File 'lib/parse/agent/mcp_rack_app.rb', line 1911

def register(correlation_id, request_id, token)
  return nil if correlation_id.nil? || correlation_id.to_s.empty?
  entry_id = SecureRandom.uuid
  @mutex.synchronize do
    @entries[[correlation_id, request_id]] = [entry_id, token]
  end
  entry_id
end

#sizeInteger

Returns number of currently-registered tokens. Used by tests and operator dashboards.

Returns:

  • (Integer)

    number of currently-registered tokens. Used by tests and operator dashboards.



1973
1974
1975
# File 'lib/parse/agent/mcp_rack_app.rb', line 1973

def size
  @mutex.synchronize { @entries.size }
end