Class: Rubino::Run::ApprovalGate

Inherits:
Object
Defined in:
lib/rubino/run/approval_gate.rb

Overview

Synchronizes async HTTP decisions (approvals, clarifications) with the in-thread run loop. The run loop calls #await(id) and blocks; an HTTP endpoint calls #decide(id, value) to unblock it. One gate per run, owned by Executor and published in GateRegistry.

Implementation: one Queue per id, lazily created under a mutex. #decide accepts an id only after it has been issued via #register, which prevents a stray POST with an arbitrary or replayed approval_id from unblocking an awaiting call. Decided ids are remembered with their resolved value so duplicate POSTs are idempotent: #decide returns :duplicate, the first decision is kept, and the queue is not pushed twice.

Id namespace is shared per run: approval ids and clarify ids are both UUIDs minted by UI::API and routed through the same registry entry.

Bounded wait (W1): #await never parks on a bare, effectively-infinite queue.pop. It loops over short, interruptible pop(timeout:) ticks, re-checking the cancelled flag and an absolute deadline each tick, so:

* an explicit #cancel! (run stop/teardown) wakes it within one tick,
* an abandoned approval (client closed the tab, no decision ever) is
  released at the configured deadline instead of holding the worker
  thread for 24h and exhausting the server pool.

On deadline expiry #await returns the EXPIRED sentinel (never an approve) and emits approval.expired; UI::API maps that to a safe DENY.
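
The bounded wait described above can be sketched roughly as follows. This is a minimal illustration, not the gate's actual code: `bounded_pop`, `TICK`, `EXPIRED_DEMO`, and the `cancelled` lambda are hypothetical names, and the sketch assumes Ruby 3.2 or later, where `Queue#pop` accepts `timeout:` and returns nil when the timeout elapses.

```ruby
# Hypothetical sketch of a bounded, interruptible wait.
# Requires Ruby >= 3.2 for Queue#pop(timeout:). All names are illustrative.
TICK = 0.05                       # assumed tick length, in seconds
EXPIRED_DEMO = Object.new.freeze  # stand-in for the EXPIRED sentinel

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Pops from +queue+ in short ticks. Returns the pushed value, :cancelled
# once +cancelled+ reports true, or EXPIRED_DEMO after +timeout+ seconds
# (a nil timeout means no deadline). Assumes pushed values are never nil,
# because nil is how pop(timeout:) signals an elapsed tick.
def bounded_pop(queue, timeout:, cancelled: -> { false })
  deadline = timeout && (monotonic_now + timeout)
  loop do
    return :cancelled if cancelled.call

    wait = TICK
    if deadline
      remaining = deadline - monotonic_now
      return EXPIRED_DEMO if remaining <= 0

      wait = [TICK, remaining].min
    end

    value = queue.pop(timeout: wait)
    return value unless value.nil?
  end
end
```

Because the cancel flag and the deadline are re-checked at the top of every tick, the longest a caller can stay parked after either condition becomes true is one tick.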

Constant Summary

DEFAULT_TIMEOUT =

Default human-wait bound (seconds) used only when the caller passes none AND no config is reachable. The real default comes from approvals.wait_timeout_seconds. This is a SANE bound (15 minutes), not the old 24h: an unanswered approval must free its worker thread in minutes, not a day. nil = wait forever (opt-in, discouraged on servers).

900
EXPIRED =

Returned by #await when the human-wait deadline elapses with no decision. A distinct, non-approve sentinel: UI::API recognizes it and resolves the approval to a safe DENY (never an approve) and the clarification to nil — the abandoned-run safe default.

Object.new.freeze
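
A caller can tell the sentinel apart from any real decision by object identity. The sketch below is one way to do that, not UI::API's actual mapping: `resolve_approval` and the `:deny` symbol are hypothetical, and the local `EXPIRED` constant stands in for `ApprovalGate::EXPIRED` so the snippet runs on its own.

```ruby
# Stand-in for ApprovalGate::EXPIRED so the sketch is self-contained.
EXPIRED = Object.new.freeze

# Hypothetical mapping of an #await result to a final approval outcome.
# equal? compares object identity, so no decision value supplied by a
# client can be mistaken for the sentinel.
def resolve_approval(result)
  return :deny if result.equal?(EXPIRED) # abandoned approval: safe default

  result
end
```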

Constructor Details

#initialize ⇒ ApprovalGate

Returns a new instance of ApprovalGate.



# File 'lib/rubino/run/approval_gate.rb', line 58

def initialize
  @queues = {}
  @issued = {} # id => recorder (or nil) — ids the gate will accept decisions for
  @decided = {} # id => decision — first-write-wins, used for idempotency
  @pending = {} # id => true while a thread is blocked in #await for it
  @cancelled = false # set by #cancel!; makes future/in-flight awaits raise
  @mutex = Mutex.new
end

Instance Method Details

#await(id, timeout: :config) ⇒ Object

Blocks until #decide is called for id, returns the decision value. Loops over short interruptible pops so a #cancel! or the deadline wakes it within one WAKE_TICK rather than parking on a bare pop.

Parameters:

  • timeout (Numeric, :config, nil) (defaults to: :config)

    seconds before giving up. :config (default) reads approvals.wait_timeout_seconds; nil waits forever (still interruptible by #cancel!).

Returns:

  • the decision value, or EXPIRED if the deadline elapses first.

Raises:

  • (Rubino::Interrupted)

    if the gate is #cancel!-ed (run stopped) while this call is parked, so the worker thread unwinds at once.



# File 'lib/rubino/run/approval_gate.rb', line 94

def await(id, timeout: :config)
  timeout = configured_timeout if timeout == :config
  queue = queue_for(id)
  # Lose the wake-up race safely: if #cancel! already fired, raise now
  # rather than park on a queue nothing will ever push to.
  raise Rubino::Interrupted if mark_pending(id)

  deadline = timeout && (monotonic_now + timeout)
  begin
    loop do
      decision = pop_tick(id, queue, deadline)
      next if decision.equal?(:tick) # woke on a tick boundary; re-check

      raise Rubino::Interrupted if decision.equal?(CANCELLED)

      return decision # a real decision, or EXPIRED on deadline
    end
  ensure
    @mutex.synchronize do
      @pending.delete(id)
      @queues.delete(id)
    end
  end
end

#cancel! ⇒ Object

Wakes every thread currently parked in #await (and any that park later) so they raise Interrupted and the worker thread unwinds. Called when a run is cancelled/stopped while parked on a human decision — without it the gate’s pop blocks until the deadline and holds a Solid Queue worker thread for the whole window. One-shot, like CancelToken: once cancelled the gate stays cancelled.



# File 'lib/rubino/run/approval_gate.rb', line 125

def cancel!
  @mutex.synchronize do
    @cancelled = true
    @pending.each_key { |id| (@queues[id] ||= Queue.new) << CANCELLED }
  end
end
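
The wake-up mechanism, pushing a sentinel onto the queue a thread is parked on, can be illustrated in isolation. This is a hedged sketch only: `CANCELLED_DEMO` and `InterruptedDemo` are hypothetical stand-ins for the gate's own sentinel and for Rubino::Interrupted, and the waiter uses a plain blocking pop rather than the gate's tick loop.

```ruby
# Hypothetical stand-ins for the gate's cancel sentinel and its exception.
CANCELLED_DEMO = Object.new.freeze
class InterruptedDemo < StandardError; end

queue = Queue.new

# The waiter parks on the queue, as a run loop would inside #await.
waiter = Thread.new do
  value = queue.pop
  raise InterruptedDemo if value.equal?(CANCELLED_DEMO)

  value
end
waiter.report_on_exception = false # the raise above is expected here

# Cancelling pushes the sentinel, which wakes the parked thread.
queue << CANCELLED_DEMO

outcome =
  begin
    waiter.value # re-raises the waiter's exception in the calling thread
  rescue InterruptedDemo
    :interrupted
  end
```

The waiter unwinds through the exception instead of returning a value, so a cancelled wait cannot be confused with a decision.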

#decide(id, decision) ⇒ :ok, ...

Records a decision for id. On :ok, emits an approval.decided event through the recorder captured at #register time (when one was provided) so the SSE client can confirm receipt.

Returns:

  • (:ok, :duplicate, :unknown)
    • :ok — first decision for a registered id; queue pushed.

    • :duplicate — id was already decided (a real decision OR an auto-expiry); previous value preserved, queue NOT pushed again.

    • :unknown — id was never #register-ed; nothing recorded.



# File 'lib/rubino/run/approval_gate.rb', line 141

def decide(id, decision)
  recorder = nil
  status = @mutex.synchronize do
    if !@issued.key?(id)
      :unknown
    elsif @decided.key?(id)
      :duplicate
    else
      @decided[id] = decision
      recorder = @issued[id]
      :ok
    end
  end

  if status == :ok
    queue_for(id) << decision
    recorder&.emit("approval.decided", { approval_id: id, decision: decision })
  end
  status
end
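
The three return values can be seen side by side with a trimmed stand-in. `MiniGate` below is hypothetical and keeps only the bookkeeping that chooses between :ok, :duplicate, and :unknown. It has no queue and no recorder, so it illustrates the status logic rather than the real class.

```ruby
# Hypothetical, trimmed stand-in for the gate: status bookkeeping only.
class MiniGate
  def initialize
    @issued = {}   # ids that may receive a decision
    @decided = {}  # id => first decision recorded
    @mutex = Mutex.new
  end

  def register(id)
    @mutex.synchronize { @issued[id] = true }
  end

  def decide(id, decision)
    @mutex.synchronize do
      next :unknown unless @issued.key?(id)
      next :duplicate if @decided.key?(id)

      @decided[id] = decision
      :ok
    end
  end

  def decision_for(id)
    @mutex.synchronize { @decided[id] }
  end
end

gate = MiniGate.new
gate.register("a1")
first  = gate.decide("a1", :approve) # first decision for a registered id
second = gate.decide("a1", :deny)    # replayed POST: earlier value is kept
stray  = gate.decide("zz", :approve) # id was never registered
```

A replayed request cannot flip an earlier decision, because the first write wins and later calls only report :duplicate.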

#decision_for(id) ⇒ Object

Decision previously resolved for id, or nil if none. May be the EXPIRED sentinel when the wait deadline elapsed before any #decide.



# File 'lib/rubino/run/approval_gate.rb', line 164

def decision_for(id)
  @mutex.synchronize { @decided[id] }
end

#pending? ⇒ Boolean

True when at least one #await call is currently blocked waiting for a decision. The SSE idle watchdog consults this (via GateRegistry) so it never reaps a run that is legitimately parked on a human answer.

Returns:

  • (Boolean)


# File 'lib/rubino/run/approval_gate.rb', line 70

def pending?
  @mutex.synchronize { @pending.any? }
end

#register(id, recorder: nil) ⇒ Object

Marks id as a valid target for a future #decide call, optionally binding a recorder used to emit approval.decided once a decision lands. Must be called before #decide; otherwise #decide rejects the id as unknown. Idempotent: re-registering an id is a no-op.



# File 'lib/rubino/run/approval_gate.rb', line 78

def register(id, recorder: nil)
  @mutex.synchronize do
    @issued[id] = recorder unless @issued.key?(id)
  end
end