Class: Protobuf::Nats::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/protobuf/nats/thread_pool.rb', line 5

# Sets up the pool's queue, counters, locks and default error callback,
# then hands off to +supervise_workers+ (defined elsewhere in this class;
# presumably it spawns the worker threads — confirm there).
#
# @param size [Integer] stored as the worker limit (+@max_workers+)
# @param opts [Hash] optional settings
# @option opts [#to_i] :max_queue extra capacity added on top of +size+
#   when computing +@max_size+; a missing key counts as 0
def initialize(size, opts = {})
  @queue = ::Queue.new
  @active_work = 0

  # Callbacks
  # Default handler: logs the error message followed by its backtrace.
  # Array() keeps the handler from raising when the backtrace is nil
  # (e.g. an exception object that was never raised).
  @error_cb = lambda do |error|
    logger.error("Error in ThreadPool worker: #{error.message}\n #{Array(error.backtrace).join("\n")}")
  end

  # Synchronization
  @mutex = ::Mutex.new
  @cb_mutex = ::Mutex.new

  # Let's get this party started
  # nil.to_i is already 0, so no further fallback is required here.
  queue_size = opts[:max_queue].to_i
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = false
  @workers = []
  supervise_workers
end

Instance Method Details

#enqueued_size ⇒ Object



29
30
31
# File 'lib/protobuf/nats/thread_pool.rb', line 29

# Number of entries currently sitting in the internal queue.
#
# @return [Integer] whatever +@queue+ reports as its length
def enqueued_size
  @queue.length
end

#full? ⇒ Boolean

Thread-safe access to check if the pool is full.

Returns:

  • (Boolean)


34
35
36
# File 'lib/protobuf/nats/thread_pool.rb', line 34

# Reports whether the active work counter has reached the pool's capacity.
# The comparison is made while holding +@mutex+.
#
# @return [Boolean] true when +@active_work+ is at or above +@max_size+
def full?
  @mutex.synchronize do
    @max_size <= @active_work
  end
end

#kill ⇒ Object



69
70
71
72
# File 'lib/protobuf/nats/thread_pool.rb', line 69

# Marks the pool as shutting down and kills every thread in +@workers+.
#
# @return [Array] the results of calling +kill+ on each worker
def kill
  @shutting_down = true
  @workers.map { |worker| worker.kill }
end

#max_size ⇒ Object



38
39
40
# File 'lib/protobuf/nats/thread_pool.rb', line 38

# Reader for the capacity stored in +@max_size+.
#
# @return [Object] the current value of +@max_size+
def max_size
  @max_size
end

#on_error(&cb) ⇒ Object

This callback is executed in a thread-safe manner.



85
86
87
# File 'lib/protobuf/nats/thread_pool.rb', line 85

# Replaces the stored error callback with the given block. The swap is
# performed while holding +@cb_mutex+.
#
# @yield the block stored as +@error_cb+
# @return [Proc, nil] the block that was stored (nil when none is given)
def on_error(&cb)
  @cb_mutex.synchronize do
    @error_cb = cb
  end
end

#push(&work_cb) ⇒ Object

This method is now thread-safe.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/protobuf/nats/thread_pool.rb', line 43

# Tries to enqueue a unit of work.
#
# Under +@mutex+ the job is rejected when +@active_work+ has reached
# +@max_size+ or when the pool is shutting down; otherwise it is appended
# to +@queue+ as +[:work, work_cb]+ and +@active_work+ is incremented.
#
# @yield the work to run, stored as a Proc in the queue entry
# @return [Boolean] true when the job was queued, false when rejected
def push(&work_cb)
  accepted = @mutex.synchronize do
    if @active_work >= @max_size || @shutting_down
      false
    else
      @queue << [:work, work_cb]
      @active_work += 1
      true
    end
  end
  return false unless accepted

  # Worker supervision runs after the lock has been released.
  supervise_workers
  true
end

#shutdown ⇒ Object

This method is now thread-safe.



59
60
61
62
63
64
65
66
67
# File 'lib/protobuf/nats/thread_pool.rb', line 59

# Flags the pool as shutting down and enqueues one +[:stop, nil]+ entry
# per configured worker. Only the first call enqueues anything.
#
# @return [Integer, nil] +@max_workers+ on the first call, nil when the
#   pool was already shutting down
def shutdown
  first_call = @mutex.synchronize do
    # Already flagged: skip so stop messages are not sent twice.
    next false if @shutting_down

    @shutting_down = true
  end
  return unless first_call

  # The stop entries are pushed after the lock has been released.
  @max_workers.times { @queue.push([:stop, nil]) }
end

#size ⇒ Object

Thread-safe access to the current active work size.



90
91
92
# File 'lib/protobuf/nats/thread_pool.rb', line 90

# Reads the active work counter while holding +@mutex+.
#
# @return [Integer] the current value of +@active_work+
def size
  current = nil
  @mutex.synchronize { current = @active_work }
  current
end

#wait_for_termination(seconds = nil) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/protobuf/nats/thread_pool.rb', line 74

# Blocks until +@workers+ is empty or the optional timeout elapses.
#
# The loop checks for completion first, so an idle pool returns at once,
# and re-checks right after each prune, so a drained pool is noticed in
# the same cycle. Elapsed time is measured on the monotonic clock, so
# wall-clock adjustments cannot shorten or extend the wait.
#
# @param seconds [Numeric, nil] maximum time to wait; nil waits indefinitely
# @return [nil]
def wait_for_termination(seconds = nil)
  monotonic_now = -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) }
  deadline = seconds && (monotonic_now.call + seconds)

  loop do
    break if @workers.empty?
    break if deadline && monotonic_now.call >= deadline

    sleep 0.1
    # prune_dead_workers is defined elsewhere in this class; presumably it
    # drops finished threads from @workers — confirm there.
    prune_dead_workers
  end
end