Class: Protobuf::Nats::ThreadPool
- Inherits: Object
- Defined in: lib/protobuf/nats/thread_pool.rb
Instance Method Summary
- #enqueued_size ⇒ Object
- #full? ⇒ Boolean
  Thread-safe access to check if the pool is full.
- #initialize(size, opts = {}) ⇒ ThreadPool (constructor)
  A new instance of ThreadPool.
- #kill ⇒ Object
- #max_size ⇒ Object
- #on_error(&cb) ⇒ Object
  This callback is executed in a thread-safe manner.
- #push(&work_cb) ⇒ Object
  This method is thread-safe.
- #shutdown ⇒ Object
  This method is thread-safe.
- #size ⇒ Object
  Thread-safe access to the current active work size.
- #wait_for_termination(seconds = nil) ⇒ Object
Constructor Details
#initialize(size, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
# File 'lib/protobuf/nats/thread_pool.rb', line 7

def initialize(size, opts = {})
  @queue = ::Queue.new

  # Lock-free counter of in-flight work. Replaces a mutex-guarded integer so
  # that N workers running in true parallel (JRuby) don't serialize on every
  # task completion.
  @active_work = ::Concurrent::AtomicFixnum.new(0)

  # Callbacks
  @error_cb = lambda do |error|
    logger.error("Error in ThreadPool worker: #{error.} #{error.backtrace.join(" ")}")
  end

  # Synchronization
  @mutex = ::Mutex.new # guards the @workers array only
  @cb_mutex = ::Mutex.new

  # Let's get this party started
  queue_size = opts[:max_queue].to_i || 0
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = ::Concurrent::AtomicBoolean.new(false)
  @workers = []
  supervise_workers
end
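The capacity arithmetic in the constructor can be checked in isolation. The sketch below is only an illustration: `pool_capacity` is a hypothetical helper, not part of the library. It also shows that `nil.to_i` is already `0`, so the `|| 0` fallback in the listing never takes effect.

```ruby
# Hypothetical helper mirroring how #initialize derives @max_size.
# It is not part of Protobuf::Nats::ThreadPool.
def pool_capacity(size, opts = {})
  # nil.to_i is 0, so a missing :max_queue key adds no queue slots.
  queue_size = opts[:max_queue].to_i
  size + queue_size
end

puts pool_capacity(4)                       # workers only
puts pool_capacity(4, { max_queue: 16 })    # workers plus queued slots
puts pool_capacity(4, { max_queue: "8" })   # String values are coerced by to_i
```

Under this reading, `max_size` counts both running and queued work, which is what `#full?` compares against.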
Instance Method Details
#enqueued_size ⇒ Object
# File 'lib/protobuf/nats/thread_pool.rb', line 34

def enqueued_size
  @queue.size
end
#full? ⇒ Boolean
Thread-safe access to check if the pool is full.
# File 'lib/protobuf/nats/thread_pool.rb', line 39

def full?
  @active_work.value >= @max_size
end
#kill ⇒ Object
# File 'lib/protobuf/nats/thread_pool.rb', line 74

def kill
  @shutting_down.make_true
  @workers.map(&:kill)
end
#max_size ⇒ Object
# File 'lib/protobuf/nats/thread_pool.rb', line 43

def max_size
  @max_size
end
#on_error(&cb) ⇒ Object
Registers the error callback; the assignment is synchronized on a dedicated mutex. The callback is executed in a thread-safe manner.

# File 'lib/protobuf/nats/thread_pool.rb', line 90

def on_error(&cb)
  @cb_mutex.synchronize { @error_cb = cb }
end
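A minimal sketch of the callback-swap pattern used by `#on_error`, assuming the callback is read under the same mutex before it is called. The listing above does not show how the pool invokes the callback, so `ErrorHook` and its `report` method are hypothetical stand-ins.

```ruby
# Hypothetical stand-in for the pool's error-callback handling.
class ErrorHook
  def initialize
    @cb_mutex = Mutex.new
    # Default callback: print the error message to stderr.
    @error_cb = ->(error) { warn "unhandled: #{error.message}" }
  end

  # Same shape as ThreadPool#on_error: swap the callback under the mutex.
  def on_error(&cb)
    @cb_mutex.synchronize { @error_cb = cb }
  end

  # Assumption: read the callback under the mutex, then call it outside the
  # lock so a slow callback does not block other threads registering one.
  def report(error)
    cb = @cb_mutex.synchronize { @error_cb }
    cb.call(error)
  end
end

hook = ErrorHook.new
hook.on_error { |error| puts "worker failed: #{error.message}" }
hook.report(RuntimeError.new("boom"))
```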
#push(&work_cb) ⇒ Object
This method is thread-safe. It enqueues the given block and returns true, or returns false when the pool is shutting down or already at max_size.

# File 'lib/protobuf/nats/thread_pool.rb', line 48

def push(&work_cb)
  return false if @shutting_down.true?

  # Optimistically claim a slot; back off if we exceeded the cap. This admits
  # work only while active_work < max_size, matching the original guard, but
  # without holding a mutex across the enqueue.
  if @active_work.increment > @max_size
    @active_work.decrement
    return false
  end

  @queue << [:work, work_cb]
  # Supervise outside any lock-held section to avoid holding it during thread creation.
  supervise_workers
  true
end
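The claim-then-back-off step in `#push` can be sketched on its own. To keep the example runnable without the concurrent-ruby gem, it swaps in a Mutex-backed counter for `::Concurrent::AtomicFixnum`; `SketchCounter` and `AdmissionGate` are hypothetical names introduced for illustration only.

```ruby
# Hypothetical stand-in for ::Concurrent::AtomicFixnum. It uses a Mutex rather
# than a lock-free primitive, but increment/decrement return the new value.
class SketchCounter
  def initialize(value = 0)
    @value = value
    @lock = Mutex.new
  end

  def increment
    @lock.synchronize { @value += 1 }
  end

  def decrement
    @lock.synchronize { @value -= 1 }
  end

  def value
    @lock.synchronize { @value }
  end
end

# Hypothetical admission gate mirroring the slot claim in ThreadPool#push.
class AdmissionGate
  def initialize(max_size)
    @max_size = max_size
    @active = SketchCounter.new
  end

  # Claim a slot first, then give it back if the claim went past the cap.
  def try_claim
    if @active.increment > @max_size
      @active.decrement
      return false
    end
    true
  end

  # Called when a unit of work finishes and frees its slot.
  def release
    @active.decrement
  end

  def size
    @active.value
  end
end

gate = AdmissionGate.new(2)
results = Array.new(8) { Thread.new { gate.try_claim } }.map(&:value)
puts "admitted: #{results.count(true)}, rejected: #{results.count(false)}"
```

One trade-off of this pattern is that the counter can briefly exceed the cap while a rejected caller is between its increment and decrement, so a concurrent caller may be turned away even though a slot is about to free up.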
#shutdown ⇒ Object
This method is thread-safe. It enqueues one :stop message per worker; repeated calls are no-ops.

# File 'lib/protobuf/nats/thread_pool.rb', line 67

def shutdown
  # CAS ensures the poison pills are pushed exactly once.
  return unless @shutting_down.make_true
  @max_workers.times { @queue << [:stop, nil] }
end
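The poison-pill shutdown can be illustrated with a miniature pool built only on the standard library. This is a sketch under stated assumptions: `PillPool` is hypothetical, it guards the shutdown flag with a Mutex instead of `::Concurrent::AtomicBoolean#make_true`, and its worker loop is a guess at the general shape, since the real worker loop is not shown in this page.

```ruby
# Hypothetical miniature pool demonstrating a poison-pill shutdown.
class PillPool
  attr_reader :results

  def initialize(worker_count)
    @queue = Queue.new
    @results = Queue.new
    @worker_count = worker_count
    @stopped = false
    @lock = Mutex.new
    @workers = Array.new(worker_count) { Thread.new { work_loop } }
  end

  def push(&work)
    @queue << [:work, work]
  end

  # Only the caller that flips the flag pushes the pills, one per worker.
  def shutdown
    first_caller = @lock.synchronize do
      next false if @stopped
      @stopped = true
    end
    return unless first_caller
    @worker_count.times { @queue << [:stop, nil] }
  end

  # Number of messages still sitting in the queue.
  def pending
    @queue.size
  end

  def join
    @workers.each(&:join)
  end

  private

  # Each worker drains the queue until it pops a :stop message.
  def work_loop
    loop do
      kind, work = @queue.pop
      break if kind == :stop
      @results << work.call
    end
  end
end

pool = PillPool.new(3)
5.times { |i| pool.push { i * i } }
pool.shutdown
pool.shutdown # second call is a no-op, so no extra pills are queued
pool.join
puts "completed: #{pool.results.size}, leftover messages: #{pool.pending}"
```

Because the queue is first-in, first-out, work pushed before `shutdown` is picked up before any worker sees its pill.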
#size ⇒ Object
Thread-safe access to the current active work size.
# File 'lib/protobuf/nats/thread_pool.rb', line 95

def size
  @active_work.value
end
#wait_for_termination(seconds = nil) ⇒ Object
# File 'lib/protobuf/nats/thread_pool.rb', line 79

def wait_for_termination(seconds = nil)
  started_at = ::Time.now
  loop do
    sleep 0.1
    break if seconds && (::Time.now - started_at) >= seconds
    break if @workers.empty?
    prune_dead_workers
  end
end
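A rough sketch of the same poll-with-timeout idea, written against plain threads. It is not the library's implementation: `wait_for_threads` is a hypothetical helper, it uses the monotonic clock instead of `::Time.now`, it checks before the first sleep, and it returns a Boolean so callers can tell a clean exit from a timeout.

```ruby
# Hypothetical helper: poll until every thread has exited or the timeout passes.
# Returns true when all threads finished, false when the timeout was reached.
def wait_for_threads(threads, seconds = nil, poll_interval = 0.05)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = seconds && (now.call + seconds)

  loop do
    threads.reject! { |thread| !thread.alive? } # prune finished threads
    return true if threads.empty?
    return false if deadline && now.call >= deadline
    sleep poll_interval
  end
end

workers = Array.new(3) { |i| Thread.new { sleep 0.1 * (i + 1) } }
puts wait_for_threads(workers, 2)
```

Passing `nil` for `seconds` waits indefinitely, matching the default in `#wait_for_termination`.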