Class: Phronomy::BlockingAdapterPool
- Inherits: Object
- Defined in: lib/phronomy/blocking_adapter_pool.rb
Overview
A bounded, observable thread pool for blocking I/O operations.
Architectural boundary
BlockingAdapterPool is the only place in Phronomy that uses raw OS threads
for I/O. All third-party gem calls whose internal I/O Phronomy cannot control
— including RubyLLM, ActiveRecord, Redis, Faraday, and MCP stdio transport —
must route through this pool (or a named pool obtained via
Runtime#pool). Custom non-blocking HTTP/selector runtimes are intentionally
out of scope; the pool + cooperative scheduler combination satisfies all
current concurrency requirements without that complexity. (See ADR-010.)
All blocking calls (LLM HTTP, MCP stdio, ActiveRecord, Redis, etc.) must be submitted through this pool so that:
- The total number of OS threads is capped.
- Queue depth is bounded (backpressure when the pool is saturated).
- Per-operation timeouts are enforced consistently.
- Abandoned (timed-out) operations are tracked and logged.
- Metrics (active count, queue depth, abandoned count, avg wait time) are observable at runtime.
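The first two guarantees in the list above (a capped thread count and a bounded queue) can be illustrated with a minimal sketch built only on Ruby's standard `SizedQueue`. `TinyPool` is a hypothetical name invented for this illustration; it is not Phronomy's implementation and omits timeouts, abandonment tracking, and metrics.

```ruby
# Hypothetical TinyPool: an illustrative sketch, not Phronomy's implementation.
class TinyPool
  def initialize(pool_size:, queue_size:)
    @queue = SizedQueue.new(queue_size)  # bounded queue provides backpressure
    @workers = Array.new(pool_size) do   # fixed number of OS threads
      Thread.new do
        # SizedQueue#pop returns nil once the queue is closed and empty,
        # which ends the loop and lets the thread exit.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Blocks the caller while the queue is full.
  def submit(&block)
    @queue.push(block)
    self
  end

  # Closes the queue, then waits for workers to drain what is left.
  def shutdown
    @queue.close
    @workers.each(&:join)
    self
  end
end

results = Queue.new
pool = TinyPool.new(pool_size: 2, queue_size: 4)
10.times { |i| pool.submit { results << i * i } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
```

With two workers and a queue of four, at most six jobs are in flight or queued at any moment; the remaining `submit` calls block until a slot frees up.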
Defined Under Namespace
Classes: PendingOperation
Instance Attribute Summary
- #name ⇒ String, ... (readonly): Pool name used in thread labels.
- #pool_size ⇒ Integer (readonly): Configured maximum number of worker threads.
- #queue_size ⇒ Integer (readonly): Configured maximum queue depth.
Instance Method Summary
- #abandoned_count ⇒ Integer (private): Number of operations that were abandoned due to timeout.
- #active_count ⇒ Integer (private): Number of operations currently executing on workers.
- #average_wait_seconds ⇒ Float (private): Average time (in seconds) that completed operations spent in the queue waiting for a worker.
- #initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil) ⇒ BlockingAdapterPool (constructor, private): A new instance of BlockingAdapterPool.
- #queue_depth ⇒ Integer (private): Number of operations waiting in the queue.
- #shutdown(drain_timeout: 30) ⇒ self (private): Gracefully drains the pool and terminates all worker threads.
- #submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil) { ... } ⇒ PendingOperation (private): Submits a blocking operation to the pool.
Constructor Details
#initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil) ⇒ BlockingAdapterPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of BlockingAdapterPool.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 269
    def initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil)
      @pool_size = pool_size
      @queue_size = queue_size
      @name = name
      @logger = logger
      @queue = SizedQueue.new(queue_size)
      @active_count = 0
      @abandoned_count = 0
      @total_wait_ns = 0
      @completed_count = 0
      @mutex = Mutex.new
      @shutdown = false
      @workers = Array.new(pool_size) { |i| spawn_worker(i) }
    end
Instance Attribute Details
#name ⇒ String, ... (readonly)
Returns pool name used in thread labels.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 391
    def name
      @name
    end
#pool_size ⇒ Integer (readonly)
Returns configured maximum number of worker threads.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 385
    def pool_size
      @pool_size
    end
#queue_size ⇒ Integer (readonly)
Returns configured maximum queue depth.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 388
    def queue_size
      @queue_size
    end
Instance Method Details
#abandoned_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns number of operations that were abandoned due to timeout.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 368
    def abandoned_count
      @mutex.synchronize { @abandoned_count }
    end
#active_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns number of operations currently executing on workers.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 356
    def active_count
      @mutex.synchronize { @active_count }
    end
#average_wait_seconds ⇒ Float
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Average time (in seconds) that completed operations spent in the queue waiting for a worker. Returns 0.0 when no operations have completed yet.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 376
    def average_wait_seconds
      @mutex.synchronize do
        return 0.0 if @completed_count.zero?

        @total_wait_ns / @completed_count.to_f / 1_000_000_000.0
      end
    end
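As a worked example of the formula in the listing, the snippet below plugs in made-up numbers (three completed operations with a combined queue wait of 1.5 seconds). The values are assumptions chosen for illustration; only the arithmetic mirrors the source.

```ruby
# Illustrative values only: three operations that waited 0.25 s, 0.5 s and 0.75 s.
total_wait_ns   = 250_000_000 + 500_000_000 + 750_000_000
completed_count = 3

# Same expression as the method body: convert the count to Float first so the
# division is not truncated, then scale nanoseconds to seconds.
average = total_wait_ns / completed_count.to_f / 1_000_000_000.0

# Without .to_f the first division would be integer division.
truncated = 1_000_000_001 / 2      # Integer result
precise   = 1_000_000_001 / 2.0    # Float result
```

Here `average` comes out to 0.5 seconds. Accumulating the total as integer nanoseconds and converting only at read time keeps the running sum exact.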
#queue_depth ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns number of operations waiting in the queue.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 362
    def queue_depth
      @queue.size
    end
#shutdown(drain_timeout: 30) ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
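Gracefully drains the pool and waits for the worker threads to exit. Each worker thread is joined for up to +drain_timeout+ seconds in turn, so the total wait can exceed +drain_timeout+ when several workers are still busy. A worker that is still running after its join times out is not forcibly terminated.
Closing the underlying SizedQueue signals workers to exit after draining the remaining items, without blocking on a full-queue push. After shutdown, further calls to #submit raise Phronomy::PoolShutdownError.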
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 345
    def shutdown(drain_timeout: 30)
      @shutdown = true
      @queue.close
      @workers.each { |t| t.join(drain_timeout) }
      self
    end
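The close-then-join pattern relies on standard `SizedQueue` behavior, sketched below in isolation. The worker loop is an assumption made for illustration (the source does not show `spawn_worker`); the queue semantics themselves are Ruby's.

```ruby
queue     = SizedQueue.new(2)
processed = Queue.new

# Assumed worker shape: pop until the queue is closed and drained.
worker = Thread.new do
  while (item = queue.pop)   # pop returns nil once the queue is closed and empty
    processed << item
  end
end

queue.push(:a)
queue.push(:b)
queue.close                  # already-queued items remain poppable

# Pushing to a closed queue raises ClosedQueueError.
push_rejected =
  begin
    queue.push(:c)
    false
  rescue ClosedQueueError
    true
  end

# Thread#join(limit) returns the thread on success and nil on timeout.
finished = !worker.join(1).nil?
drained  = processed.size
```

Because `close` does not discard queued items, both `:a` and `:b` are processed before the worker exits, which is what makes the shutdown graceful.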
#submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil) { ... } ⇒ PendingOperation
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Submits a blocking operation to the pool. Returns a PendingOperation immediately; the block runs on a worker thread.
Note for cooperative callers: if you are running under the :fiber backend (i.e. inside a DeterministicScheduler Fiber), set +timeout:+ here rather than on Phronomy::BlockingAdapterPool::PendingOperation#await. The await-time timeout is not enforced on the cooperative path, because the Fiber cannot preempt a running worker thread. A submit-time timeout triggers on the worker side and marks the operation abandoned, which unblocks the waiting Fiber via the normal on-complete callback.
    # File 'lib/phronomy/blocking_adapter_pool.rb', line 302
    def submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil, &block)
      raise Phronomy::PoolShutdownError, "pool has been shut down" if @shutdown

      op = PendingOperation.new(block, timeout: timeout, cancellation_token: cancellation_token,
        on_abandoned: timeout ? -> { @mutex.synchronize { @abandoned_count += 1 } } : nil)
      begin
        case on_full
        when :raise
          begin
            @queue.push(op, true)
          rescue ThreadError
            raise Phronomy::BackpressureError, "BlockingAdapterPool queue is full (depth: #{@queue_size})"
          end
        when :timeout
          deadline = full_timeout ? (Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout) : nil
          loop do
            @queue.push(op, true)
            break
          rescue ThreadError
            if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
              raise Phronomy::TimeoutError, "timed out waiting for a free slot in BlockingAdapterPool"
            end
            sleep(0.005)
          end
        else # :wait (default)
          @queue.push(op)
        end
      rescue ClosedQueueError
        # Shutdown raced with this submit; treat as if @shutdown was already set.
        raise Phronomy::PoolShutdownError, "pool has been shut down"
      end
      op
    end
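The `:raise` and `:timeout` branches both build on the non-blocking form of `SizedQueue#push`, which raises `ThreadError` when the queue is full. The sketch below reproduces that mechanism with the standard library only; `push_with_deadline` is a hypothetical helper written for this illustration, and it returns symbols instead of raising Phronomy's error classes.

```ruby
queue = SizedQueue.new(1)
queue.push(:first)                       # the queue is now full

# :raise style: a non-blocking push fails immediately when the queue is full.
raise_result =
  begin
    queue.push(:second, true)
    :enqueued
  rescue ThreadError
    :rejected
  end

# :timeout style (hypothetical helper): poll with a monotonic deadline.
def push_with_deadline(queue, item, full_timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout
  begin
    queue.push(item, true)
    :enqueued
  rescue ThreadError
    return :timed_out if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(0.005)                         # same polling interval as the listing
    retry
  end
end

# Nothing consumes the queue, so the deadline expires.
timeout_result = push_with_deadline(queue, :second, 0.05)

# A consumer frees a slot shortly afterwards, so the polling push succeeds.
consumer = Thread.new do
  sleep(0.02)
  queue.pop
end
retry_result = push_with_deadline(queue, :third, 1.0)
consumer.join
```

The monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or lengthen the wait. The default `:wait` mode corresponds to a plain blocking `queue.push(item)`.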