Class: Workhorse::Worker
- Inherits:
-
Object
- Object
- Workhorse::Worker
- Defined in:
- lib/workhorse/worker.rb
Overview
Main worker class that manages job polling and execution. Workers poll the database for jobs, manage thread pools for parallel execution, and handle graceful shutdown and memory monitoring.
Constant Summary collapse
- LOG_LEVELS =
%i[fatal error warn info debug].freeze
- SHUTDOWN_SIGNALS =
%w[TERM INT].freeze
- LOG_REOPEN_SIGNAL =
'HUP'.freeze
- SOFT_RESTART_SIGNAL =
'USR1'.freeze
Instance Attribute Summary collapse
-
#logger ⇒ Logger?
readonly
Optional logger instance.
-
#mutex ⇒ Mutex
readonly
Synchronization mutex for thread safety.
-
#poller ⇒ Workhorse::Poller
readonly
The poller instance.
-
#polling_interval ⇒ Integer
readonly
Polling interval in seconds.
-
#pool_size ⇒ Integer
readonly
Number of threads in the worker pool.
-
#queues ⇒ Array<Symbol>
readonly
The queues this worker processes.
-
#state ⇒ Symbol
readonly
Current worker state (:initialized, :running, :shutdown).
Class Method Summary collapse
-
.shutdown_file_for(pid) ⇒ String?
Returns the path to the shutdown file for a given process ID.
-
.start_and_wait(**args) ⇒ void
Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
Instance Method Summary collapse
-
#accepting_jobs? ⇒ Boolean
Returns whether this worker is accepting new jobs.
-
#assert_state!(state) ⇒ void
Asserts that the worker is in the expected state.
-
#hostname ⇒ String
Returns the hostname of the machine running this worker.
-
#id ⇒ String
Returns the unique identifier for this worker.
-
#idle ⇒ Integer
Returns the number of idle threads in the pool.
-
#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker
constructor
Instantiates a new worker.
-
#log(text, level = :info) ⇒ void
Logs a message with worker ID prefix.
-
#perform(db_job_id) ⇒ void
Schedules a job for execution in the thread pool.
-
#pid ⇒ Integer
Returns the process ID of this worker.
-
#shutdown ⇒ void
Shuts down worker and DB poller.
-
#start ⇒ void
Starts the worker.
-
#wait ⇒ void
Waits until the worker is shut down.
Constructor Details
#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker
Instantiates a new worker. The worker is not automatically started.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/workhorse/worker.rb', line 89 def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) @queues = queues @pool_size = pool_size || (queues.size + 1) @polling_interval = polling_interval @auto_terminate = auto_terminate @state = :initialized @quiet = quiet @mutex = Mutex.new @pool = Pool.new(@pool_size) @poller = Workhorse::Poller.new(self, proc { check_memory }) @logger = logger @soft_restart_requested = Concurrent::AtomicBoolean.new(false) unless (@polling_interval / 0.1).round(2).modulo(1).zero? fail 'Polling interval must be a multiple of 0.1.' end if instant_repolling @pool.on_idle { @poller.instant_repoll! } end end |
Instance Attribute Details
#logger ⇒ Logger? (readonly)
Returns Optional logger instance.
42 43 44 |
# File 'lib/workhorse/worker.rb', line 42 def logger @logger end |
#mutex ⇒ Mutex (readonly)
Returns Synchronization mutex for thread safety.
39 40 41 |
# File 'lib/workhorse/worker.rb', line 39 def mutex @mutex end |
#poller ⇒ Workhorse::Poller (readonly)
Returns The poller instance.
45 46 47 |
# File 'lib/workhorse/worker.rb', line 45 def poller @poller end |
#polling_interval ⇒ Integer (readonly)
Returns Polling interval in seconds.
36 37 38 |
# File 'lib/workhorse/worker.rb', line 36 def polling_interval @polling_interval end |
#pool_size ⇒ Integer (readonly)
Returns Number of threads in the worker pool.
33 34 35 |
# File 'lib/workhorse/worker.rb', line 33 def pool_size @pool_size end |
#queues ⇒ Array<Symbol> (readonly)
Returns The queues this worker processes.
27 28 29 |
# File 'lib/workhorse/worker.rb', line 27 def queues @queues end |
#state ⇒ Symbol (readonly)
Returns Current worker state (:initialized, :running, :shutdown).
30 31 32 |
# File 'lib/workhorse/worker.rb', line 30 def state @state end |
Class Method Details
.shutdown_file_for(pid) ⇒ String?
Returns the path to the shutdown file for a given process ID.
63 64 65 66 |
# File 'lib/workhorse/worker.rb', line 63 def self.shutdown_file_for(pid) return nil unless defined?(Rails) Rails.root.join('tmp', 'pids', "workhorse.#{pid}.shutdown") end |
.start_and_wait(**args) ⇒ void
This method returns an undefined value.
Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
52 53 54 55 56 |
# File 'lib/workhorse/worker.rb', line 52 def self.start_and_wait(**args) worker = new(**args) worker.start worker.wait end |
Instance Method Details
#accepting_jobs? ⇒ Boolean
Returns whether this worker is accepting new jobs. Returns false when a soft restart has been requested.
227 228 229 |
# File 'lib/workhorse/worker.rb', line 227 def accepting_jobs? @soft_restart_requested.false? end |
#assert_state!(state) ⇒ void
This method returns an undefined value.
Asserts that the worker is in the expected state.
174 175 176 |
# File 'lib/workhorse/worker.rb', line 174 def assert_state!(state) fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state end |
#hostname ⇒ String
Returns the hostname of the machine running this worker.
144 145 146 |
# File 'lib/workhorse/worker.rb', line 144 def hostname @hostname ||= Socket.gethostname end |
#id ⇒ String
Returns the unique identifier for this worker. Format: hostname.pid.random_hex
130 131 132 |
# File 'lib/workhorse/worker.rb', line 130 def id @id ||= "#{hostname}.#{pid}.#{SecureRandom.hex(3)}" end |
#idle ⇒ Integer
Returns the number of idle threads in the pool.
219 220 221 |
# File 'lib/workhorse/worker.rb', line 219 def idle @pool.idle end |
#log(text, level = :info) ⇒ void
This method returns an undefined value.
Logs a message with worker ID prefix.
118 119 120 121 122 123 124 |
# File 'lib/workhorse/worker.rb', line 118 def log(text, level = :info) text = "[Job worker #{id}] #{text}" puts text unless @quiet return unless logger fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level) logger.send(level, text.strip) end |
#perform(db_job_id) ⇒ void
This method returns an undefined value.
Schedules a job for execution in the thread pool.
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/workhorse/worker.rb', line 235 def perform(db_job_id) begin # rubocop:disable Style/RedundantBegin mutex.synchronize do assert_state! :running log "Posting job #{db_job_id} to thread pool" @pool.post do begin # rubocop:disable Style/RedundantBegin Workhorse::Performer.new(db_job_id, self).perform rescue Exception => e log %(#{e.}\n#{e.backtrace.join("\n")}), :error Workhorse.on_exception.call(e) end end end rescue Exception => e Workhorse.on_exception.call(e) end end |
#pid ⇒ Integer
Returns the process ID of this worker.
137 138 139 |
# File 'lib/workhorse/worker.rb', line 137 def pid @pid ||= Process.pid end |
#shutdown ⇒ void
This method returns an undefined value.
Shuts down worker and DB poller. Jobs currently being processed are properly finished before this method returns. Subsequent calls to this method are ignored.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/workhorse/worker.rb', line 183 def shutdown # This is safe to be checked outside of the mutex as 'shutdown' is the # final state this worker can be in. return if @state == :shutdown # TODO: There is a race-condition with this shutdown: # - If the poller is currently locking a job, it may call # "worker.perform", which in turn tries to synchronize the same mutex. mutex.synchronize do assert_state! :running Workhorse.debug_log("[Job worker #{id}] Shutdown starting") log 'Shutting down' @state = :shutdown @poller.shutdown @pool.shutdown log 'Shut down' Workhorse.debug_log("[Job worker #{id}] Shutdown complete") end end |
#start ⇒ void
This method returns an undefined value.
Starts the worker. This call is not blocking - call #wait for this purpose.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/workhorse/worker.rb', line 153 def start mutex.synchronize do assert_state! :initialized log 'Starting up' @state = :running @poller.start log 'Started up' Workhorse.debug_log("[Job worker #{id}] Started: PID=#{pid}, logger=#{describe_logger(logger)}") trap_termination if @auto_terminate trap_log_reopen trap_soft_restart end end |
#wait ⇒ void
This method returns an undefined value.
Waits until the worker is shut down. This only happens if #shutdown gets called - either by another thread or by enabling ‘auto_terminate` and receiving a respective signal. Use this method to let worker run indefinitely.
211 212 213 214 |
# File 'lib/workhorse/worker.rb', line 211 def wait @poller.wait @pool.wait end |