Class: Workhorse::Worker

Inherits:
Object
Defined in:
lib/workhorse/worker.rb

Overview

Main worker class that manages job polling and execution. Workers poll the database for jobs, manage thread pools for parallel execution, and handle graceful shutdown and memory monitoring.

Examples:

Basic worker setup

worker = Workhorse::Worker.new(
  queues: [:default, :urgent],
  pool_size: 4,
  polling_interval: 30
)
worker.start
worker.wait

Auto-terminating worker

Workhorse::Worker.start_and_wait(
  queues: [:email, :reports],
  auto_terminate: true
)

Constant Summary

LOG_LEVELS = %i[fatal error warn info debug].freeze
SHUTDOWN_SIGNALS = %w[TERM INT].freeze
LOG_REOPEN_SIGNAL = 'HUP'.freeze
SOFT_RESTART_SIGNAL = 'USR1'.freeze

Constructor Details

#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker

Instantiates a new worker. The worker is not automatically started.

Parameters:

  • queues (Array) (defaults to: [])

    The queues you want this worker to process. If an empty array is given, jobs from all queues are processed. Queues must be specified as symbols. To also process jobs without a queue, include `nil` in the array.

  • pool_size (Integer) (defaults to: nil)

    The number of jobs that will be processed simultaneously. If this parameter is not given, it will be set to the number of given queues + 1.

  • polling_interval (Integer) (defaults to: 300)

    Interval, in seconds, at which the database is polled for new jobs. Set this as high as possible to avoid unnecessary database load. The value must be a multiple of 0.1; otherwise the constructor raises an error. Defaults to 5 minutes.

  • auto_terminate (Boolean) (defaults to: true)

    Whether to shut the worker down gracefully when it receives an INT or TERM signal.

  • quiet (Boolean) (defaults to: true)

    If this is set to `false`, the worker also logs to STDOUT.

  • instant_repolling (Boolean) (defaults to: false)

    If this is set to `true`, the worker immediately re-polls for new jobs when a job execution has finished.

  • logger (Logger) (defaults to: nil)

    An optional logger the worker will append to. This can be any instance of Ruby's `Logger` but is commonly set to `Rails.logger`.



# File 'lib/workhorse/worker.rb', line 89

def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil)
  @queues = queues
  @pool_size = pool_size || (queues.size + 1)
  @polling_interval = polling_interval
  @auto_terminate = auto_terminate
  @state = :initialized
  @quiet = quiet

  @mutex = Mutex.new
  @pool = Pool.new(@pool_size)
  @poller = Workhorse::Poller.new(self, proc { check_memory })
  @logger = logger
  @soft_restart_requested = Concurrent::AtomicBoolean.new(false)

  unless (@polling_interval / 0.1).round(2).modulo(1).zero?
    fail 'Polling interval must be a multiple of 0.1.'
  end

  if instant_repolling
    @pool.on_idle { @poller.instant_repoll! }
  end
end
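The pool-size default and the polling-interval check in the constructor can be tried out in isolation. The sketch below copies both expressions into two hypothetical helpers, `default_pool_size` and `valid_polling_interval?`; they are illustrations, not part of Workhorse's API.

```ruby
# Hypothetical helpers that reproduce two expressions from
# Workhorse::Worker#initialize; they are not part of Workhorse itself.

# One thread per queue plus one extra, unless pool_size is given explicitly.
def default_pool_size(queues, pool_size = nil)
  pool_size || (queues.size + 1)
end

# Same test as the constructor: interval / 0.1 must be a whole number once
# rounded to two decimals. round(2) absorbs floating-point error such as
# 0.3 / 0.1 == 2.9999999999999996.
def valid_polling_interval?(interval)
  (interval / 0.1).round(2).modulo(1).zero?
end

puts default_pool_size(%i[default urgent]) # 3
puts valid_polling_interval?(300)          # true
puts valid_polling_interval?(0.25)         # false
```

Rounding before taking the remainder is what lets a value such as 0.3 pass the check despite floating-point representation error.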

Instance Attribute Details

#logger ⇒ Logger? (readonly)

Returns the optional logger instance.

Returns:

  • (Logger, nil)

    Optional logger instance



# File 'lib/workhorse/worker.rb', line 42

def logger
  @logger
end

#mutex ⇒ Mutex (readonly)

Returns the mutex that synchronizes changes to the worker's state.

Returns:

  • (Mutex)

    Synchronization mutex for thread safety



# File 'lib/workhorse/worker.rb', line 39

def mutex
  @mutex
end

#poller ⇒ Workhorse::Poller (readonly)

Returns the poller instance.

Returns:

  • (Workhorse::Poller)

    The poller instance

# File 'lib/workhorse/worker.rb', line 45

def poller
  @poller
end

#polling_interval ⇒ Integer (readonly)

Returns the polling interval in seconds.

Returns:

  • (Integer)

    Polling interval in seconds



# File 'lib/workhorse/worker.rb', line 36

def polling_interval
  @polling_interval
end

#pool_size ⇒ Integer (readonly)

Returns the number of threads in the worker pool.

Returns:

  • (Integer)

    Number of threads in the worker pool



# File 'lib/workhorse/worker.rb', line 33

def pool_size
  @pool_size
end

#queues ⇒ Array<Symbol> (readonly)

Returns the queues this worker processes.

Returns:

  • (Array<Symbol>)

    The queues this worker processes



# File 'lib/workhorse/worker.rb', line 27

def queues
  @queues
end

#state ⇒ Symbol (readonly)

Returns the current worker state (:initialized, :running or :shutdown).

Returns:

  • (Symbol)

    Current worker state (:initialized, :running, :shutdown)



# File 'lib/workhorse/worker.rb', line 30

def state
  @state
end

Class Method Details

.shutdown_file_for(pid) ⇒ Pathname?

Returns the path to the shutdown file for a given process ID.

Parameters:

  • pid (Integer)

    Process ID

Returns:

  • (Pathname, nil)

    Path to the shutdown file, or nil if Rails is not defined



# File 'lib/workhorse/worker.rb', line 63

def self.shutdown_file_for(pid)
  return nil unless defined?(Rails)
  Rails.root.join('tmp', 'pids', "workhorse.#{pid}.shutdown")
end
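Because `Rails.root` is a `Pathname`, the method yields a `Pathname` rather than a plain string. The following sketch rebuilds the same path outside Rails; `shutdown_file_for(root, pid)` is a hypothetical helper that takes the application root as an argument instead of reading `Rails.root`.

```ruby
require 'pathname'

# Hypothetical stand-in for Worker.shutdown_file_for that receives the
# application root explicitly instead of relying on Rails.root.
def shutdown_file_for(root, pid)
  Pathname.new(root).join('tmp', 'pids', "workhorse.#{pid}.shutdown")
end

path = shutdown_file_for('/srv/app', 4242)
puts path       # /srv/app/tmp/pids/workhorse.4242.shutdown
puts path.class # Pathname
```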

.start_and_wait(**args) ⇒ void

This method returns an undefined value.

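Instantiates and starts a new worker with the given arguments, then blocks until the worker shuts down (for example after an INT or TERM signal when `auto_terminate` is enabled).

Parameters:

  • args (Hash)

    Keyword arguments passed on to #initialize
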
# File 'lib/workhorse/worker.rb', line 52

def self.start_and_wait(**args)
  worker = new(**args)
  worker.start
  worker.wait
end

Instance Method Details

#accepting_jobs? ⇒ Boolean

Returns whether this worker is accepting new jobs. Returns false when a soft restart has been requested.

Returns:

  • (Boolean)

    True if accepting jobs, false otherwise



# File 'lib/workhorse/worker.rb', line 227

def accepting_jobs?
  @soft_restart_requested.false?
end
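`accepting_jobs?` only reads an atomic flag. The code that sets the flag is not shown on this page; the sketch assumes a soft-restart request (presumably `SOFT_RESTART_SIGNAL`, i.e. USR1, handled by the private `trap_soft_restart`) flips it to true. `SoftRestartFlag` is a hypothetical, Mutex-based stand-in for `Concurrent::AtomicBoolean` that implements only the two methods used here, `make_true` and `false?`.

```ruby
# Hypothetical stdlib stand-in for Concurrent::AtomicBoolean (Workhorse
# itself uses concurrent-ruby). Only make_true and false? are provided.
class SoftRestartFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  def make_true
    @mutex.synchronize { @value = true }
  end

  def false?
    @mutex.synchronize { !@value }
  end
end

flag = SoftRestartFlag.new
accepting_jobs = -> { flag.false? } # same expression as Worker#accepting_jobs?

puts accepting_jobs.call # true
flag.make_true           # assumed to happen when a soft restart is requested
puts accepting_jobs.call # false
```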

#assert_state!(state) ⇒ void

This method returns an undefined value.

Asserts that the worker is in the expected state.

Parameters:

  • state (Symbol)

    Expected state

Raises:

  • (RuntimeError)

    If worker is not in expected state



# File 'lib/workhorse/worker.rb', line 174

def assert_state!(state)
  fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state
end

#hostname ⇒ String

Returns the hostname of the machine running this worker.

Returns:

  • (String)

    Hostname



# File 'lib/workhorse/worker.rb', line 144

def hostname
  @hostname ||= Socket.gethostname
end

#id ⇒ String

Returns the unique identifier for this worker, in the format `hostname.pid.random_hex`, where `random_hex` consists of 6 random hexadecimal characters.

Returns:

  • (String)

    Unique worker identifier



# File 'lib/workhorse/worker.rb', line 130

def id
  @id ||= "#{hostname}.#{pid}.#{SecureRandom.hex(3)}"
end
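The identifier combines `Socket.gethostname`, `Process.pid` and `SecureRandom.hex(3)`, which yields 3 random bytes as 6 hexadecimal characters. The hypothetical `worker_id` helper below builds the same string and parses it back apart; unlike `Worker#id` it is not memoized, so every call produces a new random suffix.

```ruby
require 'socket'
require 'securerandom'

# Hypothetical helper that builds an ID the same way Worker#id does.
def worker_id
  "#{Socket.gethostname}.#{Process.pid}.#{SecureRandom.hex(3)}"
end

id = worker_id
puts id

# The hostname may itself contain dots, so anchor on the last two parts.
parts = id.match(/\A(.+)\.(\d+)\.([0-9a-f]{6})\z/)
puts "host=#{parts[1]} pid=#{parts[2]} suffix=#{parts[3]}"
```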

#idle ⇒ Integer

Returns the number of idle threads in the pool.

Returns:

  • (Integer)

    Number of idle threads



# File 'lib/workhorse/worker.rb', line 219

def idle
  @pool.idle
end

#log(text, level = :info) ⇒ void

This method returns an undefined value.

Logs a message prefixed with the worker ID. Unless `quiet` is set, the message is also printed to STDOUT.

Parameters:

  • text (String)

    The message to log

  • level (Symbol) (defaults to: :info)

    The log level (must be in LOG_LEVELS)

Raises:

  • (RuntimeError)

    If a logger is configured and the log level is not in LOG_LEVELS



# File 'lib/workhorse/worker.rb', line 118

def log(text, level = :info)
  text = "[Job worker #{id}] #{text}"
  puts text unless @quiet
  return unless logger
  fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level)
  logger.send(level, text.strip)
end
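A stand-alone copy of the logging logic makes the order of checks visible: the STDOUT echo happens before the logger check, and the level is validated only when a logger is present. `log_line` is a hypothetical helper with the same body as `#log`; the worker ID, logger and `quiet` flag are passed in explicitly, and a `StringIO`-backed `Logger` captures the output.

```ruby
require 'logger'
require 'stringio'

LOG_LEVELS = %i[fatal error warn info debug].freeze

# Hypothetical free-standing version of Worker#log.
def log_line(text, level = :info, id:, logger: nil, quiet: true)
  text = "[Job worker #{id}] #{text}"
  puts text unless quiet # echo to STDOUT when not quiet
  return unless logger   # without a logger, the level is never checked
  fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level)
  logger.send(level, text.strip)
end

io = StringIO.new
logger = Logger.new(io)
log_line('Starting up', id: 'host.123.abcdef', logger: logger)
puts io.string
```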

#perform(db_job_id) ⇒ void

This method returns an undefined value.

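Schedules a job for execution in the thread pool. Exceptions raised while posting or performing the job are passed to Workhorse.on_exception; exceptions raised while performing it are additionally logged at the :error level.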

Parameters:

  • db_job_id (Integer)

    The ID of the DbJob to perform



# File 'lib/workhorse/worker.rb', line 235

def perform(db_job_id)
  begin # rubocop:disable Style/RedundantBegin
    mutex.synchronize do
      assert_state! :running
      log "Posting job #{db_job_id} to thread pool"

      @pool.post do
        begin # rubocop:disable Style/RedundantBegin
          Workhorse::Performer.new(db_job_id, self).perform
        rescue Exception => e
          log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
          Workhorse.on_exception.call(e)
        end
      end
    end
  rescue Exception => e
    Workhorse.on_exception.call(e)
  end
end
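`#perform` wraps each job in two layers of exception handling: one around posting to the pool and one inside the posted block. The sketch below reproduces the inner layer with stdlib primitives only. `TinyPool` is a hypothetical thread pool built on `Queue` (not Workhorse's `Pool`), and the `on_exception` lambda stands in for `Workhorse.on_exception`.

```ruby
# Hypothetical minimal thread pool: workers pop blocks from a queue until
# the queue is closed and drained.
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop) # pop returns nil once the queue is closed and empty
          job.call
        end
      end
    end
  end

  def post(&block)
    @jobs << block
  end

  def shutdown
    @jobs.close           # no new jobs; queued ones still run
    @threads.each(&:join) # wait for running jobs to finish
  end
end

errors = Queue.new
on_exception = ->(e) { errors << e } # stand-in for Workhorse.on_exception

pool = TinyPool.new(2)
[1, 2, 3].each do |job_id|
  pool.post do
    begin
      raise ArgumentError, "job #{job_id} failed" if job_id == 2
    rescue Exception => e # mirrors the original, which rescues Exception
      on_exception.call(e)
    end
  end
end
pool.shutdown
puts errors.size # 1
```

Rescuing inside the posted block means a failing job reports to the handler instead of terminating the pool thread it ran on.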

#pid ⇒ Integer

Returns the process ID of this worker.

Returns:

  • (Integer)

    Process ID



# File 'lib/workhorse/worker.rb', line 137

def pid
  @pid ||= Process.pid
end

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the worker, its DB poller and its thread pool. Jobs currently being processed are finished before this method returns. Subsequent calls to this method are ignored; calling it before #start raises a RuntimeError.



# File 'lib/workhorse/worker.rb', line 183

def shutdown
  # This is safe to be checked outside of the mutex as 'shutdown' is the
  # final state this worker can be in.
  return if @state == :shutdown

  # TODO: There is a race-condition with this shutdown:
  #  - If the poller is currently locking a job, it may call
  #    "worker.perform", which in turn tries to synchronize the same mutex.
  mutex.synchronize do
    assert_state! :running

    Workhorse.debug_log("[Job worker #{id}] Shutdown starting")
    log 'Shutting down'
    @state = :shutdown

    @poller.shutdown
    @pool.shutdown
    log 'Shut down'
    Workhorse.debug_log("[Job worker #{id}] Shutdown complete")
  end
end
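The lifecycle enforced by `#start`, `#shutdown` and `#assert_state!` is a small state machine: `:initialized`, then `:running`, then `:shutdown`. `LifecycleSketch` below is a hypothetical model that keeps only the state checks (no poller, pool or logging) to show that a second `#shutdown` is a no-op, while starting twice or shutting down a worker that was never started raises.

```ruby
# Hypothetical model of the Worker lifecycle; only the state checks remain.
class LifecycleSketch
  attr_reader :state

  def initialize
    @state = :initialized
    @mutex = Mutex.new
  end

  def assert_state!(state)
    fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state
  end

  def start
    @mutex.synchronize do
      assert_state! :initialized
      @state = :running
    end
  end

  def shutdown
    return if @state == :shutdown # final state, so safe to check unlocked
    @mutex.synchronize do
      assert_state! :running
      @state = :shutdown
    end
  end
end

w = LifecycleSketch.new
w.start
w.shutdown
w.shutdown    # ignored
puts w.state  # shutdown
```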

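#start ⇒ void

This method returns an undefined value.

Starts the worker. This call does not block; call #wait to block until the worker shuts down. Starting also installs signal handlers for log reopening and soft restarts, plus termination handlers if `auto_terminate` is enabled.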

Raises:

  • (RuntimeError)

    If worker is not in initialized state



# File 'lib/workhorse/worker.rb', line 153

def start
  mutex.synchronize do
    assert_state! :initialized
    log 'Starting up'
    @state = :running
    @poller.start
    log 'Started up'

    Workhorse.debug_log("[Job worker #{id}] Started: PID=#{pid}, logger=#{describe_logger(logger)}")

    trap_termination if @auto_terminate
    trap_log_reopen
    trap_soft_restart
  end
end
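`#start` installs signal handlers through private helpers (`trap_termination`, `trap_log_reopen`, `trap_soft_restart`) whose bodies are not shown on this page. Any termination handler has to respect one Ruby constraint: `Mutex#synchronize` raises `ThreadError` when called from trap context, and `#shutdown` locks the worker's mutex. A common workaround, and the one assumed in this sketch, is to run the shutdown in a new thread. `install_shutdown_traps` and `FakeWorker` are hypothetical.

```ruby
SHUTDOWN_SIGNALS = %w[TERM INT].freeze

# Hypothetical trap installer: the handler only spawns a thread, because
# locking a Mutex directly inside a trap handler raises ThreadError.
def install_shutdown_traps(worker)
  SHUTDOWN_SIGNALS.each do |signal|
    Signal.trap(signal) do
      Thread.new { worker.shutdown }
    end
  end
end

# Hypothetical worker whose shutdown locks a mutex, like the real one.
class FakeWorker
  def initialize
    @mutex = Mutex.new
    @shut_down = false
  end

  def shutdown
    @mutex.synchronize { @shut_down = true }
  end

  def shut_down?
    @mutex.synchronize { @shut_down }
  end
end

worker = FakeWorker.new
install_shutdown_traps(worker)
Process.kill('TERM', Process.pid) # simulate `kill -TERM <pid>`

# Give the shutdown thread up to two seconds to run.
deadline = Time.now + 2
sleep 0.01 until worker.shut_down? || Time.now > deadline
puts worker.shut_down?
```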

#wait ⇒ void

This method returns an undefined value.

Waits until the worker has shut down. This only happens when #shutdown is called, either by another thread or, with `auto_terminate` enabled, in response to an INT or TERM signal. Use this method to let the worker run indefinitely.



# File 'lib/workhorse/worker.rb', line 211

def wait
  @poller.wait
  @pool.wait
end
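The split between a non-blocking `#start` and a blocking `#wait` can be modeled with a single background thread. In the hypothetical `BackgroundLoop` below, `start` spawns a thread that ticks once per interval and returns immediately, `shutdown` wakes that thread through a `ConditionVariable`, and `wait` joins it. Workhorse's own `#wait` instead waits on its poller and then its pool.

```ruby
# Hypothetical model of start (non-blocking), shutdown and wait (blocking).
class BackgroundLoop
  def initialize(interval)
    @interval = interval
    @running = true
    @ticks = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def start
    @thread = Thread.new do
      @mutex.synchronize do
        while @running
          @ticks += 1                    # stands in for one polling round
          @cond.wait(@mutex, @interval)  # sleeps, but wakes early on signal
        end
      end
    end
    self # returns immediately
  end

  def shutdown
    @mutex.synchronize do
      @running = false
      @cond.signal
    end
  end

  def wait
    @thread.join # blocks until the loop has exited
  end

  def ticks
    @mutex.synchronize { @ticks }
  end
end

bg = BackgroundLoop.new(0.05).start
Thread.new { sleep 0.2; bg.shutdown } # another thread requests shutdown
bg.wait
puts bg.ticks >= 1
```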