Class: Phronomy::Concurrency::BlockingAdapterPool

Inherits:
Object
Defined in:
lib/phronomy/concurrency/blocking_adapter_pool.rb

Overview

A bounded, observable thread pool for blocking I/O operations.

Architectural boundary

BlockingAdapterPool is the only place in Phronomy that uses raw OS threads for I/O. All third-party gem calls whose internal I/O Phronomy cannot control — including RubyLLM, ActiveRecord, Redis, Faraday, and MCP stdio transport — must route through this pool (or a named pool obtained via Runtime#pool). Custom non-blocking HTTP/selector runtimes are intentionally out of scope; the pool + cooperative scheduler combination satisfies all current concurrency requirements without that complexity. (See ADR-010.)

Routing every blocking call (LLM HTTP, MCP stdio, ActiveRecord, Redis, etc.) through this pool ensures that:

  1. The total number of OS threads is capped.
  2. Queue depth is bounded (backpressure when the pool is saturated).
  3. Per-operation timeouts are enforced consistently.
  4. Abandoned (timed-out) operations are tracked and logged.
  5. Metrics (active count, queue depth, abandoned count, avg wait time) are observable at runtime.
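The first two guarantees can be illustrated with Ruby's standard library alone. The following is a minimal sketch, not Phronomy's implementation: TinyBoundedPool and its methods are hypothetical names, and timeouts, cancellation, and metrics are left out. A fixed array of worker threads caps the thread count, and a SizedQueue makes submitters block once the queue is full.

```ruby
# Illustrative toy pool; class and method names are hypothetical,
# not Phronomy's implementation.
class TinyBoundedPool
  def initialize(pool_size:, queue_size:)
    @queue = SizedQueue.new(queue_size) # bounded queue: push blocks when full
    @workers = Array.new(pool_size) do  # fixed worker count caps OS threads
      Thread.new do
        # pop returns nil once the queue is closed and empty, ending the loop.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Enqueues the block and returns a Queue that will receive its result.
  def submit(&block)
    reply = Queue.new
    @queue.push(-> { reply.push(block.call) })
    reply
  end

  def shutdown
    @queue.close
    @workers.each(&:join)
    self
  end
end

pool = TinyBoundedPool.new(pool_size: 2, queue_size: 4)
replies = Array.new(6) { |i| pool.submit { i * i } }
SQUARES = replies.map(&:pop) # each pop blocks until that job has run
pool.shutdown
```

Six jobs are submitted against two workers and four queue slots, so a submitter may briefly block until a worker frees a slot; the results still come back in submission order because each job has its own reply queue.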

Examples:

Submitting a blocking LLM call

op = runtime.blocking_io.submit(timeout: 30) { chat.ask(message) }
result = op.await   # blocks the calling thread until done

With cancellation

token = Phronomy::Concurrency::CancellationToken.timeout_after(60)
op = pool.submit(timeout: 30, cancellation_token: token) { expensive_call }
result = op.await

Defined Under Namespace

Classes: PendingOperation

Constructor Details

#initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil) ⇒ BlockingAdapterPool

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of BlockingAdapterPool.

Parameters:

  • pool_size (Integer) (defaults to: 10)

    maximum number of worker threads

  • queue_size (Integer) (defaults to: 100)

    maximum pending operations waiting for a worker

  • name (String, Symbol, nil) (defaults to: nil)

    optional pool name used in thread labels

  • logger (Logger, nil) (defaults to: nil)

    optional logger for warnings



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 276

def initialize(pool_size: 10, queue_size: 100, name: nil, logger: nil)
  @pool_size = pool_size
  @queue_size = queue_size
  @name = name
  @logger = logger
  @queue = SizedQueue.new(queue_size)
  @active_count = 0
  @abandoned_count = 0
  @total_wait_ns = 0
  @completed_count = 0
  @mutex = Mutex.new
  @shutdown = false
  @workers = Array.new(pool_size) { |i| spawn_worker(i) }
end

Instance Attribute Details

#name ⇒ String, ... (readonly)

Returns pool name used in thread labels.

Returns:

  • (String, Symbol, nil)

    pool name used in thread labels



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 398

def name
  @name
end

#pool_size ⇒ Integer (readonly)

Returns configured maximum number of worker threads.

Returns:

  • (Integer)

    configured maximum number of worker threads



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 392

def pool_size
  @pool_size
end

#queue_size ⇒ Integer (readonly)

Returns configured maximum queue depth.

Returns:

  • (Integer)

    configured maximum queue depth



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 395

def queue_size
  @queue_size
end

Instance Method Details

#abandoned_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations that were abandoned due to timeout.

Returns:

  • (Integer)

    number of operations that were abandoned due to timeout



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 375

def abandoned_count
  @mutex.synchronize { @abandoned_count }
end

#active_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations currently executing on workers.

Returns:

  • (Integer)

    number of operations currently executing on workers



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 363

def active_count
  @mutex.synchronize { @active_count }
end

#average_wait_seconds ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Average time (in seconds) that completed operations spent in the queue waiting for a worker. Returns 0.0 when no operations have completed yet.

Returns:

  • (Float)


# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 383

def average_wait_seconds
  @mutex.synchronize do
    return 0.0 if @completed_count.zero?

    @total_wait_ns / @completed_count.to_f / 1_000_000_000.0
  end
end
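As a worked example of the arithmetic above, the sketch below repeats the same division outside the class. The helper name average_wait_seconds_for is hypothetical and exists only for illustration; it takes the two counters as plain arguments instead of reading instance state under the mutex.

```ruby
# Hypothetical helper mirroring the arithmetic in average_wait_seconds.
def average_wait_seconds_for(total_wait_ns, completed_count)
  return 0.0 if completed_count.zero?

  # Nanoseconds per completed operation, then converted to seconds.
  total_wait_ns / completed_count.to_f / 1_000_000_000.0
end

# Three seconds of accumulated queue wait across four completed operations.
AVG_WAIT_EXAMPLE = average_wait_seconds_for(3_000_000_000, 4) # => 0.75
# No completed operations yet: the guard avoids a division by zero.
AVG_WAIT_EMPTY = average_wait_seconds_for(0, 0)               # => 0.0
```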

#queue_depthInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns number of operations waiting in the queue.

Returns:

  • (Integer)

    number of operations waiting in the queue



# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 369

def queue_depth
  @queue.size
end

#shutdown(drain_timeout: 30) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

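Gracefully drains the pool and waits for worker threads to exit. Each worker is joined for up to drain_timeout seconds, so the total wait can exceed drain_timeout when several workers are slow. Workers still running after their join times out are not killed.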

Closing the underlying SizedQueue signals workers to exit after draining remaining items, without blocking on a full-queue push.

Parameters:

  • drain_timeout (Numeric) (defaults to: 30)

    seconds to wait for workers to finish

Returns:

  • (self)


# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 352

def shutdown(drain_timeout: 30)
  @shutdown = true
  @queue.close
  @workers.each { |t| t.join(drain_timeout) }
  self
end
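The close-then-drain behavior that shutdown relies on can be seen with a bare SizedQueue. This is a sketch using only Ruby's standard library; drain_demo is a hypothetical helper, not part of Phronomy. It also shows one possible variant in which all joins share a single deadline, which the listing above does not do (there, each worker gets the full drain_timeout).

```ruby
# Sketch using only Ruby's standard library; drain_demo is a hypothetical helper.
def drain_demo(drain_timeout: 5)
  queue = SizedQueue.new(4)
  processed = Queue.new

  workers = Array.new(2) do
    Thread.new do
      # After close, pop keeps returning queued items, then nil once empty.
      while (item = queue.pop)
        processed << item
      end
    end
  end

  3.times { |i| queue.push(i) }
  queue.close

  # A closed queue rejects further pushes with ClosedQueueError.
  rejected = begin
    queue.push(:late)
    false
  rescue ClosedQueueError
    true
  end

  # Variant: share one deadline across all joins so the total wait stays bounded.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + drain_timeout
  workers.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    t.join([remaining, 0].max)
  end

  items = []
  items << processed.pop until processed.empty?
  { items: items.sort, rejected: rejected, alive: workers.count(&:alive?) }
end

DRAIN_RESULT = drain_demo
```

All three queued items are processed even though the queue was closed right after they were pushed, the late push is rejected, and both workers have exited by the time the joins return.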

#submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil) { ... } ⇒ PendingOperation

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Cooperative callers: if you are running under the :fiber backend (i.e. inside a DeterministicScheduler Fiber), set timeout: here rather than on PendingOperation#await. The await-time timeout is not enforced on the cooperative path, because the Fiber cannot preempt a running worker thread. A submit-time timeout triggers on the worker side and marks the operation abandoned, which unblocks the waiting Fiber via the normal on-complete callback.

Submits a blocking operation to the pool. Returns a PendingOperation immediately; the block runs on a worker thread.

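Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    seconds before the operation is abandoned

  • cancellation_token (CancellationToken, nil) (defaults to: nil)

    optional token for cancelling the operation

  • on_full (Symbol) (defaults to: :wait)

    behavior when the queue is full: :wait blocks until a slot frees up, :raise fails immediately with Phronomy::BackpressureError, :timeout polls for a free slot until full_timeout elapses

  • full_timeout (Numeric, nil) (defaults to: nil)

    seconds to wait for a free slot when on_full is :timeout; nil retries indefinitely

Yields:

  • block containing the blocking call

Returns:

  • (PendingOperation)

    handle for the submitted operation

Raises:

  • (Phronomy::PoolShutdownError)

    if the pool has been shut down

  • (Phronomy::BackpressureError)

    if on_full is :raise and the queue is full

  • (Phronomy::TimeoutError)

    if on_full is :timeout and no slot frees up within full_timeout
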

# File 'lib/phronomy/concurrency/blocking_adapter_pool.rb', line 309

def submit(timeout: nil, cancellation_token: nil, on_full: :wait, full_timeout: nil, &block)
  raise Phronomy::PoolShutdownError, "pool has been shut down" if @shutdown

  op = PendingOperation.new(block, timeout: timeout, cancellation_token: cancellation_token,
    on_abandoned: timeout ? -> { @mutex.synchronize { @abandoned_count += 1 } } : nil)
  begin
    case on_full
    when :raise
      begin
        @queue.push(op, true)
      rescue ThreadError
        raise Phronomy::BackpressureError, "BlockingAdapterPool queue is full (depth: #{@queue_size})"
      end
    when :timeout
      deadline = full_timeout ? (Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout) : nil
      loop do
        @queue.push(op, true)
        break
      rescue ThreadError
        if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
          raise Phronomy::TimeoutError, "timed out waiting for a free slot in BlockingAdapterPool"
        end
        sleep(0.005)
      end
    else # :wait (default)
      @queue.push(op)
    end
  rescue ClosedQueueError
    # Shutdown raced with this submit — treat as if @shutdown was already set.
    raise Phronomy::PoolShutdownError, "pool has been shut down"
  end
  op
end
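The three on_full strategies in the listing above all rest on SizedQueue#push and its non-blocking form. The sketch below reproduces that control flow against a plain SizedQueue. It is an illustration under stated assumptions: push_with_policy is a hypothetical helper, it returns symbols where the real method raises Phronomy errors, and it defaults full_timeout to a small value instead of nil.

```ruby
# Hypothetical helper mirroring the three on_full strategies with a plain SizedQueue.
def push_with_policy(queue, item, on_full: :wait, full_timeout: 0.05)
  case on_full
  when :raise
    queue.push(item, true) # non-blocking push raises ThreadError when full
  when :timeout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + full_timeout
    begin
      queue.push(item, true)
    rescue ThreadError
      return :timed_out if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

      sleep(0.005) # brief pause before polling again
      retry
    end
  else
    queue.push(item) # :wait blocks until a slot is free
  end
  :queued
rescue ThreadError
  :rejected # stands in for Phronomy::BackpressureError in this sketch
end

full = SizedQueue.new(1)
full.push(:occupied) # no consumer, so the queue stays full

ON_FULL_RESULTS = {
  raise: push_with_policy(full, :a, on_full: :raise),
  timeout: push_with_policy(full, :b, on_full: :timeout, full_timeout: 0.02),
  free_slot: push_with_policy(SizedQueue.new(1), :c, on_full: :raise)
}.freeze
```

With the queue held full, :raise is rejected at once and :timeout gives up after roughly 20 ms of polling, while the same :raise policy succeeds against a queue that has a free slot. The :wait policy is not exercised here because it would block forever without a consumer.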