Class: Rubino::Run::ApprovalGate
- Inherits: Object
  - Object
    - Rubino::Run::ApprovalGate
- Defined in: lib/rubino/run/approval_gate.rb
Overview
Synchronizes async HTTP decisions (approvals, clarifications) with the in-thread run loop. The run loop calls #await(id) and blocks; an HTTP endpoint calls #decide(id, value) to unblock it. One gate per run, owned by Executor and published in GateRegistry.
Implementation: one Queue per id, lazily created under a mutex. Each id must first be issued via #register before #decide will accept it — this prevents a stray POST with an arbitrary or replayed approval_id from unblocking an awaiting call. Decided ids are remembered with their resolved value so duplicate POSTs are idempotent (same decision returned, queue not pushed twice).
Id namespace is shared per run: approval ids and clarify ids are both UUIDs minted by UI::API and routed through the same registry entry.
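The register, decide, await handshake described above can be sketched with a stripped-down gate. MiniGate is a hypothetical stand-in written for illustration only: it has no deadline, no cancellation and no recorder, and it is not the class documented on this page.

```ruby
# Hypothetical, simplified gate: one Queue per id, created lazily under a mutex.
class MiniGate
  def initialize
    @queues  = {}
    @issued  = {} # ids that #decide will accept
    @decided = {} # id => first decision recorded
    @mutex   = Mutex.new
  end

  # An id must be registered before #decide will accept it.
  def register(id)
    @mutex.synchronize { @issued[id] = true }
  end

  # Returns :unknown, :duplicate or :ok; only :ok pushes to the queue.
  def decide(id, decision)
    status = @mutex.synchronize do
      if !@issued.key?(id)
        :unknown
      elsif @decided.key?(id)
        :duplicate
      else
        @decided[id] = decision
        :ok
      end
    end
    queue_for(id) << decision if status == :ok
    status
  end

  # Blocks until a decision is pushed (unbounded here, unlike the real gate).
  def await(id)
    queue_for(id).pop
  end

  private

  def queue_for(id)
    @mutex.synchronize { @queues[id] ||= Queue.new }
  end
end

gate = MiniGate.new
gate.register("a1")
waiter = Thread.new { gate.await("a1") }

first  = gate.decide("a1", :approve)  # first decision wins
second = gate.decide("a1", :deny)     # duplicate POST, queue not pushed again
stray  = gate.decide("zzz", :approve) # id was never registered
result = waiter.value
```

In this sketch the second and third calls leave the waiting thread unaffected: only the first accepted decision reaches the queue.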
Bounded wait (W1): #await never parks on a bare, effectively-infinite queue.pop. It loops over short, interruptible pop(timeout:) ticks, re-checking the cancelled flag and an absolute deadline each tick, so:
* an explicit #cancel! (run stop/teardown) wakes it within one tick,
* an abandoned approval (client closed the tab, no decision ever) is released at the configured deadline instead of holding the worker thread for 24h and exhausting the server pool.
On deadline expiry #await returns the EXPIRED sentinel (never an approve) and emits approval.expired; UI::API maps that to a safe DENY.
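A minimal sketch of the tick-and-deadline loop, assuming Ruby 3.2 or later for Queue#pop(timeout:). The names bounded_await, TICK, EXPIRED_MARK and WaitCancelled are hypothetical and chosen for this example; the gate's own helper (pop_tick) is not reproduced here.

```ruby
EXPIRED_MARK  = Object.new.freeze          # hypothetical stand-in for EXPIRED
WaitCancelled = Class.new(StandardError)   # hypothetical stand-in for Interrupted
TICK          = 0.05                       # hypothetical tick length in seconds

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Pops from +queue+ in short ticks. Each tick re-checks the +cancelled+
# callable and the absolute deadline. A nil +timeout+ means no deadline.
# Note: pop(timeout:) returns nil on timeout, so nil cannot be a decision here.
def bounded_await(queue, timeout:, cancelled: -> { false })
  deadline = timeout && (monotonic_now + timeout)
  loop do
    raise WaitCancelled if cancelled.call

    wait = TICK
    if deadline
      remaining = deadline - monotonic_now
      return EXPIRED_MARK if remaining <= 0

      wait = [TICK, remaining].min
    end

    value = queue.pop(timeout: wait)
    return value unless value.nil?
  end
end

q = Queue.new
expired = bounded_await(q, timeout: 0.1) # nothing is ever pushed
q << :approve
decided = bounded_await(q, timeout: 1)   # value already queued
```

Using the monotonic clock for the deadline keeps the bound unaffected by wall-clock adjustments.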
Constant Summary
- DEFAULT_TIMEOUT = 900
  Default human-wait bound (seconds) used only when the caller passes none AND no config is reachable. The real default comes from approvals.wait_timeout_seconds. This is a SANE bound (15 minutes), not the old 24h: an unanswered approval must free its worker thread in minutes, not a day. nil = wait forever (opt-in, discouraged on servers).
- EXPIRED = Object.new.freeze
  Returned by #await when the human-wait deadline elapses with no decision. A distinct, non-approve sentinel: UI::API recognizes it and resolves the approval to a safe DENY (never an approve) and the clarification to nil, the abandoned-run safe default.
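As an illustration of how a caller might treat the sentinel, the sketch below compares by identity with equal? and falls back to the safe defaults described above. The helper names and the :deny / :approve symbols are assumptions for this example; the actual mapping lives in UI::API and is not shown on this page.

```ruby
# Same shape as the documented constant: a unique frozen object.
EXPIRED = Object.new.freeze

# Hypothetical helper: an expired approval resolves to a deny, never an approve.
def resolve_approval(outcome)
  return :deny if outcome.equal?(EXPIRED)

  outcome
end

# Hypothetical helper: an expired clarification resolves to nil.
def resolve_clarification(outcome)
  outcome.equal?(EXPIRED) ? nil : outcome
end

approval_on_expiry = resolve_approval(EXPIRED)
approval_on_answer = resolve_approval(:approve)
clarify_on_expiry  = resolve_clarification(EXPIRED)
clarify_on_answer  = resolve_clarification("use staging")
```

Identity comparison matters here: because the sentinel is a bare Object, no user-supplied decision value can be mistaken for it.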
Instance Method Summary
- #await(id, timeout: :config) ⇒ Object
  Blocks until #decide is called for id, returns the decision value.
- #cancel! ⇒ Object
  Wakes every thread currently parked in #await (and any that park later) so they raise Interrupted and the worker thread unwinds.
- #decide(id, decision) ⇒ :ok, ...
  Records a decision for id.
- #decision_for(id) ⇒ Object
  Decision previously resolved for id, or nil if none.
- #initialize ⇒ ApprovalGate (constructor)
  A new instance of ApprovalGate.
- #pending? ⇒ Boolean
  True when at least one #await call is currently blocked waiting for a decision.
- #register(id, recorder: nil) ⇒ Object
  Marks id as a valid target for a future #decide call, optionally binding a recorder used to emit approval.decided once a decision lands.
Constructor Details
#initialize ⇒ ApprovalGate
Returns a new instance of ApprovalGate.
    # File 'lib/rubino/run/approval_gate.rb', line 58
    def initialize
      @queues = {}
      @issued = {} # id => recorder (or nil); ids the gate will accept decisions for
      @decided = {} # id => decision; first-write-wins, used for idempotency
      @pending = {} # id => true while a thread is blocked in #await for it
      @cancelled = false # set by #cancel!; makes future/in-flight awaits raise
      @mutex = Mutex.new
    end
Instance Method Details
#await(id, timeout: :config) ⇒ Object
Blocks until #decide is called for id, returns the decision value. Loops over short interruptible pops so a #cancel! or the deadline wakes it within one WAKE_TICK rather than parking on a bare pop.
    # File 'lib/rubino/run/approval_gate.rb', line 94
    def await(id, timeout: :config)
      timeout = configured_timeout if timeout == :config
      queue = queue_for(id)
      # Lose the wake-up race safely: if #cancel! already fired, raise now
      # rather than park on a queue nothing will ever push to.
      raise Rubino::Interrupted if mark_pending(id)

      deadline = timeout && (monotonic_now + timeout)
      begin
        loop do
          decision = pop_tick(id, queue, deadline)
          next if decision.equal?(:tick) # woke on a tick boundary; re-check
          raise Rubino::Interrupted if decision.equal?(CANCELLED)

          return decision # a real decision, or EXPIRED on deadline
        end
      ensure
        @mutex.synchronize do
          @pending.delete(id)
          @queues.delete(id)
        end
      end
    end
#cancel! ⇒ Object
Wakes every thread currently parked in #await (and any that park later) so they raise Interrupted and the worker thread unwinds. Called when a run is cancelled/stopped while parked on a human decision — without it the gate’s pop blocks until the deadline and holds a Solid Queue worker thread for the whole window. One-shot, like CancelToken: once cancelled the gate stays cancelled.
    # File 'lib/rubino/run/approval_gate.rb', line 125
    def cancel!
      @mutex.synchronize do
        @cancelled = true
        @pending.each_key { |id| (@queues[id] ||= Queue.new) << CANCELLED }
      end
    end
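The wake-up mechanism can be sketched in isolation: cancellation pushes a private sentinel onto every pending queue, and the waiter turns that sentinel into an exception. CancellableWait and its nested names are hypothetical, written only to illustrate the one-shot behaviour; the deadline handling of the real #await is left out.

```ruby
# Hypothetical, reduced model of the cancel path.
class CancellableWait
  CANCELLED   = Object.new.freeze
  Interrupted = Class.new(StandardError)

  def initialize
    @queues    = {}
    @pending   = {}
    @cancelled = false
    @mutex     = Mutex.new
  end

  def await(id)
    queue = @mutex.synchronize do
      # If cancel! already fired, raise now instead of parking forever.
      raise Interrupted if @cancelled

      @pending[id] = true
      @queues[id] ||= Queue.new
    end
    value = queue.pop
    raise Interrupted if value.equal?(CANCELLED)

    value
  ensure
    @mutex.synchronize do
      @pending.delete(id)
      @queues.delete(id)
    end
  end

  # One-shot: once cancelled, the object stays cancelled.
  def cancel!
    @mutex.synchronize do
      @cancelled = true
      @pending.each_key { |id| (@queues[id] ||= Queue.new) << CANCELLED }
    end
  end
end

gate = CancellableWait.new
waiter = Thread.new do
  gate.await("a1")
rescue CancellableWait::Interrupted
  :interrupted
end

gate.cancel!
outcome = waiter.value

late_outcome =
  begin
    gate.await("a2") # parks after cancellation: raises immediately
  rescue CancellableWait::Interrupted
    :interrupted
  end
```

Whether the waiter parks before or after cancel! runs, it ends up interrupted, which is the race the flag check under the mutex is meant to cover.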
#decide(id, decision) ⇒ :ok, ...
Records a decision for id. On :ok, emits an approval.decided event through the recorder captured at #register time (when one was provided) so the SSE client can confirm receipt.
    # File 'lib/rubino/run/approval_gate.rb', line 141
    def decide(id, decision)
      recorder = nil
      status = @mutex.synchronize do
        if !@issued.key?(id)
          :unknown
        elsif @decided.key?(id)
          :duplicate
        else
          @decided[id] = decision
          recorder = @issued[id]
          :ok
        end
      end

      if status == :ok
        queue_for(id) << decision
        recorder&.emit("approval.decided", { approval_id: id, decision: decision })
      end
      status
    end
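An HTTP endpoint calling #decide has three statuses to handle. The sketch below shows one possible translation into response tuples. The helper name, status codes and payload keys are all assumptions made for illustration; the real endpoint's responses are not documented here.

```ruby
# Hypothetical mapping from a #decide status to [http_status, body].
# +prior_decision+ would come from something like #decision_for(id).
def response_for(status, prior_decision = nil)
  case status
  when :ok
    [200, { accepted: true }]
  when :duplicate
    # Idempotent replay: report the decision that already won.
    [200, { accepted: false, decision: prior_decision }]
  when :unknown
    # Id was never registered with the gate.
    [404, { error: "unknown approval id" }]
  else
    raise ArgumentError, "unexpected status: #{status.inspect}"
  end
end

ok_response        = response_for(:ok)
duplicate_response = response_for(:duplicate, :approve)
unknown_response   = response_for(:unknown)
```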
#decision_for(id) ⇒ Object
Decision previously resolved for id, or nil if none. May be the EXPIRED sentinel when the wait deadline elapsed before any #decide.
    # File 'lib/rubino/run/approval_gate.rb', line 164
    def decision_for(id)
      @mutex.synchronize { @decided[id] }
    end
#pending? ⇒ Boolean
True when at least one #await call is currently blocked waiting for a decision. The SSE idle watchdog consults this (via GateRegistry) so it never reaps a run that is legitimately parked on a human answer.
    # File 'lib/rubino/run/approval_gate.rb', line 70
    def pending?
      @mutex.synchronize { @pending.any? }
    end
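A rough sketch of how an idle watchdog might consult #pending? before reaping a run. IDLE_LIMIT, StubGate and reap? are hypothetical names invented for this example; the actual watchdog and its GateRegistry lookup are not shown on this page.

```ruby
IDLE_LIMIT = 30 # hypothetical idle threshold in seconds

# Hypothetical stub exposing only the predicate the watchdog needs.
StubGate = Struct.new(:waiting) do
  def pending?
    waiting
  end
end

# A run parked on a human answer is never reaped, however long it has
# been idle; otherwise the idle threshold applies. +gate+ may be nil
# when no gate is registered for the run.
def reap?(idle_seconds, gate)
  return false if gate&.pending?

  idle_seconds > IDLE_LIMIT
end

parked_run  = reap?(120, StubGate.new(true))
idle_run    = reap?(120, StubGate.new(false))
active_run  = reap?(5, StubGate.new(false))
gateless    = reap?(120, nil)
```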
#register(id, recorder: nil) ⇒ Object
Marks id as a valid target for a future #decide call, optionally binding a recorder used to emit approval.decided once a decision lands. Must be called before #decide; otherwise #decide rejects the id as unknown. Idempotent: re-registering an id is a no-op.
    # File 'lib/rubino/run/approval_gate.rb', line 78
    def register(id, recorder: nil)
      @mutex.synchronize do
        @issued[id] = recorder unless @issued.key?(id)
      end
    end