Class: Protobuf::Nats::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/protobuf/nats/thread_pool.rb', line 7

def initialize(size, opts = {})
  @queue = ::Queue.new
  # Lock-free counter of in-flight work. Replaces a mutex-guarded integer so
  # that N workers running in true parallel (JRuby) don't serialize on every
  # task completion.
  @active_work = ::Concurrent::AtomicFixnum.new(0)

  # Callbacks
  @error_cb = lambda do |error|
    logger.error("Error in ThreadPool worker: #{error.message}
 #{error.backtrace.join("
")}")
  end

  # Synchronization
  @mutex = ::Mutex.new      # guards the @workers array only
  @cb_mutex = ::Mutex.new

  # Let's get this party started
  queue_size = opts[:max_queue].to_i || 0
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = ::Concurrent::AtomicBoolean.new(false)
  @workers = []
  supervise_workers
end

Instance Method Details

#enqueued_sizeObject



34
35
36
# File 'lib/protobuf/nats/thread_pool.rb', line 34

def enqueued_size
  @queue.size
end

#full?Boolean

Thread-safe access to check if the pool is full.

Returns:

  • (Boolean)


39
40
41
# File 'lib/protobuf/nats/thread_pool.rb', line 39

def full?
  @active_work.value >= @max_size
end

#killObject



74
75
76
77
# File 'lib/protobuf/nats/thread_pool.rb', line 74

def kill
  @shutting_down.make_true
  @workers.map(&:kill)
end

#max_sizeObject



43
44
45
# File 'lib/protobuf/nats/thread_pool.rb', line 43

def max_size
  @max_size
end

#on_error(&cb) ⇒ Object

This callback is executed in a thread safe manner.



90
91
92
# File 'lib/protobuf/nats/thread_pool.rb', line 90

def on_error(&cb)
  @cb_mutex.synchronize { @error_cb = cb }
end

#push(&work_cb) ⇒ Object

This method is now thread-safe.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/protobuf/nats/thread_pool.rb', line 48

def push(&work_cb)
  return false if @shutting_down.true?

  # Optimistically claim a slot; back off if we exceeded the cap. This admits
  # work only while active_work < max_size, matching the original guard, but
  # without holding a mutex across the enqueue.
  if @active_work.increment > @max_size
    @active_work.decrement
    return false
  end

  @queue << [:work, work_cb]

  # Supervise outside any lock-held section to avoid holding it during thread creation.
  supervise_workers
  true
end

#shutdownObject

This method is now thread-safe.



67
68
69
70
71
72
# File 'lib/protobuf/nats/thread_pool.rb', line 67

def shutdown
  # CAS ensures the poison pills are pushed exactly once.
  return unless @shutting_down.make_true

  @max_workers.times { @queue << [:stop, nil] }
end

#sizeObject

Thread-safe access to the current active work size.



95
96
97
# File 'lib/protobuf/nats/thread_pool.rb', line 95

def size
  @active_work.value
end

#wait_for_termination(seconds = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/protobuf/nats/thread_pool.rb', line 79

def wait_for_termination(seconds = nil)
  started_at = ::Time.now
  loop do
    sleep 0.1
    break if seconds && (::Time.now - started_at) >= seconds
    break if @workers.empty?
    prune_dead_workers
  end
end