Class: Puma::ThreadPool

Inherits:
Object
Defined in:
lib/puma/thread_pool.rb

Overview

Internal docs for a simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First, a connection to a client is made in `Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator, where it is stored in the `@todo` array.

Each thread in the pool has an internal loop where it pulls a request from the `@todo` array and processes it.
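
The flow described above can be illustrated with a deliberately small pool. This is a sketch only, not Puma's implementation: `TinyPool` and its methods are hypothetical names, it has a fixed size, and it assumes queued work items are never `nil`.

```ruby
# Hypothetical miniature pool for illustration; not Puma::ThreadPool.
class TinyPool
  def initialize(size, &block)
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @todo = []
    @shutdown = false
    @block = block
    @workers = Array.new(size) { Thread.new { work_loop } }
  end

  # Queue one unit of work and wake a single sleeping worker.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Let the workers drain the queue, then wait for them to exit.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @workers.each(&:join)
  end

  private

  # Each worker sleeps until work arrives, then processes it outside the lock.
  def work_loop
    loop do
      work = @mutex.synchronize do
        @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
        @todo.shift
      end
      break if work.nil? # queue is empty and shutdown was requested
      @block.call(work)
    end
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * 2 }
[1, 2, 3].each { |n| pool << n }
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
```

The work block runs outside the mutex, so a slow request only occupies its own thread and does not stop other workers from pulling from the queue.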

Defined Under Namespace

Classes: Automaton, ForceShutdown

Constant Summary

SHUTDOWN_GRACE_TIME = 5

How long (in seconds), after raising ForceShutdown in a thread during forced shutdown mode, to wait for the thread to try to finish its work before leaving the thread to die on the vine.


Constructor Details

#initialize(name, min, max, *extra, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



# File 'lib/puma/thread_pool.rb', line 32

def initialize(name, min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @clean_thread_locals = false
  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

Instance Attribute Details

#busy_threads ⇒ Object (readonly)

Version:

  • 5.0.0



# File 'lib/puma/thread_pool.rb', line 93

def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end

#clean_thread_locals ⇒ Object

Returns the value of attribute clean_thread_locals.



# File 'lib/puma/thread_pool.rb', line 71

def clean_thread_locals
  @clean_thread_locals
end

#out_of_band_hook ⇒ Object

Version:

  • 5.0.0



# File 'lib/puma/thread_pool.rb', line 72

def out_of_band_hook
  @out_of_band_hook
end

#pool_capacity ⇒ Object (readonly)



# File 'lib/puma/thread_pool.rb', line 87

def pool_capacity
  waiting + (@max - spawned)
end
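
The counter arithmetic behind `busy_threads` and `pool_capacity` can be checked by hand. The sketch below uses a hypothetical `PoolSnapshot` struct with invented values; it only mirrors the two formulas shown in the listings and leaves out the locking.

```ruby
# Hypothetical snapshot of the pool counters; the numbers are invented.
PoolSnapshot = Struct.new(:spawned, :waiting, :todo_size, :max) do
  # Threads currently running work, plus requests still queued.
  def busy_threads
    spawned - waiting + todo_size
  end

  # Idle threads, plus threads that could still be spawned.
  def pool_capacity
    waiting + (max - spawned)
  end
end

snapshot = PoolSnapshot.new(3, 1, 2, 5)
puts snapshot.busy_threads  # 3 - 1 + 2 = 4
puts snapshot.pool_capacity # 1 + (5 - 3) = 3
```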

#spawned ⇒ Object (readonly)

Returns the value of attribute spawned.



# File 'lib/puma/thread_pool.rb', line 70

def spawned
  @spawned
end

#trim_requested ⇒ Object (readonly)

Returns the value of attribute trim_requested.



# File 'lib/puma/thread_pool.rb', line 70

def trim_requested
  @trim_requested
end

#waiting ⇒ Object (readonly)

Returns the value of attribute waiting.



# File 'lib/puma/thread_pool.rb', line 70

def waiting
  @waiting
end

Class Method Details

.clean_thread_locals ⇒ Object



# File 'lib/puma/thread_pool.rb', line 74

def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
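
To see what this clearing loop does to thread-local state, the following sketch runs the same loop inside a throwaway thread. The helper name `clear_thread_locals` and the `:request_id` key are made up for the demonstration.

```ruby
# Same loop as the class method above, wrapped in a local helper for the demo.
def clear_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

# Run in a separate thread so the main thread's own locals are left alone.
before, after = Thread.new do
  Thread.current[:request_id] = "abc123"
  seen = Thread.current[:request_id]
  clear_thread_locals
  [seen, Thread.current[:request_id]]
end.value
```

Here `before` holds the value that was set and `after` is `nil`, which is the state the next request handled by a reused thread would observe.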

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a Thread to pick up and process.



# File 'lib/puma/thread_pool.rb', line 185

def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

#auto_reap!(timeout = 5) ⇒ Object



# File 'lib/puma/thread_pool.rb', line 327

def auto_reap!(timeout=5)
  @reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
  @reaper.start!
end

#auto_trim!(timeout = 30) ⇒ Object



# File 'lib/puma/thread_pool.rb', line 322

def auto_trim!(timeout=30)
  @auto_trim = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
  @auto_trim.start!
end

#backlog ⇒ Object

How many objects have yet to be processed by the pool?



# File 'lib/puma/thread_pool.rb', line 82

def backlog
  with_mutex { @todo.size }
end

#reap ⇒ Object

If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created in their place.



# File 'lib/puma/thread_pool.rb', line 280

def reap
  with_mutex do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
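
The bookkeeping in `reap` can be tried outside the pool with two plain threads. This is a sketch with local variables standing in for `@workers` and `@spawned`; it skips the mutex because nothing else touches the array here.

```ruby
workers = [
  Thread.new { sleep },   # healthy worker, parked until killed
  Thread.new { :done }    # worker whose thread exits straight away
]
workers.last.join         # make sure the second thread has really finished
spawned = workers.size

# Same steps as reap: find dead threads, adjust the counter, drop them.
dead_workers = workers.reject(&:alive?)
dead_workers.each { spawned -= 1 }
workers.delete_if { |w| dead_workers.include?(w) }

survivors = workers.size
workers.each(&:kill).each(&:join) # tidy up the demo thread
```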

#shutdown(timeout = -1) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish. If timeout is -1 (the default), wait indefinitely. Otherwise, wait timeout seconds, then raise ForceShutdown in the remaining threads that are running inside with_force_shutdown. Next, wait an extra SHUTDOWN_GRACE_TIME seconds, then force-kill the remaining threads. Finally, wait 1 second for the killed threads to exit.



# File 'lib/puma/thread_pool.rb', line 350

def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(SHUTDOWN_GRACE_TIME)

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @workers = []
end
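
The `join` lambda in `shutdown` gives all threads one shared time budget rather than a full timeout each. A minimal standalone sketch of that idea follows; `join_within` is a hypothetical helper, and the clamp to zero is an addition in this sketch, not something the listing above does.

```ruby
# Join every thread within one shared time budget; returns the stragglers.
def join_within(threads, budget_s)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads.reject do |t|
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    # Thread#join returns the thread if it finished, nil if it timed out.
    # Clamping at zero turns a spent budget into a non-blocking check.
    t.join([budget_s - elapsed, 0].max)
  end
end

quick = Thread.new { :done }
stuck = Thread.new { sleep }

remaining = join_within([quick, stuck], 0.2)
remaining.each(&:kill) # last resort, like the final phase of shutdown
remaining.each(&:join)
```

Using the monotonic clock keeps the budget unaffected by wall-clock adjustments while the joins are in progress.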

#trim(force = false) ⇒ Object

If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim is requested even if all threads are being utilized. In either case, the pool is never trimmed below its minimum size.



# File 'lib/puma/thread_pool.rb', line 268

def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
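
The condition guarding a trim can be read as a pure predicate over the counters. The sketch below restates it as a hypothetical `trim?` function with keyword arguments so that a few cases can be checked directly; the values are invented.

```ruby
# Hypothetical restatement of the trim condition as a pure function.
def trim?(waiting:, todo_size:, spawned:, trim_requested:, min:, force: false)
  free = waiting - todo_size
  (force || free > 0) && spawned - trim_requested > min
end

idle_pool   = trim?(waiting: 2, todo_size: 0, spawned: 4, trim_requested: 0, min: 2)
busy_pool   = trim?(waiting: 0, todo_size: 0, spawned: 4, trim_requested: 0, min: 2)
busy_forced = trim?(waiting: 0, todo_size: 0, spawned: 4, trim_requested: 0, min: 2, force: true)
at_minimum  = trim?(waiting: 2, todo_size: 0, spawned: 2, trim_requested: 0, min: 2, force: true)
```

Subtracting `trim_requested` from `spawned` accounts for trims that have been requested but not yet carried out, so repeated calls cannot push the pool below `min`.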

#wait_for_less_busy_worker(delay_s) ⇒ Object

Version:

  • 5.0.0



# File 'lib/puma/thread_pool.rb', line 244

def wait_for_less_busy_worker(delay_s)
  return unless delay_s && delay_s > 0

  # Ruby MRI has a GVL, which can cause
  # processing contention when multiple threads
  # (requests) are running concurrently
  return unless Puma.mri?

  with_mutex do
    return if @shutdown

    # do not delay, if we are not busy
    return unless busy_threads > 0

    # this will be signaled once a request finishes,
    # which can happen earlier than delay
    @not_full.wait @mutex, delay_s
  end
end

#wait_until_not_full ⇒ Object

This method is used by `Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass them to the reactor.

The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.

For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the `Puma::Server` can pull a request right away.

If there are 5 threads and all 5 of them are busy, then it will pause here and wait until the `not_full` condition variable is signaled. Usually this indicates that a request has been processed.

It's important to note that even though the server might accept another request, it might not be added to the `@todo` array right away. For example, if a slow client has only sent a header but not a body, then the `@todo` array would stay the same size as the reactor works to buffer the request. In that scenario, the next call to this method would not block and another request would be added into the reactor by the server. This would continue until a fully buffered request makes it through the reactor and can then be processed by the thread pool.
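
The 4-of-5 and 5-of-5 cases described above can be reproduced with a small stand-in. `Gate` is a hypothetical class that tracks only a busy count; it is a sketch of the wait-and-signal pattern, not of the pool itself.

```ruby
# Hypothetical stand-in that tracks only how many requests are in flight.
class Gate
  def initialize(max)
    @max = max
    @busy = 0
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  # Block while every slot is taken; re-check after each wake-up.
  def wait_until_not_full
    @mutex.synchronize do
      @not_full.wait(@mutex) while @busy >= @max
    end
  end

  def start_request
    @mutex.synchronize { @busy += 1 }
  end

  def finish_request
    @mutex.synchronize do
      @busy -= 1
      @not_full.signal
    end
  end
end

gate = Gate.new(5)
4.times { gate.start_request }
gate.wait_until_not_full # returns at once: 4 of 5 busy
gate.start_request       # now 5 of 5 busy

events = Queue.new
acceptor = Thread.new do
  gate.wait_until_not_full # blocks until a request finishes
  events << :accepted
end
events << :finishing
gate.finish_request
acceptor.join
order = [events.pop, events.pop]
```

The `while` guard around the wait matters: a woken thread re-checks the count instead of assuming a slot is free.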



# File 'lib/puma/thread_pool.rb', line 227

def wait_until_not_full
  with_mutex do
    while true
      return if @shutdown

      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      return if busy_threads < @max

      @not_full.wait @mutex
    end
  end
end

#with_force_shutdown ⇒ Object

Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shut down during execution.



# File 'lib/puma/thread_pool.rb', line 334

def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end
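
The opt-in flag works together with `Thread#raise`: only threads that have marked themselves as interruptible receive the exception. The sketch below shows that handshake with a locally defined `ForceShutdown` stand-in and queues for synchronization; it omits the `@shutdown_mutex` that the real method uses to close the race with `shutdown`.

```ruby
# Local stand-in for ThreadPool::ForceShutdown, defined only for this demo.
class ForceShutdown < RuntimeError; end

ready = Queue.new
outcome = Queue.new

worker = Thread.new do
  begin
    Thread.current[:with_force_shutdown] = true # opt in to interruption
    ready << :armed
    sleep # stands in for a long-running request
  rescue ForceShutdown
    outcome << :interrupted
  ensure
    Thread.current[:with_force_shutdown] = false
  end
end

ready.pop # wait until the worker has opted in
worker.raise(ForceShutdown) if worker[:with_force_shutdown]
worker.join
result = outcome.pop
```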

#with_mutex(&block) ⇒ Object

Version:

  • 5.0.0



# File 'lib/puma/thread_pool.rb', line 178

def with_mutex(&block)
  @mutex.owned? ?
    yield :
    @mutex.synchronize(&block)
end
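
Ruby's `Mutex` is not reentrant, which is why `with_mutex` checks `owned?` before locking. The sketch below uses a hypothetical `Counter` class to show a nested call succeeding through the helper, and a raw nested `synchronize` failing with `ThreadError`.

```ruby
# Hypothetical class that reuses the with_mutex pattern shown above.
class Counter
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  # Yield directly if this thread already holds the lock, else take it.
  def with_mutex(&block)
    @mutex.owned? ? yield : @mutex.synchronize(&block)
  end

  def increment
    with_mutex { @count += 1 }
  end

  # Calls increment while already holding the lock.
  def increment_twice
    with_mutex do
      increment
      increment
    end
  end

  # Without the owned? check, re-locking from the same thread raises.
  def raw_nested_lock
    @mutex.synchronize { @mutex.synchronize { :unreachable } }
  rescue ThreadError => e
    e.class
  end
end

counter = Counter.new
total = counter.increment_twice
nested_error = counter.raw_nested_lock
```

This is what lets methods such as `busy_threads` be called both on their own and from inside another `with_mutex` block.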