Class: Phronomy::ConcurrencyGate

Inherits:
Object
Defined in:
lib/phronomy/concurrency_gate.rb

Overview

A counting semaphore that enforces a concurrency cap across a named resource category (e.g. agent tasks, tool tasks, LLM calls).

When +max_concurrent+ is +nil+ the gate is a no-op and all callers pass through immediately without acquiring a slot.

Backpressure behaviour when the gate is full is controlled by the +on_full:+ keyword:

  • +:reject+: raise BackpressureError immediately
  • +:wait+: block the calling fiber/thread until a slot is free
  • +:timeout+: like +:wait+, but raise BackpressureError if no slot becomes available within +timeout:+ seconds

Examples:

gate = Phronomy::ConcurrencyGate.new(max_concurrent: 5, name: :agent)
gate.acquire(on_full: :reject) do
  run_agent_task
end
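
With +:reject+ or +:timeout+ the caller has to decide what happens when the gate pushes back. The following is a minimal sketch of one way to do that, not part of Phronomy: +with_gate_or_fallback+ is a hypothetical helper, and the error class is passed in as an argument because this page does not show which namespace BackpressureError lives in.

```ruby
# Hypothetical helper (not part of Phronomy): runs the block under the gate,
# or returns +fallback+ when the gate signals backpressure.
#
# +gate+ is assumed to respond to #acquire(on_full:, timeout:) with the
# semantics described in the overview. +error_class+ is whatever error the
# gate raises when it is full or the wait times out.
def with_gate_or_fallback(gate, error_class:, timeout:, fallback: nil, &work)
  gate.acquire(on_full: :timeout, timeout: timeout, &work)
rescue error_class
  # No slot became available in time; degrade instead of propagating.
  fallback
end
```

Returning a fallback suits work that can be skipped or retried later; for work that must not be dropped, letting the error propagate is usually the safer choice.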

Constructor Details

#initialize(max_concurrent:, name: nil) ⇒ ConcurrencyGate

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ConcurrencyGate.

Parameters:

  • max_concurrent (Integer, nil)

    concurrency cap; nil = unlimited

  • name (Symbol, String, nil) (defaults to: nil)

    human-readable label used in error messages



# File 'lib/phronomy/concurrency_gate.rb', line 26

def initialize(max_concurrent:, name: nil)
  @max = max_concurrent
  @name = name
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @count = 0
end

Instance Attribute Details

#max ⇒ Object (readonly)

Returns the configured cap (or nil when unlimited).



# File 'lib/phronomy/concurrency_gate.rb', line 35

def max
  @max
end

#name ⇒ Object (readonly)

Returns the name label.



# File 'lib/phronomy/concurrency_gate.rb', line 38

def name
  @name
end

Instance Method Details

#acquire(on_full: :wait, timeout: nil) { ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Acquires a slot, executes +block+, then releases the slot, even if the block raises. When the gate is unlimited (+max+ is +nil+) the block runs directly without acquiring a slot.

Parameters:

  • on_full (:reject, :wait, :timeout) (defaults to: :wait)

    backpressure strategy

  • timeout (Numeric, nil) (defaults to: nil)

    seconds before +:timeout+ gives up

Yields:

Returns:

  • block return value

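Raises:

  • (BackpressureError)

    when the gate is full and +on_full+ is +:reject+, or when +on_full+ is +:timeout+ and no slot becomes available within +timeout+ seconds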



# File 'lib/phronomy/concurrency_gate.rb', line 54

def acquire(on_full: :wait, timeout: nil, &block)
  return block.call if @max.nil?

  _acquire_slot(on_full: on_full, timeout: timeout)
  begin
    block.call
  ensure
    _release_slot
  end
end
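
The private helpers +_acquire_slot+ and +_release_slot+ are not shown on this page. As an illustration only, here is a minimal sketch of how a counting semaphore with the three backpressure strategies could be built from the same +Mutex+, +ConditionVariable+ and counter that the constructor sets up. +SketchGate+ and +SketchBackpressureError+ are hypothetical stand-ins, and requiring +timeout:+ for the +:timeout+ strategy is an assumption; the real implementation may differ.

```ruby
# Hypothetical stand-in for the library's BackpressureError.
class SketchBackpressureError < StandardError; end

# Illustrative counting semaphore. This is NOT the Phronomy implementation.
class SketchGate
  def initialize(max_concurrent:)
    @max = max_concurrent
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @count = 0
  end

  def current_count
    @mutex.synchronize { @count }
  end

  def acquire(on_full: :wait, timeout: nil)
    return yield if @max.nil? # unlimited gate: no slot bookkeeping

    acquire_slot(on_full: on_full, timeout: timeout)
    begin
      yield
    ensure
      release_slot # runs even if the block raises
    end
  end

  private

  def acquire_slot(on_full:, timeout:)
    # Assumption: :timeout without a timeout value is a caller error.
    raise ArgumentError, "timeout: is required" if on_full == :timeout && timeout.nil?

    @mutex.synchronize do
      deadline = monotonic_now + timeout if on_full == :timeout
      # Re-check the condition in a loop to tolerate spurious wakeups.
      while @count >= @max
        case on_full
        when :reject
          raise SketchBackpressureError, "gate full"
        when :wait
          @cond.wait(@mutex)
        when :timeout
          remaining = deadline - monotonic_now
          raise SketchBackpressureError, "timed out waiting for a slot" if remaining <= 0
          @cond.wait(@mutex, remaining)
        else
          raise ArgumentError, "unknown on_full: #{on_full.inspect}"
        end
      end
      @count += 1
    end
  end

  def release_slot
    @mutex.synchronize do
      @count -= 1
      @cond.signal # wake one waiter, if any
    end
  end

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

The sketch measures the deadline on the monotonic clock so that wall-clock adjustments cannot stretch or shorten the wait, and it releases the mutex before yielding so that the block never runs while the lock is held.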

#current_count ⇒ Object

Returns the number of slots currently in use.



# File 'lib/phronomy/concurrency_gate.rb', line 41

def current_count
  @mutex.synchronize { @count }
end