Class: Mycel::ThreadPool
- Inherits:
-
Object
- Object
- Mycel::ThreadPool
- Defined in:
- lib/mycel.rb
Overview
ThreadPool — fixed-size worker pool, plug-in for Session#executor.
The default Job#start strategy is ‘Thread.new` per request; that’s fine for moderate load and is bounded by Session’s max_concurrent_jobs. For high-frequency RPC, the per-request thread creation overhead can dominate. Plug a ThreadPool in to amortise that cost:
pool = Mycel::ThreadPool.new(size: 16)
hub = Mycel::Channel::Hub.new(executor: pool)
# ... later ...
pool.shutdown
The pool is intentionally minimal: a Queue, N workers, and a shutdown sentinel. It is NOT a general-purpose executor (no cancellation, no priorities, no return values) — it just satisfies the executor protocol (‘call(&block)`) that Session expects.
Error semantics: any StandardError that escapes a queued task is **swallowed silently**. This is intentional. A worker that dies on an unhandled exception is unrecoverable — the pool would gradually deplete itself and eventually deadlock the Session. The contract is therefore: *tasks must own their error handling*. In Mycel the only producer of pool tasks is Job#start, which already wraps its body in ‘begin/rescue` and routes failures through `:error` / response payloads. If you submit your own block via the executor protocol, do the same — assume nothing escapes the pool.
Instance Method Summary collapse
-
#call(&block) ⇒ Object
Executor protocol: hand a block to the pool to run on a worker.
-
#initialize(size:) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #shutdown ⇒ Object
Constructor Details
#initialize(size:) ⇒ ThreadPool
Returns a new instance of ThreadPool.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/mycel.rb', line 114 def initialize(size:) raise ArgumentError, "size must be > 0" unless size.is_a?(Integer) && size > 0 @queue = Queue.new @workers = Array.new(size) { Thread.new { loop { task = @queue.deq break if task == :__mycel_pool_stop__ begin task.call rescue StandardError # Tasks are responsible for their own error handling # (Job#start wraps in begin/rescue and routes through # the :error callback). Anything that escapes here # would otherwise terminate the worker, so swallow. end } } } @stopped = false @lock = Monitor.new end |
Instance Method Details
#call(&block) ⇒ Object
Executor protocol: hand a block to the pool to run on a worker.
138 139 140 141 |
# File 'lib/mycel.rb', line 138 def call(&block) raise "ThreadPool already shut down" if @lock.synchronize { @stopped } @queue.enq(block) end |
#shutdown ⇒ Object
143 144 145 146 147 148 149 150 |
# File 'lib/mycel.rb', line 143 def shutdown @lock.synchronize { return if @stopped @stopped = true } @workers.size.times { @queue.enq(:__mycel_pool_stop__) } @workers.each(&:join) end |