Class: ActiveJob::Temporal::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/activejob/temporal/worker_pool.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Classes: Child, ProcessAdapter

Constant Summary collapse

DEFAULT_RESTART_DELAY =
1.0
DEFAULT_SHUTDOWN_TIMEOUT =
10.0
MAX_RESTART_COUNT =
1_000
SHUTDOWN_SIGNALS =
%w[INT TERM].freeze
VALID_OPTIONS =
%i[
  worker_command
  process_adapter
  health_check_port
  health_check_bind
  health_check_allow_public_bind
  metrics_port
  metrics_bind
  metrics_allow_public_bind
  max_concurrent_activities
  max_concurrent_workflows
  restart_delay
  shutdown_timeout
  install_signal_handlers
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size:, **options) ⇒ WorkerPool

Returns a new instance of WorkerPool.



33
34
35
36
37
38
39
40
41
# File 'lib/activejob/temporal/worker_pool.rb', line 33

def initialize(size:, **options)
  validate_options!(options)

  @size = positive_integer(size, "pool size")
  configure_process_options(options)
  configure_runtime_options(options)
  initialize_state
  validate!
end

Class Method Details

.default_worker_commandObject



43
44
45
46
47
48
# File 'lib/activejob/temporal/worker_pool.rb', line 43

def self.default_worker_command
  bundled_executable = File.expand_path("../../../bin/temporal-worker", __dir__)
  return [RbConfig.ruby, bundled_executable] if File.exist?(bundled_executable)

  ["temporal-worker"]
end

Instance Method Details

#running?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/activejob/temporal/worker_pool.rb', line 92

def running?
  @mutex.synchronize { @running }
end

#start(supervise: true) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/activejob/temporal/worker_pool.rb', line 50

def start(supervise: true)
  ensure_fork_supported!

  @mutex.synchronize do
    return self if @running

    @running = true
    @stopping = false
  end

  install_signal_handlers if @install_signal_handlers
  @size.times { |index| start_worker(index) }
  @supervisor_thread = Thread.new { supervise_workers } if supervise
  self
end

#stopObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/activejob/temporal/worker_pool.rb', line 71

def stop
  children = @mutex.synchronize do
    if @stopping
      nil
    else
      @stopping = true
      @running = false

      @children.values
    end
  end
  return self unless children

  children.each { |child| terminate_worker(child) }
  deadline = monotonic_time + @shutdown_timeout
  children.each { |child| wait_for_worker(child, deadline) }
  @mutex.synchronize { children.each { |child| @children.delete(child.pid) } }
  restore_signal_handlers if @install_signal_handlers
  self
end

#waitObject



66
67
68
69
# File 'lib/activejob/temporal/worker_pool.rb', line 66

def wait
  @supervisor_thread&.join
  self
end