Class: ActiveJob::Temporal::WorkerPool
- Inherits:
-
Object
- Object
- ActiveJob::Temporal::WorkerPool
show all
- Defined in:
- lib/activejob/temporal/worker_pool.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: Child, ProcessAdapter
Constant Summary
collapse
- DEFAULT_RESTART_DELAY =
1.0
- DEFAULT_SHUTDOWN_TIMEOUT =
10.0
- MAX_RESTART_COUNT =
1_000
- SHUTDOWN_SIGNALS =
%w[INT TERM].freeze
- VALID_OPTIONS =
%i[
worker_command
process_adapter
health_check_port
health_check_bind
health_check_allow_public_bind
metrics_port
metrics_bind
metrics_allow_public_bind
max_concurrent_activities
max_concurrent_workflows
restart_delay
shutdown_timeout
install_signal_handlers
].freeze
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(size:, **options) ⇒ WorkerPool
Returns a new instance of WorkerPool.
33
34
35
36
37
38
39
40
41
|
# File 'lib/activejob/temporal/worker_pool.rb', line 33
def initialize(size:, **options)
validate_options!(options)
@size = positive_integer(size, "pool size")
configure_process_options(options)
configure_runtime_options(options)
initialize_state
validate!
end
|
Class Method Details
.default_worker_command ⇒ Object
43
44
45
46
47
48
|
# File 'lib/activejob/temporal/worker_pool.rb', line 43
def self.default_worker_command
bundled_executable = File.expand_path("../../../bin/temporal-worker", __dir__)
return [RbConfig.ruby, bundled_executable] if File.exist?(bundled_executable)
["temporal-worker"]
end
|
Instance Method Details
#running? ⇒ Boolean
92
93
94
|
# File 'lib/activejob/temporal/worker_pool.rb', line 92
def running?
@mutex.synchronize { @running }
end
|
#start(supervise: true) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/activejob/temporal/worker_pool.rb', line 50
def start(supervise: true)
ensure_fork_supported!
@mutex.synchronize do
return self if @running
@running = true
@stopping = false
end
install_signal_handlers if @install_signal_handlers
@size.times { |index| start_worker(index) }
@supervisor_thread = Thread.new { supervise_workers } if supervise
self
end
|
#stop ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/activejob/temporal/worker_pool.rb', line 71
def stop
children = @mutex.synchronize do
if @stopping
nil
else
@stopping = true
@running = false
@children.values
end
end
return self unless children
children.each { |child| terminate_worker(child) }
deadline = monotonic_time + @shutdown_timeout
children.each { |child| wait_for_worker(child, deadline) }
@mutex.synchronize { children.each { |child| @children.delete(child.pid) } }
restore_signal_handlers if @install_signal_handlers
self
end
|
#wait ⇒ Object
66
67
68
69
|
# File 'lib/activejob/temporal/worker_pool.rb', line 66
def wait
@supervisor_thread&.join
self
end
|