Class: Parse::Agent::PendingElicitationRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/parse/agent/approval_gate.rb

Overview

Registry of server→client elicitation requests awaiting a client reply. Each pending request owns a Queue the answering POST pushes onto, waking the blocked tool thread. Structurally modeled on the transport's CancellationRegistry: a Mutex-guarded map keyed by the composite [correlation_id, elicitation_id], with session-bound delivery so one session can never answer another's prompt.

Instance Method Summary collapse

Constructor Details

#initializePendingElicitationRegistry

Returns a new instance of PendingElicitationRegistry.



117
118
119
120
# File 'lib/parse/agent/approval_gate.rb', line 117

def initialize
  @entries = {}
  @mutex = Mutex.new
end

Instance Method Details

#abort_all_for(correlation_id, _reason = nil) ⇒ Object

Wake every pending request for a session with an ABORT sentinel. Called on session DELETE teardown / listening-stream close.



153
154
155
156
157
158
159
160
161
162
# File 'lib/parse/agent/approval_gate.rb', line 153

def abort_all_for(correlation_id, _reason = nil)
  prefix = "#{correlation_id}"
  drained = @mutex.synchronize do
    matched = @entries.select { |k, _| k.start_with?(prefix) }
    matched.each_key { |k| @entries.delete(k) }
    matched.values
  end
  drained.each { |q| q << ApprovalGate::ABORT }
  drained.length
end

#deliver(correlation_id, elicitation_id, value) ⇒ Object

Deliver a client reply. Single-shot: atomically removes the entry and pushes the value onto its queue. Returns true if a matching pending entry existed, false otherwise (double-reply, late reply, or a guessed id under the wrong session).



137
138
139
140
141
142
# File 'lib/parse/agent/approval_gate.rb', line 137

def deliver(correlation_id, elicitation_id, value)
  queue = @mutex.synchronize { @entries.delete(key(correlation_id, elicitation_id)) }
  return false unless queue
  queue << value
  true
end

#deregister(correlation_id, elicitation_id) ⇒ Object

Idempotent removal — called by the waiter on wake/timeout so a late reply finds nothing.



146
147
148
149
# File 'lib/parse/agent/approval_gate.rb', line 146

def deregister(correlation_id, elicitation_id)
  @mutex.synchronize { @entries.delete(key(correlation_id, elicitation_id)) }
  nil
end

#register(correlation_id, elicitation_id) ⇒ Object

Register a pending request and return its Queue. Returns nil when the correlation id is blank (fail closed — caller maps to unavailable).



125
126
127
128
129
130
131
# File 'lib/parse/agent/approval_gate.rb', line 125

def register(correlation_id, elicitation_id)
  return nil if correlation_id.nil? || correlation_id.to_s.empty?
  return nil if elicitation_id.nil? || elicitation_id.to_s.empty?
  queue = Thread::Queue.new
  @mutex.synchronize { @entries[key(correlation_id, elicitation_id)] = queue }
  queue
end

#sizeInteger

Returns number of outstanding requests (tests/telemetry).

Returns:

  • (Integer)

    number of outstanding requests (tests/telemetry).



165
166
167
# File 'lib/parse/agent/approval_gate.rb', line 165

def size
  @mutex.synchronize { @entries.size }
end