Class: Puma::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/puma/thread_pool.rb

Overview

Internal Docs for A simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First a connection to a client is made in ‘Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.

Each thread in the pool has an internal loop where it pulls a request from the ‘@todo` array and processes it.

Defined Under Namespace

Classes: Automaton, ForceShutdown, ProcessorThread

Constant Summary collapse

SHUTDOWN_GRACE_TIME =

How long, after raising the ForceShutdown of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = {}, server: nil, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/puma/thread_pool.rb', line 86

def initialize(name, options = {}, server: nil, &block)
  @server = server

  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new
  @todo = Queue.new

  @backlog_max = 0
  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  @max_io_threads = Integer(options[:max_io_threads] || 0)

  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @shutdown_debug = options[:shutdown_debug]
  @block = block
  @out_of_band = options[:out_of_band]
  @out_of_band_running = false
  @out_of_band_condvar = ConditionVariable.new
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @processors = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

Instance Attribute Details

#busy_threadsObject (readonly)

Version:

  • 5.0.0



180
181
182
# File 'lib/puma/thread_pool.rb', line 180

def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end

#maxObject

Returns the value of attribute max.



139
140
141
# File 'lib/puma/thread_pool.rb', line 139

def max
  @max
end

#minObject

Returns the value of attribute min.



139
140
141
# File 'lib/puma/thread_pool.rb', line 139

def min
  @min
end

#out_of_band_runningObject (readonly)

seconds



78
79
80
# File 'lib/puma/thread_pool.rb', line 78

def out_of_band_running
  @out_of_band_running
end

#pool_capacityObject (readonly)



174
175
176
# File 'lib/puma/thread_pool.rb', line 174

def pool_capacity
  (waiting + (@max - spawned)).clamp(0, Float::INFINITY)
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



138
139
140
# File 'lib/puma/thread_pool.rb', line 138

def spawned
  @spawned
end

#trim_requestedObject (readonly)

Returns the value of attribute trim_requested.



138
139
140
# File 'lib/puma/thread_pool.rb', line 138

def trim_requested
  @trim_requested
end

#waitingObject (readonly)

Returns the value of attribute waiting.



138
139
140
# File 'lib/puma/thread_pool.rb', line 138

def waiting
  @waiting
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a Thread to pickup and process.



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/puma/thread_pool.rb', line 335

def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work
    t = @todo.size
    @backlog_max = t if t > @backlog_max

    if @waiting < @todo.size and can_spawn_processor?
      spawn_thread
    end

    @not_empty.signal
  end
  self
end

#auto_reap!(timeout = @reaping_time) ⇒ Object



421
422
423
424
# File 'lib/puma/thread_pool.rb', line 421

def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap)
  @reaper.start!
end

#auto_trim!(timeout = @auto_trim_time) ⇒ Object



416
417
418
419
# File 'lib/puma/thread_pool.rb', line 416

def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim)
  @auto_trim.start!
end

#backlogObject

How many objects have yet to be processed by the pool?



163
164
165
# File 'lib/puma/thread_pool.rb', line 163

def backlog
  with_mutex { @todo.size }
end

#backlog_maxObject

The maximum size of the backlog



169
170
171
# File 'lib/puma/thread_pool.rb', line 169

def backlog_max
  with_mutex { @backlog_max }
end

#can_spawn_processor?Boolean

:nodoc:

Must be called with @mutex held!

Returns:

  • (Boolean)


328
329
330
331
332
# File 'lib/puma/thread_pool.rb', line 328

def can_spawn_processor?
  io_processors_count = @processors.count(&:marked_as_io_thread?)
  extra_io_processors_count = io_processors_count > @max_io_threads ? io_processors_count - @max_io_threads : 0
  (@spawned - io_processors_count) < (@max - extra_io_processors_count)
end

#reapObject

If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.



378
379
380
381
382
383
384
385
386
387
# File 'lib/puma/thread_pool.rb', line 378

def reap
  with_mutex do
    @processors, dead_processors = @processors.partition(&:alive?)

    dead_processors.each do |processor|
      processor.kill
      @spawned -= 1
    end
  end
end

#reset_maxObject



157
158
159
# File 'lib/puma/thread_pool.rb', line 157

def reset_max
  with_mutex { @backlog_max = 0 }
end

#shutdown(timeout) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
# File 'lib/puma/thread_pool.rb', line 444

def shutdown(timeout)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup processors so that we join them all safely
    @processors.dup
  end

  if @shutdown_debug == true
    shutdown_debug("Shutdown initiated")
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        remaining = inner_timeout - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start)
        remaining > 0 && t.join(remaining)
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)
    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown timeout exceeded")
    end

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)
    if @shutdown_debug == :on_force && !threads.empty?
      shutdown_debug("Shutdown grace timeout exceeded")
    end

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @processors = []
end

#spawn_thread_if_neededObject

:nodoc:



354
355
356
357
358
359
360
# File 'lib/puma/thread_pool.rb', line 354

def spawn_thread_if_needed # :nodoc:
  with_mutex do
    if @waiting < @todo.size and can_spawn_processor?
      spawn_thread
    end
  end
end

#statsHash

generate stats hash so as not to perform multiple locks

Returns:

  • (Hash)

    hash containing stat info from ThreadPool



143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/puma/thread_pool.rb', line 143

def stats
  with_mutex do
    temp = @backlog_max
    @backlog_max = 0
    { backlog: @todo.size,
      running: @spawned,
      pool_capacity: pool_capacity,
      busy_threads: @spawned - @waiting + @todo.size,
      io_threads: @processors.count(&:marked_as_io_thread?),
      backlog_max: temp
    }
  end
end

#trim(force = false) ⇒ Object

If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.



366
367
368
369
370
371
372
373
374
# File 'lib/puma/thread_pool.rb', line 366

def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

#wait_while_out_of_band_runningObject



309
310
311
312
313
314
315
# File 'lib/puma/thread_pool.rb', line 309

def wait_while_out_of_band_running
  return unless @out_of_band_running

  with_mutex do
    @out_of_band_condvar.wait(@mutex) while @out_of_band_running
  end
end

#with_force_shutdownObject

Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.



428
429
430
431
432
433
434
435
436
437
# File 'lib/puma/thread_pool.rb', line 428

def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end

#with_mutex(&block) ⇒ Object

Version:

  • 5.0.0



318
319
320
321
322
# File 'lib/puma/thread_pool.rb', line 318

def with_mutex(&block)
  @mutex.owned? ?
    yield :
    @mutex.synchronize(&block)
end