Class: Stipa::ThreadPool
- Inherits:
-
Object
- Object
- Stipa::ThreadPool
- Defined in:
- lib/stipa/thread_pool.rb
Overview
Bounded thread pool with a fixed-depth work queue.
Design:
- SizedQueue (stdlib) handles all mutex/condvar complexity.
- Workers loop forever, pulling callables off the queue.
- Shutdown sends one :stop sentinel per worker (poison-pill pattern)
so every thread unblocks from its blocking `pop` and exits cleanly.
- submit returns true/false (never blocks the caller by default).
Capacity math:
pool_size=32, queue_depth=64, avg handler=10ms
→ steady-state concurrency at 1k req/s = 10 threads (31% utilization)
→ 64-slot queue absorbs ~64ms burst before 503s begin
Instance Method Summary collapse
-
#initialize(size: 16, queue_depth: nil, on_error: nil) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #queue_depth ⇒ Object
-
#shutdown(drain_timeout: 30.0) ⇒ Object
Graceful shutdown: stop accepting new jobs, drain the queue, then send a stop sentinel to every worker.
-
#submit(mode: :drop, push_timeout: 0.5, &job) ⇒ Object
Submit a job (callable block) to the pool.
Constructor Details
#initialize(size: 16, queue_depth: nil, on_error: nil) ⇒ ThreadPool
Returns a new instance of ThreadPool.
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/stipa/thread_pool.rb', line 18 def initialize(size: 16, queue_depth: nil, on_error: nil) @size = size @queue_depth = queue_depth || size * 4 @on_error = on_error || method(:default_error_handler) @queue = SizedQueue.new(@queue_depth) @workers = [] @shutdown = false @mutex = Mutex.new @size.times { @workers << spawn_worker } end |
Instance Method Details
#queue_depth ⇒ Object
75 |
# File 'lib/stipa/thread_pool.rb', line 75 def queue_depth; @queue.length; end |
#shutdown(drain_timeout: 30.0) ⇒ Object
Graceful shutdown: stop accepting new jobs, drain the queue, then send a stop sentinel to every worker.
66 67 68 69 70 71 72 73 |
# File 'lib/stipa/thread_pool.rb', line 66 def shutdown(drain_timeout: 30.0) @mutex.synchronize { @shutdown = true } deadline = Time.now + drain_timeout sleep 0.05 until @queue.empty? || Time.now >= deadline # Poison-pill: one :stop per worker so each thread unblocks from pop @size.times { @queue.push(:stop) } @workers.each { |t| t.join(5) } # 5s hard join timeout per thread end |
#submit(mode: :drop, push_timeout: 0.5, &job) ⇒ Object
Submit a job (callable block) to the pool.
mode: :drop — return false immediately if queue is full (default).
:block — spin up to push_timeout seconds before giving up.
Returns true if the job was accepted, false if dropped.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/stipa/thread_pool.rb', line 35 def submit(mode: :drop, push_timeout: 0.5, &job) raise ArgumentError, 'job block required' unless job return false if @shutdown case mode when :drop # Non-blocking push. SizedQueue raises ThreadError when full. begin @queue.push(job, true) # true = non_block true rescue ThreadError false end when :block deadline = Time.now + push_timeout loop do begin @queue.push(job, true) return true rescue ThreadError return false if Time.now >= deadline sleep 0.001 # 1ms spin — releases GVL while waiting end end else raise ArgumentError, "unknown mode: #{mode.inspect}" end end |