Class: ActiveJob::Temporal::ConnectionWorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/activejob/temporal/connection_worker_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(size:, queue_size:, name:, &handler) ⇒ ConnectionWorkerPool

Returns a new instance of ConnectionWorkerPool.

Raises:

  • (ArgumentError)


6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/activejob/temporal/connection_worker_pool.rb', line 6

# Prepares a pool in the stopped state. Only configuration is recorded here;
# no queue or thread is created by this method.
#
# @param size [Object] coerced with Integer(); must be positive
# @param queue_size [Object] coerced with Integer(); must be positive
# @param name [Object] stored as given
# @param handler [Proc] required block, stored for later use
# @raise [ArgumentError] if size or queue_size is not positive, or if no
#   block is given
def initialize(size:, queue_size:, name:, &handler)
  # Coerce both numbers before anything else so malformed input fails first.
  @size = Integer(size)
  @queue_size = Integer(queue_size)

  @started = false
  @mutex = Mutex.new
  @handler = handler
  @name = name

  raise ArgumentError, "size must be positive" if @size <= 0
  raise ArgumentError, "queue_size must be positive" if @queue_size <= 0
  raise ArgumentError, "handler is required" if @handler.nil?
end

Instance Method Details

#enqueue(connection) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/activejob/temporal/connection_worker_pool.rb', line 34

# Hands a connection to the pool without blocking the caller.
#
# The connection is passed to close_connection and false is returned when
# the pool is not started, when the non-blocking push finds the queue full
# (ThreadError), or when the queue has been closed (ClosedQueueError).
#
# @param connection [Object] the connection to queue
# @return [Boolean] true if the connection was queued, false otherwise
def enqueue(connection)
  # Read the queue under the lock; nil means the pool is not running.
  target = @mutex.synchronize { @started ? @queue : nil }

  if target
    # Second argument requests a non-blocking push.
    target.push(connection, true)
    return true
  end

  close_connection(connection)
  false
rescue ClosedQueueError, ThreadError
  close_connection(connection)
  false
end

#start ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/activejob/temporal/connection_worker_pool.rb', line 19

# Starts the pool if it is not already running.
#
# Under the lock, creates a SizedQueue bounded by @queue_size and spawns
# @size threads, each calling run_worker with that queue and its index.
# Calling it again while started changes nothing.
#
# @return [self]
def start
  @mutex.synchronize do
    unless @started
      worker_queue = SizedQueue.new(@queue_size)
      @queue = worker_queue
      # Each thread receives the queue as an argument instead of reading
      # @queue, which may be reassigned later.
      @workers = (0...@size).map do |index|
        Thread.new(worker_queue, index) { |q, i| run_worker(q, i) }
      end
      @started = true
    end
  end

  self
end

#stop(timeout:) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/activejob/temporal/connection_worker_pool.rb', line 48

# Stops the pool: detaches the queue and workers under the lock, closes the
# queue, then waits for the worker threads to finish.
#
# +timeout+ is a single budget shared by all workers, measured on the
# monotonic clock, so the call waits at most about +timeout+ seconds in
# total rather than +timeout+ per worker. Workers still running when the
# budget is spent are left alive; they are not killed.
#
# @param timeout [Numeric, nil] total seconds to wait for all workers;
#   nil waits without limit
# @return [Array<Thread>, nil] the detached worker threads, or nil when the
#   pool was not started
def stop(timeout:)
  queue = nil
  workers = nil

  @mutex.synchronize do
    return unless @started

    queue = @queue
    workers = @workers
    @queue = nil
    @workers = []
    @started = false
  end

  queue.close

  deadline = timeout.nil? ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  workers.each do |worker|
    if deadline
      # Give each join only what is left of the shared budget; join(0)
      # merely checks whether the thread has already finished.
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      worker.join(remaining.positive? ? remaining : 0)
    else
      worker.join
    end
  end
end