Class: Protobuf::Nats::ThreadPool
- Inherits:
-
Object
- Object
- Protobuf::Nats::ThreadPool
- Defined in:
- lib/protobuf/nats/thread_pool.rb
Instance Method Summary collapse
- #enqueued_size ⇒ Object
-
#full? ⇒ Boolean
Thread-safe access to check if the pool is full.
-
#initialize(size, opts = {}) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #kill ⇒ Object
- #max_size ⇒ Object
-
#on_error(&cb) ⇒ Object
This callback is executed in a thread safe manner.
-
#push(&work_cb) ⇒ Object
This method is now thread-safe.
-
#shutdown ⇒ Object
This method is now thread-safe.
-
#size ⇒ Object
Thread-safe access to the current active work size.
- #wait_for_termination(seconds = nil) ⇒ Object
Constructor Details
#initialize(size, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/protobuf/nats/thread_pool.rb', line 5 def initialize(size, opts = {}) @queue = ::Queue.new @active_work = 0 # Callbacks @error_cb = lambda do |error| logger.error("Error in ThreadPool worker: #{error.} #{error.backtrace.join(" ")}") end # Synchronization @mutex = ::Mutex.new @cb_mutex = ::Mutex.new # Let's get this party started queue_size = opts[:max_queue].to_i || 0 @max_size = size + queue_size @max_workers = size @shutting_down = false @workers = [] supervise_workers end |
Instance Method Details
#enqueued_size ⇒ Object
29 30 31 |
# File 'lib/protobuf/nats/thread_pool.rb', line 29 def enqueued_size @queue.size end |
#full? ⇒ Boolean
Thread-safe access to check if the pool is full.
34 35 36 |
# File 'lib/protobuf/nats/thread_pool.rb', line 34 def full? @mutex.synchronize { @active_work >= @max_size } end |
#kill ⇒ Object
69 70 71 72 |
# File 'lib/protobuf/nats/thread_pool.rb', line 69 def kill @shutting_down = true @workers.map(&:kill) end |
#max_size ⇒ Object
38 39 40 |
# File 'lib/protobuf/nats/thread_pool.rb', line 38 def max_size @max_size end |
#on_error(&cb) ⇒ Object
This callback is executed in a thread safe manner.
85 86 87 |
# File 'lib/protobuf/nats/thread_pool.rb', line 85 def on_error(&cb) @cb_mutex.synchronize { @error_cb = cb } end |
#push(&work_cb) ⇒ Object
This method is now thread-safe.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/protobuf/nats/thread_pool.rb', line 43 def push(&work_cb) @mutex.synchronize do # Re-check conditions inside the lock to guarantee safety. return false if @active_work >= @max_size return false if @shutting_down @queue << [:work, work_cb] @active_work += 1 end # Supervise outside the lock to avoid holding it during thread creation. supervise_workers true end |
#shutdown ⇒ Object
This method is now thread-safe.
59 60 61 62 63 64 65 66 67 |
# File 'lib/protobuf/nats/thread_pool.rb', line 59 def shutdown @mutex.synchronize do return if @shutting_down # Prevent sending stop messages multiple times @shutting_down = true end # Pushing poison pills can happen outside the lock. @max_workers.times { @queue << [:stop, nil] } end |
#size ⇒ Object
Thread-safe access to the current active work size.
90 91 92 |
# File 'lib/protobuf/nats/thread_pool.rb', line 90 def size @mutex.synchronize { @active_work } end |
#wait_for_termination(seconds = nil) ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/protobuf/nats/thread_pool.rb', line 74 def wait_for_termination(seconds = nil) started_at = ::Time.now loop do sleep 0.1 break if seconds && (::Time.now - started_at) >= seconds break if @workers.empty? prune_dead_workers end end |