Class: Workhorse::Poller
- Inherits:
-
Object
- Object
- Workhorse::Poller
- Defined in:
- lib/workhorse/poller.rb
Overview
Database poller that discovers and locks jobs for execution. Handles job querying, global locking, and job distribution to workers. Supports both MySQL and Oracle databases with database-specific optimizations.
Constant Summary collapse
- MIN_LOCK_TIMEOUT =
In seconds
0.1- MAX_LOCK_TIMEOUT =
In seconds
1.0- ORACLE_LOCK_MODE =
X_MODE (exclusive)
6- ORACLE_LOCK_HANDLE =
Randomly chosen number
478_564_848
Instance Attribute Summary collapse
-
#table ⇒ Arel::Table
readonly
The jobs table for query building.
-
#worker ⇒ Workhorse::Worker
readonly
The worker this poller serves.
Instance Method Summary collapse
-
#initialize(worker, before_poll = proc { true }) ⇒ Poller
constructor
Creates a new poller for the given worker.
-
#instant_repoll! ⇒ void
Interrupts current sleep and performs the next poll immediately.
-
#running? ⇒ Boolean
Checks if the poller is currently running.
-
#shutdown ⇒ void
Shuts down the poller and waits for completion.
-
#start ⇒ void
Starts the poller in a background thread.
-
#wait ⇒ void
Waits for the poller thread to complete.
Constructor Details
#initialize(worker, before_poll = proc { true }) ⇒ Poller
Creates a new poller for the given worker.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/workhorse/poller.rb', line 26 def initialize(worker, before_poll = proc { true }) @worker = worker @running = false @table = Workhorse::DbJob.arel_table @is_oracle = ActiveRecord::Base.connection.adapter_name == 'OracleEnhanced' @instant_repoll = Concurrent::AtomicBoolean.new(false) @global_lock_fails = 0 @max_global_lock_fails_reached = false @before_poll = before_poll end |
Instance Attribute Details
#table ⇒ Arel::Table (readonly)
Returns The jobs table for query building.
20 21 22 |
# File 'lib/workhorse/poller.rb', line 20 def table @table end |
#worker ⇒ Workhorse::Worker (readonly)
Returns The worker this poller serves.
17 18 19 |
# File 'lib/workhorse/poller.rb', line 17 def worker @worker end |
Instance Method Details
#instant_repoll! ⇒ void
This method returns an undefined value.
Interrupts current sleep and performs the next poll immediately. After the poll, resumes normal polling interval.
108 109 110 111 |
# File 'lib/workhorse/poller.rb', line 108 def instant_repoll! worker.log 'Aborting next sleep to perform instant repoll', :debug @instant_repoll.make_true end |
#running? ⇒ Boolean
Checks if the poller is currently running.
40 41 42 |
# File 'lib/workhorse/poller.rb', line 40 def running? @running end |
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the poller and waits for completion.
89 90 91 92 93 94 95 |
# File 'lib/workhorse/poller.rb', line 89 def shutdown fail 'Poller is not running.' unless running? Workhorse.debug_log("[Job worker #{worker.id}] Poller shutting down") @running = false wait Workhorse.debug_log("[Job worker #{worker.id}] Poller shut down") end |
#start ⇒ void
This method returns an undefined value.
Starts the poller in a background thread.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/workhorse/poller.rb', line 48 def start fail 'Poller is already running.' if running? @running = true Workhorse.debug_log("[Job worker #{worker.id}] Poller starting") clean_stuck_jobs! if Workhorse.clean_stuck_jobs @thread = Thread.new do Workhorse.debug_log("[Job worker #{worker.id}] Poller thread started") loop do break unless running? begin unless @before_poll.call Workhorse.debug_log("[Job worker #{worker.id}] before_poll returned false, triggering worker shutdown") Thread.new { worker.shutdown } sleep next end poll sleep rescue Exception => e Workhorse.debug_log("[Job worker #{worker.id}] Poller exception, shutting down: #{e.class}: #{e.}") worker.log %(Poll encountered exception:\n#{e.}\n#{e.backtrace.join("\n")}) worker.log 'Worker shutting down...' Workhorse.on_exception.call(e) unless Workhorse.silence_poller_exceptions @running = false worker.instance_variable_get(:@pool).shutdown break end end Workhorse.debug_log("[Job worker #{worker.id}] Poller thread exiting") end end |
#wait ⇒ void
This method returns an undefined value.
Waits for the poller thread to complete.
100 101 102 |
# File 'lib/workhorse/poller.rb', line 100 def wait @thread.join end |