Class: Parse::Agent::MCPRackApp::CancellationRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/agent/mcp_rack_app.rb

Instance Method Summary collapse

Constructor Details

#initializeCancellationRegistry

Returns a new instance of CancellationRegistry.



1682
1683
1684
1685
# File 'lib/parse/agent/mcp_rack_app.rb', line 1682

def initialize
  @entries = {}
  @mutex   = Mutex.new
end

Instance Method Details

#cancel(correlation_id, request_id, reason: :notifications_cancelled) ⇒ Boolean

Trip the matching token. Silent no-op when the entry is missing — by design, to avoid a probe oracle.

Returns:

  • (Boolean)

    true if a matching token was tripped.



1737
1738
1739
1740
1741
1742
1743
# File 'lib/parse/agent/mcp_rack_app.rb', line 1737

def cancel(correlation_id, request_id, reason: :notifications_cancelled)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  entry = @mutex.synchronize { @entries[[correlation_id, request_id]] }
  return false unless entry
  entry[1].cancel!(reason: reason)
  true
end

#cancel_all_for(correlation_id, reason: :session_terminated) ⇒ Object

Trip every token registered under the given correlation_id. Used by DELETE / session termination — when a client tears down its session, any in-flight requests still running under that correlation_id are cancelled so worker threads exit promptly instead of carrying a doomed result to completion.

Silent no-op when no entries match (or correlation_id is blank). Returns the number of tokens tripped.



1753
1754
1755
1756
1757
1758
1759
1760
1761
# File 'lib/parse/agent/mcp_rack_app.rb', line 1753

def cancel_all_for(correlation_id, reason: :session_terminated)
  return 0 if correlation_id.nil? || correlation_id.to_s.empty?
  tokens = @mutex.synchronize do
    keys = @entries.keys.select { |(cid, _)| cid == correlation_id }
    keys.map { |k| @entries.delete(k)[1] }
  end
  tokens.each { |t| t.cancel!(reason: reason) }
  tokens.size
end

#deregister(correlation_id, request_id, entry_id) ⇒ Boolean

Release a previously-registered entry. Removes the slot only when the current owner matches the passed entry-id, so a stale on_close from a request whose slot was overwritten by a sibling registration cannot evict the sibling's token. Idempotent.

Returns:

  • (Boolean)

    true if this call removed the entry.



1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
# File 'lib/parse/agent/mcp_rack_app.rb', line 1719

def deregister(correlation_id, request_id, entry_id)
  return false if correlation_id.nil? || correlation_id.to_s.empty?
  return false if entry_id.nil?
  @mutex.synchronize do
    current = @entries[[correlation_id, request_id]]
    if current && current[0] == entry_id
      @entries.delete([correlation_id, request_id])
      true
    else
      false
    end
  end
end

#register(correlation_id, request_id, token) ⇒ String?

Register a cancellation token for the given session and request id pair. Returns an opaque entry-id that the caller must pass to #deregister to release the slot. If multiple registrations land on the same key (legitimate id-reuse by the same session, or a request retry), only the latest registration is reachable for #cancel; older entries can still be safely released via their entry-id even though they no longer "own" the slot.

Parameters:

Returns:

  • (String, nil)

    opaque entry-id, or nil when registration was refused (no correlation_id).



1703
1704
1705
1706
1707
1708
1709
1710
# File 'lib/parse/agent/mcp_rack_app.rb', line 1703

def register(correlation_id, request_id, token)
  return nil if correlation_id.nil? || correlation_id.to_s.empty?
  entry_id = SecureRandom.uuid
  @mutex.synchronize do
    @entries[[correlation_id, request_id]] = [entry_id, token]
  end
  entry_id
end

#sizeInteger

Returns number of currently-registered tokens. Used by tests and operator dashboards.

Returns:

  • (Integer)

    number of currently-registered tokens. Used by tests and operator dashboards.



1765
1766
1767
# File 'lib/parse/agent/mcp_rack_app.rb', line 1765

def size
  @mutex.synchronize { @entries.size }
end