Class: Protobuf::Nats::ThreadPool
- Inherits: Object
  - Object
  - Protobuf::Nats::ThreadPool
- Defined in: lib/protobuf/nats/thread_pool.rb
Instance Method Summary
- #enqueued_size ⇒ Object
- #full? ⇒ Boolean
  Thread-safe access to check if the pool is full.
- #initialize(size, opts = {}) ⇒ ThreadPool (constructor)
  A new instance of ThreadPool.
- #kill ⇒ Object
- #max_size ⇒ Object
- #on_error(&cb) ⇒ Object
  Sets the error callback. The callback is executed in a thread-safe manner.
- #push(&work_cb) ⇒ Object
  Thread-safe. Returns false if the pool is shutting down or already at max_size; otherwise enqueues the block and returns true.
- #replenish ⇒ Object
  Top the pool back up to max_workers if workers have died (e.g. one was killed by a non-StandardError, which the per-task rescue can’t catch).
- #shutdown ⇒ Object
  Thread-safe. Marks the pool as shutting down and enqueues one stop message per worker; repeated calls are no-ops.
- #size ⇒ Object
  Thread-safe access to the current active work size.
- #wait_for_termination(seconds = nil) ⇒ Object
  Wait until all workers exit.
Constructor Details
#initialize(size, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
    # File 'lib/protobuf/nats/thread_pool.rb', line 7
    def initialize(size, opts = {})
      @queue = ::Queue.new
      # Lock-free counter of in-flight work. Replaces a mutex-guarded integer so
      # that N workers running in true parallel (JRuby) don't serialize on every
      # task completion.
      @active_work = ::Concurrent::AtomicFixnum.new(0)

      # Callbacks
      @error_cb = lambda do |error|
        logger.error("Error in ThreadPool worker: #{error.message} #{error.backtrace.join(" ")}")
      end

      # Synchronization
      @mutex = ::Mutex.new # guards the @workers array only
      @cb_mutex = ::Mutex.new

      # Let's get this party started
      queue_size = opts[:max_queue].to_i || 0
      @max_size = size + queue_size
      @max_workers = size
      @shutting_down = ::Concurrent::AtomicBoolean.new(false)
      @workers = []

      supervise_workers
    end
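Example (illustrative sketch, not part of the gem): the constructor derives two limits from its arguments, the worker count (`size`) and the admission cap (`size` plus `opts[:max_queue]`). The helper name `pool_capacity` below is hypothetical; it only reproduces that arithmetic. It also shows that `nil.to_i` is already `0`, so a missing `:max_queue` option means no extra queue capacity.

```ruby
# Hypothetical helper that mirrors the capacity arithmetic in #initialize.
# It is not part of Protobuf::Nats::ThreadPool.
def pool_capacity(size, opts = {})
  # nil.to_i is 0, so an absent :max_queue adds no queue slots.
  queue_size = opts[:max_queue].to_i
  { max_workers: size, max_size: size + queue_size }
end

with_queue = pool_capacity(4, max_queue: 10)
without_queue = pool_capacity(4)

puts "with queue:    #{with_queue.inspect}"
puts "without queue: #{without_queue.inspect}"
```

With four workers and `max_queue: 10`, up to 14 units of work may be admitted at once: those being executed plus those waiting in the queue.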
Instance Method Details
#enqueued_size ⇒ Object
    # File 'lib/protobuf/nats/thread_pool.rb', line 34
    def enqueued_size
      @queue.size
    end
#full? ⇒ Boolean
Thread-safe access to check if the pool is full.
    # File 'lib/protobuf/nats/thread_pool.rb', line 39
    def full?
      @active_work.value >= @max_size
    end
#kill ⇒ Object
    # File 'lib/protobuf/nats/thread_pool.rb', line 74
    def kill
      @shutting_down.make_true
      @workers.map(&:kill)
    end
#max_size ⇒ Object
    # File 'lib/protobuf/nats/thread_pool.rb', line 43
    def max_size
      @max_size
    end
#on_error(&cb) ⇒ Object
Sets the error callback. The callback is executed in a thread-safe manner.
    # File 'lib/protobuf/nats/thread_pool.rb', line 100
    def on_error(&cb)
      @cb_mutex.synchronize { @error_cb = cb }
    end
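Example (illustrative sketch, not part of the gem): one way a mutex-guarded error callback can be wired up. The class `ErrorHook` and its `report` and `guard` methods are hypothetical. This page does not show how the pool's workers invoke the callback, so calling it while holding the mutex is an assumption based on the description above.

```ruby
# Hypothetical stand-in showing a swappable, mutex-guarded error callback.
class ErrorHook
  def initialize
    @cb_mutex = Mutex.new
    # Default handler: write the message to stderr.
    @error_cb = ->(error) { warn("unhandled: #{error.message}") }
  end

  # Replace the handler; the assignment happens under the lock.
  def on_error(&cb)
    @cb_mutex.synchronize { @error_cb = cb }
  end

  # Invoke the current handler under the same lock (assumed behaviour),
  # so a handler never runs concurrently with itself or with a swap.
  def report(error)
    @cb_mutex.synchronize { @error_cb.call(error) }
  end

  # Run a block and route any StandardError to the handler.
  def guard
    yield
  rescue StandardError => e
    report(e)
  end
end

hook = ErrorHook.new
messages = []
hook.on_error { |error| messages << error.message }
hook.guard { raise ArgumentError, "boom" }
puts messages.inspect
```

Because the handler runs under the lock in this sketch, it must not call `on_error` itself; Ruby's `Mutex` is not reentrant.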
#push(&work_cb) ⇒ Object
Thread-safe. Returns false if the pool is shutting down or already at max_size; otherwise enqueues the block and returns true.
    # File 'lib/protobuf/nats/thread_pool.rb', line 48
    def push(&work_cb)
      return false if @shutting_down.true?

      # Optimistically claim a slot; back off if we exceeded the cap. This admits
      # work only while active_work < max_size, matching the original guard, but
      # without holding a mutex across the enqueue.
      if @active_work.increment > @max_size
        @active_work.decrement
        return false
      end

      @queue << [:work, work_cb]
      # Supervise outside any lock-held section to avoid holding it during thread creation.
      supervise_workers

      true
    end
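Example (illustrative sketch, not part of the gem): the claim-then-back-off admission step in isolation. To stay within the standard library, `TinyAtomicCounter` is a hypothetical mutex-backed stand-in for `Concurrent::AtomicFixnum`, and `AdmissionGate` is a hypothetical wrapper around the same increment, compare, decrement sequence that `#push` uses.

```ruby
# Hypothetical stdlib-only stand-in for Concurrent::AtomicFixnum.
class TinyAtomicCounter
  def initialize(value = 0)
    @value = value
    @lock = Mutex.new
  end

  # Both return the updated value, as the atomic counter's methods do.
  def increment
    @lock.synchronize { @value += 1 }
  end

  def decrement
    @lock.synchronize { @value -= 1 }
  end

  def value
    @lock.synchronize { @value }
  end
end

# Hypothetical gate that admits work only while fewer than max_size slots are claimed.
class AdmissionGate
  def initialize(max_size)
    @max_size = max_size
    @active = TinyAtomicCounter.new
  end

  # Claim a slot first; hand it back if the claim went over the cap.
  def try_claim
    if @active.increment > @max_size
      @active.decrement
      return false
    end
    true
  end

  # Called when a unit of work finishes.
  def release
    @active.decrement
  end

  def size
    @active.value
  end
end

gate = AdmissionGate.new(2)
admitted = Array.new(3) { gate.try_claim }
puts admitted.inspect
```

The counter may briefly exceed the cap between the increment and the decrement, so a concurrent `full?`-style check can report a full pool slightly early, but a claim is never admitted beyond `max_size`.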
#replenish ⇒ Object
Top the pool back up to max_workers if workers have died (e.g. one was killed by a non-StandardError, which the per-task rescue can’t catch). No-op while shutting down so we don’t resurrect workers mid-drain.
    # File 'lib/protobuf/nats/thread_pool.rb', line 94
    def replenish
      return if @shutting_down.true?
      supervise_workers
    end
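Example (illustrative sketch, not part of the gem): `supervise_workers` is not shown on this page, so the following is only an assumption about what topping a pool back up can look like: drop dead threads, then start replacements until the configured count is reached. The class `Supervisor` and its methods are hypothetical.

```ruby
# Hypothetical supervisor that keeps a fixed number of worker threads alive.
class Supervisor
  def initialize(max_workers, &body)
    @max_workers = max_workers
    @body = body
    @lock = Mutex.new
    @workers = []
  end

  # Prune dead threads, then spawn replacements up to max_workers.
  # Returns how many threads were started.
  def replenish
    @lock.synchronize do
      @workers.select!(&:alive?)
      missing = @max_workers - @workers.size
      missing.times { @workers << Thread.new(&@body) }
      missing
    end
  end

  # Simulate a worker dying unexpectedly.
  def kill_one
    victim = @lock.synchronize { @workers.first }
    victim.kill
    victim.join
  end

  # Stop every worker and wait for the threads to exit.
  def stop
    @lock.synchronize { @workers.each(&:kill).each(&:join) }
  end
end

supervisor = Supervisor.new(3) { sleep }
started = supervisor.replenish
supervisor.kill_one
restarted = supervisor.replenish
supervisor.stop
puts "started #{started}, restarted #{restarted}"
```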
#shutdown ⇒ Object
Thread-safe. Marks the pool as shutting down and enqueues one stop message per worker; repeated calls are no-ops.
    # File 'lib/protobuf/nats/thread_pool.rb', line 67
    def shutdown
      # CAS ensures the poison pills are pushed exactly once.
      return unless @shutting_down.make_true

      @max_workers.times { @queue << [:stop, nil] }
    end
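Example (illustrative sketch, not part of the gem): the poison-pill pattern in a self-contained form. `PillPool` is hypothetical. It uses a mutex-guarded flag instead of `Concurrent::AtomicBoolean` to avoid the dependency, and its `shutdown` returns true or false for clarity, which is not what the real method returns.

```ruby
# Hypothetical mini pool: each worker loops on the queue until it pops a :stop pill.
class PillPool
  attr_reader :workers

  def initialize(worker_count)
    @queue = Queue.new
    @worker_count = worker_count
    @stopping = false
    @lock = Mutex.new
    @workers = Array.new(worker_count) do
      Thread.new do
        loop do
          kind, job = @queue.pop
          break if kind == :stop
          job.call
        end
      end
    end
  end

  def push(&job)
    @queue << [:work, job]
  end

  # Returns true only for the call that actually initiated shutdown.
  def shutdown
    first = @lock.synchronize do
      next false if @stopping
      @stopping = true
    end
    return false unless first

    # One pill per worker, queued behind any work that is already pending.
    @worker_count.times { @queue << [:stop, nil] }
    true
  end
end

pool = PillPool.new(2)
finished = Queue.new
4.times { |i| pool.push { finished << i } }
pool.shutdown
pool.workers.each(&:join)
puts "jobs finished before exit: #{finished.size}"
```

Because the queue is first-in, first-out, work enqueued before `shutdown` is drained before any worker sees its pill.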
#size ⇒ Object
Thread-safe access to the current active work size.
    # File 'lib/protobuf/nats/thread_pool.rb', line 105
    def size
      @active_work.value
    end
#wait_for_termination(seconds = nil) ⇒ Object
Wait until all workers exit, polling every 0.1 seconds. Returns true if the pool drained, false if the timeout elapsed first; with no timeout it waits indefinitely. Dead workers are pruned under the mutex because pruning mutates @workers.
    # File 'lib/protobuf/nats/thread_pool.rb', line 81
    def wait_for_termination(seconds = nil)
      deadline = seconds && (::Protobuf::Nats.monotonic_time + seconds)
      loop do
        @mutex.synchronize { prune_dead_workers }
        return true if @workers.empty?
        return false if deadline && ::Protobuf::Nats.monotonic_time >= deadline
        sleep 0.1
      end
    end
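Example (illustrative sketch, not part of the gem): the same deadline loop written against plain threads. `::Protobuf::Nats.monotonic_time` is not shown on this page; the sketch assumes it behaves like `Process.clock_gettime(Process::CLOCK_MONOTONIC)`. The names `monotonic_now` and `wait_for_threads` and the `poll:` parameter are hypothetical.

```ruby
# Assumed equivalent of ::Protobuf::Nats.monotonic_time (an assumption, see above).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Poll until every thread has exited or the optional timeout elapses.
# Returns true if all threads finished, false if the deadline passed first.
def wait_for_threads(threads, seconds = nil, poll: 0.01)
  deadline = seconds && (monotonic_now + seconds)
  loop do
    return true if threads.none?(&:alive?)
    return false if deadline && monotonic_now >= deadline
    sleep poll
  end
end

quick = [Thread.new { sleep 0.02 }]
puts "quick threads drained: #{wait_for_threads(quick, 2)}"
```

A monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or extend the wait.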