Class: Mycel::ThreadPool

Inherits:
Object
Defined in:
lib/mycel.rb

Overview

ThreadPool — fixed-size worker pool, plug-in for Session#executor.

The default Job#start strategy is `Thread.new` per request; that’s fine for moderate load and is bounded by Session’s max_concurrent_jobs. For high-frequency RPC, the per-request thread creation overhead can dominate. Plug in a ThreadPool to amortise that cost:

pool = Mycel::ThreadPool.new(size: 16)
hub  = Mycel::Channel::Hub.new(executor: pool)
# ... later ...
pool.shutdown

The pool is intentionally minimal: a Queue, N workers, and a shutdown sentinel. It is NOT a general-purpose executor (no cancellation, no priorities, no return values); it just satisfies the executor protocol (`call(&block)`) that Session expects.
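
Because the protocol is a single method, other objects can stand in for the pool. The sketch below is illustrative only: `InlineExecutor` and `ThreadPerTaskExecutor` are hypothetical names that are not part of Mycel, and it assumes Session ignores the return value of `call`.

```ruby
# Hypothetical executor that runs each task on the calling thread.
# Handy in tests where deterministic ordering matters.
class InlineExecutor
  def call(&block)
    block.call
    nil
  end
end

# Hypothetical executor that spawns one thread per task and
# returns the Thread so the caller can join it if it wants to.
class ThreadPerTaskExecutor
  def call(&block)
    Thread.new(&block)
  end
end

ran = []
InlineExecutor.new.call { ran << :inline }
ThreadPerTaskExecutor.new.call { ran << :threaded }.join
```

Either object could be passed wherever an `executor:` is accepted, under the assumption stated above.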

Error semantics: any StandardError that escapes a queued task is **swallowed silently**. This is intentional. A worker that dies on an unhandled exception is unrecoverable: the pool would gradually deplete itself and eventually deadlock the Session. The contract is therefore: *tasks must own their error handling*. In Mycel the only producer of pool tasks is Job#start, which already wraps its body in `begin/rescue` and routes failures through `:error` / response payloads. If you submit your own block via the executor protocol, do the same: nothing that escapes your block will surface from the pool.
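
One way to honour that contract for your own blocks is to wrap them before submission. This is a minimal sketch, not Mycel API: `guarded` is a hypothetical helper, and collecting failures in a Queue is just one possible reporting channel.

```ruby
# Hypothetical helper: wrap a block so that no StandardError escapes it.
# Failures are pushed onto `errors` (any object responding to <<).
def guarded(errors, &work)
  proc do
    begin
      work.call
    rescue StandardError => e
      errors << e
    end
  end
end

errors = Queue.new
task = guarded(errors) { raise ArgumentError, "bad input" }
task.call # does not raise; the error is recorded instead
```

The wrapped task can then be handed over with `pool.call(&task)`, and `errors` inspected from whichever thread is responsible for reporting.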

Constructor Details

#initialize(size:) ⇒ ThreadPool

Returns a new instance of ThreadPool.

Raises:

  • (ArgumentError)


# File 'lib/mycel.rb', line 150

def initialize(size:)
  raise ArgumentError, "size must be > 0" unless size.is_a?(Integer) && size > 0
  @queue = Queue.new
  @workers = Array.new(size) {
    Thread.new {
      loop {
        task = @queue.deq
        break if task == :__mycel_pool_stop__
        begin
          task.call
        rescue StandardError
          # Tasks are responsible for their own error handling
          # (Job#start wraps in begin/rescue and routes through
          # the :error callback). Anything that escapes here
          # would otherwise terminate the worker, so swallow.
        end
      }
    }
  }
  @stopped = false
  @lock = Monitor.new
end

Instance Method Details

#call(&block) ⇒ Object

Executor protocol: hand a block to the pool to run on a worker.



# File 'lib/mycel.rb', line 174

def call(&block)
  raise "ThreadPool already shut down" if @lock.synchronize { @stopped }
  @queue.enq(block)
end
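
Since `call` returns nothing useful, a caller that needs a result has to carry it back itself. The following is a sketch under assumptions: `submit_and_wait` is a hypothetical helper, and the lambda executor is a stand-in so the snippet runs without Mycel; a ThreadPool instance would be passed the same way.

```ruby
# Hypothetical helper: submit work to any executor and block until it
# finishes, re-raising a captured error on the calling thread.
def submit_and_wait(executor, &work)
  mailbox = Queue.new
  executor.call do
    begin
      mailbox << [:ok, work.call]
    rescue StandardError => e
      mailbox << [:error, e]
    end
  end
  status, value = mailbox.pop
  raise value if status == :error
  value
end

# Stand-in executor for the demo (one thread per task).
executor = ->(&task) { Thread.new(&task) }
answer = submit_and_wait(executor) { 6 * 7 }
```

The task rescues its own errors, so it stays within the error contract described in the overview.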

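#shutdown ⇒ Object

Marks the pool as stopped, enqueues one stop sentinel per worker, and joins every worker thread. Tasks queued before the sentinels still run first. Calling it again is a no-op.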



# File 'lib/mycel.rb', line 179

def shutdown
  @lock.synchronize {
    return if @stopped
    @stopped = true
  }
  @workers.size.times { @queue.enq(:__mycel_pool_stop__) }
  @workers.each(&:join)
end