Class: Puma::ThreadPool
- Inherits:
- Object (Object → Puma::ThreadPool)
- Defined in:
- lib/puma/thread_pool.rb
Overview
Internal docs for a simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First, a connection to a client is made in `Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator, where it is stored in the `@todo` queue.
Each thread in the pool has an internal loop where it pulls a request from the `@todo` queue and processes it.
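The flow described above can be sketched in plain Ruby. This is a minimal illustration, not Puma's implementation: `TinyPool`, its `:stop` sentinel, and its `shutdown` method are hypothetical names introduced only for this example, and the real pool also handles spawning on demand, trimming, and forced shutdown.

```ruby
# Hypothetical TinyPool: illustrates the todo-queue pattern only.
class TinyPool
  def initialize(size, &block)
    @todo = Queue.new # thread-safe FIFO from Ruby core
    @threads = Array.new(size) do
      Thread.new do
        # Each worker loops, pulling one item at a time until it sees :stop.
        while (work = @todo.pop) != :stop
          block.call(work)
        end
      end
    end
  end

  # Enqueue work; returning self allows chained pushes.
  def <<(work)
    @todo << work
    self
  end

  # Send one :stop sentinel per worker, then wait for all of them.
  def shutdown
    @threads.size.times { @todo << :stop }
    @threads.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * 2 }
pool << 1 << 2 << 3
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
```

The block given to the constructor plays the role of the per-request work, as with the block passed to `#initialize`.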
Defined Under Namespace
Classes: Automaton, ForceShutdown, ProcessorThread
Constant Summary
- SHUTDOWN_GRACE_TIME = 5
How long, in seconds, to wait after raising ForceShutdown in a thread during forced shutdown mode for the thread to try and finish up its work before leaving the thread to die on the vine.
Instance Attribute Summary
- #busy_threads ⇒ Object readonly
- #max ⇒ Object
Returns the value of attribute max.
- #min ⇒ Object
Returns the value of attribute min.
- #out_of_band_running ⇒ Object readonly
- #pool_capacity ⇒ Object readonly
- #spawned ⇒ Object readonly
Returns the value of attribute spawned.
- #trim_requested ⇒ Object readonly
Returns the value of attribute trim_requested.
- #waiting ⇒ Object readonly
Returns the value of attribute waiting.
Instance Method Summary
- #<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
- #auto_reap!(timeout = @reaping_time) ⇒ Object
- #auto_trim!(timeout = @auto_trim_time) ⇒ Object
- #backlog ⇒ Object
How many objects have yet to be processed by the pool?
- #backlog_max ⇒ Object
The maximum size of the backlog.
- #can_spawn_processor? ⇒ Boolean
:nodoc:
- #initialize(name, options = {}, server: nil, &block) ⇒ ThreadPool constructor
Maintain a minimum of min and maximum of max threads in the pool.
- #reap ⇒ Object
If there are dead threads in the pool, make them go away while decreasing the spawned counter so that new healthy threads can be created again.
- #reset_max ⇒ Object
- #shutdown(timeout) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
- #spawn_thread_if_needed ⇒ Object
:nodoc:
- #stats ⇒ Hash
Generate the stats hash in one call so as not to take the lock multiple times.
- #trim(force = false) ⇒ Object
If there are any free threads in the pool, tell one to go ahead and exit.
- #wait_while_out_of_band_running ⇒ Object
- #with_force_shutdown ⇒ Object
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shut down during execution.
- #with_mutex(&block) ⇒ Object
Constructor Details
#initialize(name, options = {}, server: nil, &block) ⇒ ThreadPool
Maintain a minimum of min and maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 86
def initialize(name, options = {}, server: nil, &block)
  @server = server

  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new
  @todo = Queue.new

  @backlog_max = 0
  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  @max_io_threads = Integer(options[:max_io_threads] || 0)

  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @shutdown_debug = options[:shutdown_debug]
  @block = block
  @out_of_band = options[:out_of_band]
  @out_of_band_running = false
  @out_of_band_condvar = ConditionVariable.new
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @processors = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end
Instance Attribute Details
#busy_threads ⇒ Object (readonly)
# File 'lib/puma/thread_pool.rb', line 180
def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end
#max ⇒ Object
Returns the value of attribute max.
# File 'lib/puma/thread_pool.rb', line 139
def max
  @max
end
#min ⇒ Object
Returns the value of attribute min.
# File 'lib/puma/thread_pool.rb', line 139
def min
  @min
end
#out_of_band_running ⇒ Object (readonly)
# File 'lib/puma/thread_pool.rb', line 78
def out_of_band_running
  @out_of_band_running
end
#pool_capacity ⇒ Object (readonly)
# File 'lib/puma/thread_pool.rb', line 174
def pool_capacity
  (waiting + (@max - spawned)).clamp(0, Float::INFINITY)
end
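As a worked example of the arithmetic in `#busy_threads` and `#pool_capacity`, the sketch below restates both formulas as standalone functions. The keyword-argument helpers are hypothetical and exist only for this illustration; the real methods read the pool's instance state. The second call assumes a situation in which `spawned` exceeds `max` (for example, after `max` has been lowered), which is where the clamp to zero matters.

```ruby
# Hypothetical standalone versions of the two formulas.
def pool_capacity(waiting:, max:, spawned:)
  # Idle threads plus threads that could still be spawned, never negative.
  (waiting + (max - spawned)).clamp(0, Float::INFINITY)
end

def busy_threads(spawned:, waiting:, backlog:)
  # Threads currently working plus items still queued.
  spawned - waiting + backlog
end

capacity = pool_capacity(waiting: 2, max: 5, spawned: 3) # 2 + (5 - 3)
clamped  = pool_capacity(waiting: 0, max: 2, spawned: 3) # 0 + (2 - 3), clamped
busy     = busy_threads(spawned: 3, waiting: 2, backlog: 4) # 3 - 2 + 4
```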
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
# File 'lib/puma/thread_pool.rb', line 138
def spawned
  @spawned
end
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
# File 'lib/puma/thread_pool.rb', line 138
def trim_requested
  @trim_requested
end
#waiting ⇒ Object (readonly)
Returns the value of attribute waiting.
# File 'lib/puma/thread_pool.rb', line 138
def waiting
  @waiting
end
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
# File 'lib/puma/thread_pool.rb', line 335
def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work
    t = @todo.size
    @backlog_max = t if t > @backlog_max

    if @waiting < @todo.size and can_spawn_processor?
      spawn_thread
    end

    @not_empty.signal
  end
  self
end
#auto_reap!(timeout = @reaping_time) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 421
def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap)
  @reaper.start!
end
#auto_trim!(timeout = @auto_trim_time) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 416
def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim)
  @auto_trim.start!
end
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 163
def backlog
  with_mutex { @todo.size }
end
#backlog_max ⇒ Object
The maximum size the backlog has reached since it was last reset.
# File 'lib/puma/thread_pool.rb', line 169
def backlog_max
  with_mutex { @backlog_max }
end
#can_spawn_processor? ⇒ Boolean
:nodoc:
Must be called with @mutex held!
# File 'lib/puma/thread_pool.rb', line 328
def can_spawn_processor?
  io_processors_count = @processors.count(&:marked_as_io_thread?)
  extra_io_processors_count = io_processors_count > @max_io_threads ? io_processors_count - @max_io_threads : 0
  (@spawned - io_processors_count) < (@max - extra_io_processors_count)
end
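One reading of this arithmetic is that threads marked as IO threads do not count against `@max` until their number exceeds `@max_io_threads`, and that every IO thread beyond that allowance reduces the effective maximum by one. The sketch below restates the check as a pure function with keyword arguments so the cases can be tried by hand; the `can_spawn?` name and its parameters are hypothetical.

```ruby
# Hypothetical pure-function restatement of the check.
def can_spawn?(spawned:, io_count:, max:, max_io:)
  # IO threads beyond the allowance eat into the regular maximum.
  extra_io = io_count > max_io ? io_count - max_io : 0
  (spawned - io_count) < (max - extra_io)
end

# 5 spawned, 2 of them IO threads within the allowance: 3 < 5.
within_allowance = can_spawn?(spawned: 5, io_count: 2, max: 5, max_io: 2)

# 5 spawned, none marked as IO: 5 < 5 is false, the pool is full.
pool_full = can_spawn?(spawned: 5, io_count: 0, max: 5, max_io: 2)

# 7 spawned, 4 IO threads with an allowance of 2: 3 < (5 - 2) is false.
over_allowance = can_spawn?(spawned: 7, io_count: 4, max: 5, max_io: 2)
```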
#reap ⇒ Object
If there are dead threads in the pool, make them go away while decreasing the spawned counter so that new healthy threads can be created again.
# File 'lib/puma/thread_pool.rb', line 378
def reap
  with_mutex do
    @processors, dead_processors = @processors.partition(&:alive?)

    dead_processors.each do |processor|
      processor.kill
      @spawned -= 1
    end
  end
end
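The partition step can be tried in isolation with plain `Thread` objects. This is a sketch under the assumption that the pool's processors respond to `alive?` and `kill` the way `Thread` does; the local variable names are illustrative.

```ruby
threads = [
  Thread.new { sleep }, # blocks forever, so it stays alive
  Thread.new { :done }  # returns immediately
]
threads[1].join         # make sure the second thread has finished

spawned = threads.size
# partition returns [matching, non-matching]: live threads first, dead second.
alive, dead = threads.partition(&:alive?)

dead.each do |t|
  t.kill       # harmless on a thread that has already finished
  spawned -= 1 # frees a slot so a replacement could be spawned
end

alive_count = alive.size
alive.each(&:kill) # clean up the sleeping thread used for the demo
alive.each(&:join)
```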
#reset_max ⇒ Object
# File 'lib/puma/thread_pool.rb', line 157
def reset_max
  with_mutex { @backlog_max = 0 }
end
#shutdown(timeout) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
# File 'lib/puma/thread_pool.rb', line 444
def shutdown(timeout)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup processors so that we join them all safely
    @processors.dup
  end

  if @shutdown_debug == true
    shutdown_debug("Shutdown initiated")
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        remaining = inner_timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start)
        remaining > 0 && t.join(remaining)
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)

    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown timeout exceeded")
    end

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)

    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown grace timeout exceeded")
    end

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @processors = []
end
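The `join` lambda shares a single time budget across all threads rather than giving each thread its own timeout. The sketch below isolates that idea; `join_within` is a hypothetical helper name, and the sleep durations are arbitrary values chosen for the demonstration.

```ruby
# Hypothetical helper: join every thread within one shared time budget and
# return the threads that are still running when the budget runs out.
def join_within(threads, timeout)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads.reject! do |t|
    remaining = timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start)
    # Thread#join(limit) returns the thread if it finished, nil on timeout.
    remaining > 0 && t.join(remaining)
  end
  threads # reject! returns nil when nothing was removed, so return explicitly
end

fast = Thread.new { sleep 0.01 }
slow = Thread.new { sleep } # never finishes on its own

pending = join_within([fast, slow], 0.2)
stragglers = pending.dup    # only the slow thread is left

# Last resort, mirroring the final stage of shutdown: kill, then join briefly.
pending.each(&:kill)
join_within(pending, 1)
```

A monotonic clock is used so that wall-clock adjustments during shutdown cannot stretch or shrink the budget.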
#spawn_thread_if_needed ⇒ Object
:nodoc:
# File 'lib/puma/thread_pool.rb', line 354
def spawn_thread_if_needed # :nodoc:
  with_mutex do
    if @waiting < @todo.size and can_spawn_processor?
      spawn_thread
    end
  end
end
#stats ⇒ Hash
Generate the stats hash in one call so as not to take the lock multiple times. Reading the stats also resets backlog_max to 0.
# File 'lib/puma/thread_pool.rb', line 143
def stats
  with_mutex do
    temp = @backlog_max
    @backlog_max = 0
    { backlog: @todo.size,
      running: @spawned,
      pool_capacity: pool_capacity,
      busy_threads: @spawned - @waiting + @todo.size,
      io_threads: @processors.count(&:marked_as_io_thread?),
      backlog_max: temp
    }
  end
end
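The read-and-reset behavior of `backlog_max` can be seen in a small standalone model. `BacklogMeter` and its `push`/`pop` methods are hypothetical names used only for this sketch; it tracks a counter instead of a real queue.

```ruby
# Hypothetical model of a high-water mark that is reset whenever it is read.
class BacklogMeter
  def initialize
    @mutex = Mutex.new
    @backlog = 0
    @backlog_max = 0
  end

  def push
    @mutex.synchronize do
      @backlog += 1
      @backlog_max = @backlog if @backlog > @backlog_max
    end
  end

  def pop
    @mutex.synchronize { @backlog -= 1 if @backlog > 0 }
  end

  # One lock acquisition yields a consistent snapshot and resets the peak.
  def stats
    @mutex.synchronize do
      peak = @backlog_max
      @backlog_max = 0
      { backlog: @backlog, backlog_max: peak }
    end
  end
end

meter = BacklogMeter.new
3.times { meter.push }
2.times { meter.pop }
first  = meter.stats # peak of 3 is reported once
second = meter.stats # peak has been reset by the previous call
```

A consumer that polls the stats therefore sees the peak for each polling interval rather than an all-time maximum.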
#trim(force = false) ⇒ Object
If there are any free threads in the pool, tell one to go ahead and exit. If force is true, a trim is requested even if all threads are being utilized. In either case, the pool is never trimmed below min threads.
# File 'lib/puma/thread_pool.rb', line 366
def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
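The trim condition can be checked by hand with a pure-function restatement. The `trim?` helper and its keyword arguments are hypothetical; the real method reads the pool's instance variables under the mutex.

```ruby
# Hypothetical restatement of the condition inside #trim.
def trim?(waiting:, backlog:, spawned:, trim_requested:, min:, force: false)
  free = waiting - backlog
  (force || free > 0) && spawned - trim_requested > min
end

# Two idle threads, nothing queued, above the minimum: trim.
idle = trim?(waiting: 2, backlog: 0, spawned: 3, trim_requested: 0, min: 1)

# Already at the minimum: never trim, even when forced.
at_min = trim?(waiting: 1, backlog: 0, spawned: 1, trim_requested: 0, min: 1, force: true)

# No free threads, but forced and above the minimum: trim.
forced = trim?(waiting: 0, backlog: 0, spawned: 3, trim_requested: 0, min: 1, force: true)

# Pending trim requests count as already-removed threads: 3 - 2 is not > 1.
pending = trim?(waiting: 2, backlog: 0, spawned: 3, trim_requested: 2, min: 1)
```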
#wait_while_out_of_band_running ⇒ Object
# File 'lib/puma/thread_pool.rb', line 309
def wait_while_out_of_band_running
  return unless @out_of_band_running

  with_mutex do
    @out_of_band_condvar.wait(@mutex) while @out_of_band_running
  end
end
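The method uses the standard condition-variable idiom: wait in a loop while the flag is set, so that spurious wakeups are harmless. A minimal sketch with stdlib primitives follows; the `running` flag, the `hook` thread, and the `log` queue are illustrative stand-ins rather than Puma internals.

```ruby
mutex   = Mutex.new
condvar = ConditionVariable.new
running = true    # stands in for @out_of_band_running
log     = Queue.new

hook = Thread.new do
  sleep 0.05      # pretend to do some out-of-band work
  mutex.synchronize do
    running = false
    log << :hook_done
    condvar.broadcast # wake every waiter
  end
end

mutex.synchronize do
  # wait releases the mutex while sleeping and reacquires it before returning.
  condvar.wait(mutex) while running
  log << :resumed
end

hook.join
order = [log.pop, log.pop]
```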
#with_force_shutdown ⇒ Object
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shut down during execution.
# File 'lib/puma/thread_pool.rb', line 428
def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end
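The opt-in mechanism pairs a thread-local flag with `Thread#raise`: a thread is only interrupted if it has declared that it is inside an interruptible section. The sketch below shows that pairing with plain threads. The `Stop` class and the `:interruptible` key are hypothetical stand-ins for `ForceShutdown` and `:with_force_shutdown`, and the mutex that guards the flag in the real code is omitted for brevity.

```ruby
class Stop < RuntimeError; end # hypothetical stand-in for ForceShutdown

ready   = Queue.new
outcome = Queue.new

worker = Thread.new do
  t = Thread.current
  begin
    t[:interruptible] = true # opt in to being interrupted
    ready << :go
    sleep                    # simulate a long-running request
  rescue Stop
    outcome << :interrupted
  ensure
    t[:interruptible] = false
  end
end

ready.pop                                    # wait until the worker has opted in
worker.raise(Stop) if worker[:interruptible] # only interrupt opted-in threads
worker.join
result = outcome.pop
```

Threads that never set the flag are left alone at this stage and are only affected by the later kill step of `#shutdown`.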
#with_mutex(&block) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 318
def with_mutex(&block)
  @mutex.owned? ?
    yield :
    @mutex.synchronize(&block)
end
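Ruby's `Mutex` is not reentrant, so a method that locks cannot call another locking method on the same mutex without raising `ThreadError`. Checking `owned?` first sidesteps that. The sketch below contrasts the two cases; the `Counter` class is a hypothetical example, not part of Puma.

```ruby
# Hypothetical class using the same owned?-or-synchronize helper.
class Counter
  attr_reader :count

  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  def with_mutex(&block)
    @mutex.owned? ? yield : @mutex.synchronize(&block)
  end

  def increment
    with_mutex { @count += 1 }
  end

  # Calls increment while already holding the lock; safe thanks to owned?.
  def increment_twice
    with_mutex do
      increment
      increment
    end
  end
end

counter = Counter.new
total = counter.increment_twice

# For contrast: nesting synchronize directly on the same mutex raises.
plain = Mutex.new
nested_error =
  begin
    plain.synchronize { plain.synchronize {} }
    nil
  rescue ThreadError => e
    e.class
  end
```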