Class: Phronomy::BlockingAdapterPool

Inherits:
Object
Defined in:
lib/phronomy/blocking_adapter_pool.rb

Overview

A bounded, observable thread pool for blocking I/O operations.

Architectural boundary

BlockingAdapterPool is the only place in Phronomy that uses raw OS threads for I/O. All third-party gem calls whose internal I/O Phronomy cannot control — including RubyLLM, ActiveRecord, Redis, Faraday, and MCP stdio transport — must route through this pool (or a named pool obtained via Runtime#pool). Custom non-blocking HTTP/selector runtimes are intentionally out of scope; the pool + cooperative scheduler combination satisfies all current concurrency requirements without that complexity. (See ADR-010.)

Routing every blocking call (LLM HTTP, MCP stdio, ActiveRecord, Redis, etc.) through this pool ensures that:

  1. The total number of OS threads is capped.
  2. Queue depth is bounded (backpressure when the pool is saturated).
  3. Per-operation timeouts are enforced consistently.
  4. Abandoned (timed-out) operations are tracked and logged.
  5. Metrics (active count, queue depth, abandoned count, avg wait time) are observable at runtime.
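
The first two guarantees follow from pairing a fixed set of worker threads with a bounded queue. The following is a minimal sketch using only Ruby's standard library; the variable names are illustrative and this is not Phronomy's internal implementation.

```ruby
# Illustrative only: a fixed number of workers draining a bounded queue.
queue = SizedQueue.new(2)   # at most 2 jobs may wait at any time
results = Queue.new

workers = Array.new(2) do
  Thread.new do
    # pop returns nil once the queue is closed and empty, ending the loop
    while (job = queue.pop)
      results << job.call
    end
  end
end

# push blocks the producer whenever the queue is full (backpressure)
5.times { |i| queue.push(-> { i * i }) }
queue.close
workers.each(&:join)

squares = []
squares << results.pop until results.empty?
squares.sort!
```

However many jobs are submitted, only two threads ever run them and at most two jobs wait in the queue; the producer slows down instead of the backlog growing.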

Examples:

Submitting a blocking LLM call

op = runtime.blocking_io.submit(timeout: 30) { chat.ask(message) }
result = op.await   # blocks the calling thread until done

With cancellation

token = Phronomy::CancellationToken.timeout_after(60)
op = pool.submit(timeout: 30, cancellation_token: token) { expensive_call }
result = op.await

Defined Under Namespace

Classes: PendingOperation

Constructor Details

#initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil) ⇒ BlockingAdapterPool

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of BlockingAdapterPool.

Parameters:

  • pool_size (Integer) (defaults to: 10)

    maximum number of worker threads

  • queue_size (Integer) (defaults to: 100)

    maximum pending operations waiting for a worker

  • name (String, Symbol, nil) (defaults to: nil)

    optional pool name used in thread labels

  • logger (Logger, nil) (defaults to: nil)

    optional logger for warnings



# File 'lib/phronomy/blocking_adapter_pool.rb', line 269

def initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil)
  @pool_size = pool_size
  @queue_size = queue_size
  @name = name
  @logger = logger
  @queue = SizedQueue.new(queue_size)
  @active_count = 0
  @abandoned_count = 0
  @total_wait_ns = 0
  @completed_count = 0
  @mutex = Mutex.new
  @shutdown = false
  @workers = Array.new(pool_size) { |i| spawn_worker(i) }
end

Instance Attribute Details

#name ⇒ String, ... (readonly)

Returns pool name used in thread labels.

Returns:

  • (String, Symbol, nil)

    pool name used in thread labels



# File 'lib/phronomy/blocking_adapter_pool.rb', line 391

def name
  @name
end

#pool_size ⇒ Integer (readonly)

Returns configured maximum number of worker threads.

Returns:

  • (Integer)

    configured maximum number of worker threads



# File 'lib/phronomy/blocking_adapter_pool.rb', line 385

def pool_size
  @pool_size
end

#queue_size ⇒ Integer (readonly)

Returns configured maximum queue depth.

Returns:

  • (Integer)

    configured maximum queue depth



# File 'lib/phronomy/blocking_adapter_pool.rb', line 388

def queue_size
  @queue_size
end

Instance Method Details

#abandoned_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations that were abandoned due to timeout.

Returns:

  • (Integer)

    number of operations that were abandoned due to timeout



# File 'lib/phronomy/blocking_adapter_pool.rb', line 368

def abandoned_count
  @mutex.synchronize { @abandoned_count }
end

#active_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations currently executing on workers.

Returns:

  • (Integer)

    number of operations currently executing on workers



# File 'lib/phronomy/blocking_adapter_pool.rb', line 356

def active_count
  @mutex.synchronize { @active_count }
end

#average_wait_seconds ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Average time (in seconds) that completed operations spent in the queue waiting for a worker. Returns 0.0 when no operations have completed yet.

Returns:

  • (Float)


# File 'lib/phronomy/blocking_adapter_pool.rb', line 376

def average_wait_seconds
  @mutex.synchronize do
    return 0.0 if @completed_count.zero?

    @total_wait_ns / @completed_count.to_f / 1_000_000_000.0
  end
end
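
As a worked example of the conversion above: waits are accumulated in nanoseconds, averaged over the number of completed operations, then scaled to seconds. The sample values below are made up for illustration.

```ruby
# Made-up sample: three operations waited 2 ms, 4 ms, and 6 ms in the queue.
wait_ns = [2_000_000, 4_000_000, 6_000_000]

total_wait_ns = wait_ns.sum       # 12_000_000
completed_count = wait_ns.size    # 3

# Same expression as average_wait_seconds: mean in ns, then ns to seconds.
average = total_wait_ns / completed_count.to_f / 1_000_000_000.0
```

Here the average comes out at roughly 0.004 seconds (4 ms). The `to_f` matters: without it, Ruby would perform integer division and truncate the mean before scaling.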

#queue_depth ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations waiting in the queue.

Returns:

  • (Integer)

    number of operations waiting in the queue



# File 'lib/phronomy/blocking_adapter_pool.rb', line 362

def queue_depth
  @queue.size
end
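
The metric readers documented above can be sampled together for logging or export. The sketch below is hypothetical: `FakePool` is a stand-in struct so the example runs without Phronomy, and `pool_snapshot` is not part of the library. Because each reader is called separately, the values do not form a single atomic snapshot.

```ruby
# Stand-in exposing the same reader names as the pool; values are made up.
FakePool = Struct.new(:active_count, :queue_depth, :abandoned_count,
                      :average_wait_seconds, :pool_size, :queue_size)

# Hypothetical helper: collect the metrics into a Hash.
def pool_snapshot(pool)
  {
    active: pool.active_count,
    queued: pool.queue_depth,
    abandoned: pool.abandoned_count,
    avg_wait_s: pool.average_wait_seconds,
    # true when the queue has reached its configured bound
    saturated: pool.queue_depth >= pool.queue_size
  }
end

snapshot = pool_snapshot(FakePool.new(10, 100, 3, 0.25, 10, 100))
```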

#shutdown(drain_timeout: 30) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

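Gracefully drains the pool and terminates all worker threads. Waits up to drain_timeout seconds for in-flight operations to finish. In the implementation shown below, the timeout is applied to each worker's join in turn, so the total wait can exceed drain_timeout when several workers are slow to finish.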

Closing the underlying SizedQueue signals workers to exit after draining remaining items, without blocking on a full-queue push.

Parameters:

  • drain_timeout (Numeric) (defaults to: 30)

    seconds to wait for workers to finish

Returns:

  • (self)


# File 'lib/phronomy/blocking_adapter_pool.rb', line 345

def shutdown(drain_timeout: 30)
  @shutdown = true
  @queue.close
  @workers.each { |t| t.join(drain_timeout) }
  self
end
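
The shutdown sequence relies on standard Queue#close behaviour: items already queued are still delivered, pop returns nil once a closed queue is empty, and later pushes raise ClosedQueueError. A minimal standard-library sketch of those semantics (not the pool's own code):

```ruby
queue = SizedQueue.new(2)
queue.push(:a)
queue.push(:b)
queue.close                  # works even though the queue is full

drained = []
worker = Thread.new do
  while (item = queue.pop)   # remaining items are still delivered
    drained << item
  end                        # pop returned nil: closed and empty
end

# join(limit) returns the thread, or nil if the limit elapsed first
joined = worker.join(1)

rejected = begin
  queue.push(:c)
  false
rescue ClosedQueueError
  true
end
```

This is why closing the queue is enough to stop the workers: no sentinel item has to be pushed, so shutdown cannot block on a full queue.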

#submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil) { ... } ⇒ PendingOperation

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Cooperative callers: if you are running under the :fiber backend (i.e. inside a DeterministicScheduler Fiber), set timeout: here rather than on Phronomy::BlockingAdapterPool::PendingOperation#await. The await-time timeout is not enforced on the cooperative path (the Fiber cannot preempt a running worker thread). A submit-time timeout triggers on the worker side and marks the operation abandoned, which unblocks the waiting Fiber via the normal on-complete callback.

Submits a blocking operation to the pool. Returns a PendingOperation immediately; the block runs on a worker thread.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    seconds before the operation is abandoned

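  • cancellation_token (CancellationToken, nil) (defaults to: nil)

  • on_full (Symbol) (defaults to: :wait)

    behaviour when the queue is full: :wait blocks until a slot is free, :raise fails immediately, :timeout retries until full_timeout elapses

  • full_timeout (Numeric, nil) (defaults to: nil)

    seconds to keep retrying when on_full is :timeout; nil retries indefinitely

Yields:

  • block containing the blocking call

Returns:

  • (PendingOperation)

Raises:

  • (Phronomy::PoolShutdownError)

    if the pool has been shut down

  • (Phronomy::BackpressureError)

    if on_full is :raise and the queue is full

  • (Phronomy::TimeoutError)

    if on_full is :timeout and full_timeout elapses before a slot is free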



# File 'lib/phronomy/blocking_adapter_pool.rb', line 302

def submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil, &block)
  raise Phronomy::PoolShutdownError, "pool has been shut down" if @shutdown

  op = PendingOperation.new(block, timeout: timeout, cancellation_token: cancellation_token,
    on_abandoned: timeout ? -> { @mutex.synchronize { @abandoned_count += 1 } } : nil)
  begin
    case on_full
    when :raise
      begin
        @queue.push(op, true)
      rescue ThreadError
        raise Phronomy::BackpressureError, "BlockingAdapterPool queue is full (depth: #{@queue_size})"
      end
    when :timeout
      deadline = full_timeout ? (Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout) : nil
      loop do
        @queue.push(op, true)
        break
      rescue ThreadError
        if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
          raise Phronomy::TimeoutError, "timed out waiting for a free slot in BlockingAdapterPool"
        end
        sleep(0.005)
      end
    else # :wait (default)
      @queue.push(op)
    end
  rescue ClosedQueueError
    # Shutdown raced with this submit — treat as if @shutdown was already set.
    raise Phronomy::PoolShutdownError, "pool has been shut down"
  end
  op
end
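
The on_full strategies reduce to two forms of SizedQueue#push: the blocking form, and the non-blocking form push(obj, true), which raises ThreadError when the queue is full. The sketch below isolates the :timeout polling loop. `push_with_deadline` is a hypothetical helper, and it returns false on timeout instead of raising Phronomy::TimeoutError so that it runs without the library.

```ruby
# Hypothetical helper mirroring the :timeout branch of submit.
def push_with_deadline(queue, item, full_timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout
  begin
    queue.push(item, true)   # non-blocking: raises ThreadError when full
    true
  rescue ThreadError
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(0.005)             # brief pause before polling again
    retry
  end
end

queue = SizedQueue.new(1)
first  = push_with_deadline(queue, :a, 0.05)   # a slot is available
second = push_with_deadline(queue, :b, 0.05)   # queue stays full: times out

# A consumer that frees the slot lets a later push succeed.
consumer = Thread.new do
  sleep(0.02)
  queue.pop
end
third = push_with_deadline(queue, :c, 1.0)
consumer.join
```

Polling with a monotonic clock keeps the deadline immune to wall-clock adjustments, at the cost of waking every few milliseconds while the queue is full.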