Class: Phronomy::Concurrency::ConcurrencyGate

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/concurrency/concurrency_gate.rb

Overview

A counting semaphore that enforces a concurrency cap across a named resource category (e.g. agent tasks, tool tasks, LLM calls).

When +max_concurrent+ is +nil+ the gate is a no-op and all callers pass through immediately without acquiring a slot.

Backpressure behaviour when the gate is full is controlled by the +on_full:+ keyword: +:reject+ — raise BackpressureError immediately +:wait+ — block the calling fiber/thread until a slot is free +:timeout+ — like +:wait+ but raises BackpressureError after +timeout:+ seconds if no slot becomes available

Examples:

gate = Phronomy::Concurrency::ConcurrencyGate.new(max_concurrent: 5, name: :agent)
gate.acquire(on_full: :reject) do
  run_agent_task
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_concurrent:, name: nil) ⇒ ConcurrencyGate

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ConcurrencyGate.

Parameters:

  • max_concurrent (Integer, nil)

    concurrency cap; nil = unlimited

  • name (Symbol, String, nil) (defaults to: nil)

    human-readable label used in error messages



27
28
29
30
31
32
33
# File 'lib/phronomy/concurrency/concurrency_gate.rb', line 27

def initialize(max_concurrent:, name: nil)
  @max = max_concurrent
  @name = name
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @count = 0
end

Instance Attribute Details

#maxObject (readonly)

Returns the configured cap (or nil when unlimited).



36
37
38
# File 'lib/phronomy/concurrency/concurrency_gate.rb', line 36

def max
  @max
end

#nameObject (readonly)

Returns the name label.



39
40
41
# File 'lib/phronomy/concurrency/concurrency_gate.rb', line 39

def name
  @name
end

Instance Method Details

#acquire(on_full: :wait, timeout: nil) { ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Acquires a slot, executes +block+, then releases the slot. When the gate is unlimited (max is nil) the block runs directly.

Parameters:

  • on_full (:reject, :wait, :timeout) (defaults to: :wait)

    backpressure strategy

  • timeout (Numeric, nil) (defaults to: nil)

    seconds before +:timeout+ gives up

Yields:

Returns:

  • block return value

Raises:



55
56
57
58
59
60
61
62
63
64
# File 'lib/phronomy/concurrency/concurrency_gate.rb', line 55

def acquire(on_full: :wait, timeout: nil, &block)
  return block.call if @max.nil?

  _acquire_slot(on_full: on_full, timeout: timeout)
  begin
    block.call
  ensure
    _release_slot
  end
end

#current_countObject

Returns the number of slots currently in use.



42
43
44
# File 'lib/phronomy/concurrency/concurrency_gate.rb', line 42

def current_count
  @mutex.synchronize { @count }
end