Class: Workhorse::Poller

Inherits:
Object
Defined in:
lib/workhorse/poller.rb

Overview

Database poller that discovers and locks jobs for execution. Handles job querying, global locking, and job distribution to workers. Supports both MySQL and Oracle databases with database-specific optimizations.

Examples:

Basic usage (typically used internally)

poller = Workhorse::Poller.new(worker, proc { true })
poller.start

Constant Summary

  • MIN_LOCK_TIMEOUT = 0.1

    In seconds

  • MAX_LOCK_TIMEOUT = 1.0

    In seconds

  • ORACLE_LOCK_MODE = 6

    X_MODE (exclusive)

  • ORACLE_LOCK_HANDLE = 478_564_848

    Randomly chosen number
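This page does not show where the two timeout constants are used. One plausible reading, offered here only as an assumption, is that the global lock timeout is derived from the worker's polling interval and clamped into the range between MIN_LOCK_TIMEOUT and MAX_LOCK_TIMEOUT. The helper name `lock_timeout_for` is hypothetical and not part of Workhorse.

```ruby
# Hypothetical sketch: clamp a polling interval (in seconds) into the
# range spanned by the two timeout constants. Not taken from the library.
MIN_LOCK_TIMEOUT = 0.1 # seconds
MAX_LOCK_TIMEOUT = 1.0 # seconds

# Assumed helper name; the real poller may compute this differently.
def lock_timeout_for(polling_interval)
  polling_interval.clamp(MIN_LOCK_TIMEOUT, MAX_LOCK_TIMEOUT)
end

lock_timeout_for(0.05) # => 0.1
lock_timeout_for(0.5)  # => 0.5
lock_timeout_for(5)    # => 1.0
```

Under this assumption, a very short polling interval never yields a lock timeout below 0.1 seconds, and a long one never blocks on the lock for more than 1 second.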

Constructor Details

#initialize(worker, before_poll = proc { true }) ⇒ Poller

Creates a new poller for the given worker.

Parameters:

  • worker (Workhorse::Worker)

    The worker to serve

  • before_poll (Proc) (defaults to: proc { true })

    Callback executed before each poll. It should return a boolean; a falsy return value triggers a worker shutdown (see #start)



# File 'lib/workhorse/poller.rb', line 26

def initialize(worker, before_poll = proc { true })
  @worker = worker
  @running = false
  @table = Workhorse::DbJob.arel_table
  @is_oracle = ActiveRecord::Base.connection.adapter_name == 'OracleEnhanced'
  @instant_repoll = Concurrent::AtomicBoolean.new(false)
  @global_lock_fails = 0
  @max_global_lock_fails_reached = false
  @before_poll = before_poll
end
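The `before_poll` callback decides whether polling continues: according to the `#start` source, a falsy return value triggers a worker shutdown. As an illustration only, the sketch below builds such a callback from a stop-file check. The helper `stop_file_guard` and the file name are hypothetical and not part of Workhorse.

```ruby
require 'tmpdir'

# Hypothetical helper: returns a proc suitable as a before_poll callback.
# The proc returns true while the stop file is absent, false once it exists.
def stop_file_guard(path)
  proc { !File.exist?(path) }
end

Dir.mktmpdir do |dir|
  stop_file = File.join(dir, 'stop_polling')
  guard = stop_file_guard(stop_file)

  guard.call                # => true, polling would continue
  File.write(stop_file, '')
  guard.call                # => false, the poller would shut the worker down
end
```

With the real class, such a proc would be passed as the second constructor argument, for example `Workhorse::Poller.new(worker, stop_file_guard(path))`.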

Instance Attribute Details

#table ⇒ Arel::Table (readonly)

Returns the jobs table for query building.

Returns:

  • (Arel::Table)

    The jobs table for query building



# File 'lib/workhorse/poller.rb', line 20

def table
  @table
end

#worker ⇒ Workhorse::Worker (readonly)

Returns the worker this poller serves.

Returns:

  • (Workhorse::Worker)

    The worker this poller serves



# File 'lib/workhorse/poller.rb', line 17

def worker
  @worker
end

Instance Method Details

#instant_repoll! ⇒ void

This method returns an undefined value.

Interrupts the current sleep so that the next poll runs immediately. After that poll, the normal polling interval resumes.



# File 'lib/workhorse/poller.rb', line 108

def instant_repoll!
  worker.log 'Aborting next sleep to perform instant repoll', :debug
  @instant_repoll.make_true
end
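`#instant_repoll!` only sets a flag; the sleep routine that consumes it is not shown on this page. The following is a minimal sketch of how such an interruptible sleep could work, assuming the sleep is split into short slices that re-check the flag. `RepollFlag`, `interruptible_sleep`, and the 0.05-second slice are illustrative names and values, and a mutex-guarded boolean stands in for `Concurrent::AtomicBoolean` so the example needs only the standard library.

```ruby
# Stand-in for Concurrent::AtomicBoolean, using only the standard library.
class RepollFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  def make_true
    @mutex.synchronize { @value = true }
  end

  def make_false
    @mutex.synchronize { @value = false }
  end

  def true?
    @mutex.synchronize { @value }
  end
end

# Sleeps up to `interval` seconds in short slices and returns early once
# the flag is set. Returns the elapsed time in seconds.
def interruptible_sleep(interval, flag, slice: 0.05)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    return elapsed if flag.true? || elapsed >= interval

    sleep [slice, interval - elapsed].min
  end
end

flag = RepollFlag.new
Thread.new { sleep 0.1; flag.make_true } # plays the role of instant_repoll!
elapsed = interruptible_sleep(2.0, flag)  # returns well before 2 seconds
flag.make_false                           # reset before the next poll
```

Slicing the sleep trades a little CPU for responsiveness: the shorter the slice, the sooner a repoll request is noticed.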

#running? ⇒ Boolean

Checks if the poller is currently running.

Returns:

  • (Boolean)

    True if poller is running



# File 'lib/workhorse/poller.rb', line 40

def running?
  @running
end

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the poller and waits for completion.

Raises:

  • (RuntimeError)

    If poller is not running



# File 'lib/workhorse/poller.rb', line 89

def shutdown
  fail 'Poller is not running.' unless running?
  Workhorse.debug_log("[Job worker #{worker.id}] Poller shutting down")
  @running = false
  wait
  Workhorse.debug_log("[Job worker #{worker.id}] Poller shut down")
end

#start ⇒ void

This method returns an undefined value.

Starts the poller in a background thread.

Raises:

  • (RuntimeError)

    If poller is already running



# File 'lib/workhorse/poller.rb', line 48

def start
  fail 'Poller is already running.' if running?
  @running = true

  Workhorse.debug_log("[Job worker #{worker.id}] Poller starting")

  clean_stuck_jobs! if Workhorse.clean_stuck_jobs

  @thread = Thread.new do
    Workhorse.debug_log("[Job worker #{worker.id}] Poller thread started")
    loop do
      break unless running?

      begin
        unless @before_poll.call
          Workhorse.debug_log("[Job worker #{worker.id}] before_poll returned false, triggering worker shutdown")
          Thread.new { worker.shutdown }
          sleep
          next
        end

        poll
        sleep
      rescue Exception => e
        Workhorse.debug_log("[Job worker #{worker.id}] Poller exception, shutting down: #{e.class}: #{e.message}")
        worker.log %(Poll encountered exception:\n#{e.message}\n#{e.backtrace.join("\n")})
        worker.log 'Worker shutting down...'
        Workhorse.on_exception.call(e) unless Workhorse.silence_poller_exceptions
        @running = false
        worker.instance_variable_get(:@pool).shutdown
        break
      end
    end
    Workhorse.debug_log("[Job worker #{worker.id}] Poller thread exiting")
  end
end

#wait ⇒ void

This method returns an undefined value.

Waits for the poller thread to complete.



# File 'lib/workhorse/poller.rb', line 100

def wait
  @thread.join
end
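
The lifecycle documented on this page (`#start` spawns a thread, `#shutdown` clears the running flag and joins it, and both raise `RuntimeError` when called in the wrong state) can be sketched with a stand-in class. `MiniPoller` is hypothetical and omits all database work, logging, and exception handling; it only mirrors the control flow.

```ruby
# Hypothetical stand-in that mirrors the start/shutdown/wait control flow.
class MiniPoller
  attr_reader :polls

  def initialize(interval: 0.01, &before_poll)
    @interval = interval
    @before_poll = before_poll || proc { true }
    @running = false
    @polls = 0
  end

  def running?
    @running
  end

  def start
    fail 'Poller is already running.' if running?
    @running = true
    @thread = Thread.new do
      while running?
        # Simplification: the real poller triggers a worker shutdown here.
        break unless @before_poll.call

        @polls += 1 # placeholder for the actual poll
        sleep @interval
      end
    end
  end

  def shutdown
    fail 'Poller is not running.' unless running?
    @running = false
    wait
  end

  def wait
    @thread.join
  end
end

poller = MiniPoller.new
poller.start
sleep 0.05
poller.shutdown
poller.running? # => false

begin
  poller.shutdown
rescue RuntimeError => e
  e.message # => "Poller is not running."
end
```

Because `shutdown` raises when the poller is not running, callers that may shut down more than once would check `running?` first.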