Class: Postburner::Scheduler
- Inherits: Object
  - Object
  - Postburner::Scheduler
- Defined in: lib/postburner/scheduler.rb
Overview
Lightweight scheduler that acts as a safety net for recurring job execution.
Unlike traditional schedulers that poll for due jobs, Postburner uses **immediate enqueue** where executions are created and immediately queued to Beanstalkd’s delayed queue. The scheduler acts as a **watchdog safety net** to ensure every schedule has a future execution queued.
## Architecture
This class is NOT persisted to the database. It’s instantiated on-the-fly by workers when they reserve a watchdog job from the scheduler tube. The watchdog is ephemeral data in Beanstalkd with this payload:
{ "scheduler": true, "interval": 300 }
No dedicated scheduler process is needed - existing workers handle everything.
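As a rough illustration of how a reserved job body could be recognized as a watchdog, the sketch below parses the payload shown above. The helper name `watchdog_interval` and its fallback behavior are assumptions made for this example; they are not part of Postburner's API.

```ruby
require 'json'

# Hypothetical helper (not part of Postburner): returns the interval in
# seconds if the raw job body is a scheduler watchdog payload, else nil.
def watchdog_interval(raw_body, default: 300)
  data = JSON.parse(raw_body)
  return nil unless data.is_a?(Hash) && data['scheduler'] == true

  # Fall back to the documented default when the payload omits the interval.
  Integer(data.fetch('interval', default))
rescue JSON::ParserError, ArgumentError, TypeError
  # Malformed JSON or a non-numeric interval is treated as "not a watchdog".
  nil
end

watchdog_interval('{ "scheduler": true, "interval": 300 }') # watchdog payload
watchdog_interval('{ "class": "SomeJob" }')                  # ordinary job body
```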
## Watchdog Pattern
- Workers automatically watch the ‘scheduler’ tube
- On reserve timeout, workers check if watchdog exists and create if missing
- When worker reserves watchdog, it instantiates Postburner::Scheduler
- Scheduler executes with PostgreSQL advisory lock for coordination
- After completion, watchdog re-queues itself with delay for next interval
## Safety Net Functions
The watchdog performs three safety net functions:
- **Auto-bootstrap**: Creates first execution for schedules that haven’t been started
- **Future execution guarantee**: Ensures each schedule has a future execution queued
- **Orphan cleanup**: Enqueues any pending executions that weren’t properly queued
Note: For Postburner::Job schedules, a before_attempt callback creates the next execution when the current job runs, providing immediate pickup without waiting for the watchdog. The watchdog is just the safety net.
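The three safety net functions can be pictured with a small in-memory model. This is only a sketch: the `Schedule` and `Execution` structs and the `audit` method below are hypothetical stand-ins, not Postburner's models, and "enqueueing" is reduced to flipping a flag.

```ruby
# In-memory stand-ins for schedules and executions (hypothetical, for
# illustration only; these are not Postburner's ActiveRecord models).
Execution = Struct.new(:run_at, :enqueued)
Schedule  = Struct.new(:name, :every, :executions)

# Audits one schedule and returns the list of actions taken.
def audit(schedule, now:)
  actions = []

  # 1. Auto-bootstrap: no executions yet, so create the first one.
  if schedule.executions.empty?
    schedule.executions << Execution.new(now + schedule.every, true)
    actions << :bootstrapped
  end

  # 2. Future execution guarantee: something must be due after `now`.
  unless schedule.executions.any? { |e| e.run_at > now }
    schedule.executions << Execution.new(now + schedule.every, true)
    actions << :future_created
  end

  # 3. Orphan cleanup: enqueue pending executions that never reached the queue.
  schedule.executions.reject(&:enqueued).each do |e|
    e.enqueued = true
    actions << :orphan_enqueued
  end

  actions
end

now      = Time.at(0)
fresh    = Schedule.new('fresh', 60, [])
orphaned = Schedule.new('orphaned', 60, [Execution.new(now + 30, false)])
lapsed   = Schedule.new('lapsed', 60, [Execution.new(now - 30, true)])
results  = [fresh, orphaned, lapsed].map { |s| audit(s, now: now) }
```

Each schedule in the example triggers exactly one of the three functions, which mirrors how the watchdog only acts where the normal immediate-enqueue path left a gap.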
## Configuration
Add to config/postburner.yml:
production:
  scheduler_interval: 300 # Check every 5 minutes (default)
  scheduler_priority: 100 # Watchdog priority (default)
The interval primarily affects:
- How quickly new schedules are auto-bootstrapped (if you don’t call start!)
- Recovery time if an execution somehow fails to enqueue
- How often last_audit_at is updated for monitoring
Since executions are enqueued immediately to Beanstalkd’s delayed queue, the watchdog interval doesn’t affect execution timing - only the safety net checks.
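To make the defaulting behavior concrete, here is a minimal sketch of reading these two keys from YAML and falling back to the documented defaults. The `scheduler_settings` method and `SCHEDULER_DEFAULTS` constant are hypothetical; Postburner's own configuration loader may work differently.

```ruby
require 'yaml'

# Defaults as documented above. The loader is a hypothetical sketch,
# not Postburner's actual configuration code.
SCHEDULER_DEFAULTS = {
  'scheduler_interval' => 300,
  'scheduler_priority' => 100
}.freeze

# Returns the scheduler settings for one environment, with defaults applied.
def scheduler_settings(yaml_text, env)
  config = YAML.safe_load(yaml_text) || {}
  env_config = config[env] || {}
  SCHEDULER_DEFAULTS.merge(env_config).slice(*SCHEDULER_DEFAULTS.keys)
end

yaml = <<~YAML
  production:
    scheduler_interval: 600
YAML

scheduler_settings(yaml, 'production')  # interval overridden, priority defaulted
scheduler_settings(yaml, 'development') # both values fall back to defaults
```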
## Execution Flow
- Worker reserves watchdog from scheduler tube
- Instantiates Scheduler with interval from payload
- Scheduler#perform acquires PostgreSQL advisory lock
- Processes all enabled schedules:
  - Auto-bootstraps unstarted schedules (creates + enqueues first execution)
  - Ensures future execution exists (creates + enqueues if missing)
  - Enqueues any orphaned pending executions
  - Updates last_audit_at timestamp
- Re-queues watchdog with delay for next interval
## Observability
Monitor scheduler health via last_audit_at:
stale = Postburner::Schedule.enabled
  .where('last_audit_at < ?', 15.minutes.ago)
  .or(Postburner::Schedule.where(last_audit_at: nil))
Constant Summary
- SCHEDULER_TUBE_NAME = 'scheduler'
- DEFAULT_INTERVAL = 300
  5 minutes
- DEFAULT_PRIORITY = 100
  Lower number = higher priority
- WATCHDOG_MUTEX = Mutex.new
  Mutex for coordinating watchdog checks across threads
Class Attribute Summary
- .watchdog_check_failed_at ⇒ Object
  Returns the value of attribute watchdog_check_failed_at.
- .watchdog_last_checked_at ⇒ Object
  Returns the value of attribute watchdog_last_checked_at.
Instance Attribute Summary
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
Class Method Summary
- .enqueue_watchdog(interval: nil, priority: nil) ⇒ Hash
  Enqueue a new scheduler watchdog job.
- .ensure_watchdog!(connection:) ⇒ void
  Ensure a scheduler watchdog exists in the queue.
- .watchdog_exists?(connection:) ⇒ Boolean
  Check if scheduler watchdog exists in queue.
Instance Method Summary
- #initialize(interval: DEFAULT_INTERVAL, logger: nil) ⇒ Scheduler (constructor)
  Initialize a new scheduler instance.
- #perform ⇒ void
  Execute the scheduler.
Constructor Details
#initialize(interval: DEFAULT_INTERVAL, logger: nil) ⇒ Scheduler
Initialize a new scheduler instance.
The scheduler is instantiated by workers when they reserve a watchdog job from the scheduler tube. It’s not persisted - each run creates a new instance.
# File 'lib/postburner/scheduler.rb', line 110
def initialize(interval: DEFAULT_INTERVAL, logger: nil)
  @interval = interval
  @logger = logger || (defined?(Rails) ? Rails.logger : Logger.new($stdout))
end
Class Attribute Details
.watchdog_check_failed_at ⇒ Object
Returns the value of attribute watchdog_check_failed_at.
# File 'lib/postburner/scheduler.rb', line 213
def watchdog_check_failed_at
  @watchdog_check_failed_at
end
.watchdog_last_checked_at ⇒ Object
Returns the value of attribute watchdog_last_checked_at.
# File 'lib/postburner/scheduler.rb', line 213
def watchdog_last_checked_at
  @watchdog_last_checked_at
end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
# File 'lib/postburner/scheduler.rb', line 96
def interval
  @interval
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/postburner/scheduler.rb', line 96
def logger
  @logger
end
Class Method Details
.enqueue_watchdog(interval: nil, priority: nil) ⇒ Hash
Enqueue a new scheduler watchdog job.
Creates a watchdog job in the scheduler tube with a delay equal to the interval. The watchdog will execute after the delay, process all schedules, and re-queue itself.
Reads interval and priority from configuration if not provided:
- scheduler_interval (default: 300 seconds)
- scheduler_priority (default: 100)
# File 'lib/postburner/scheduler.rb', line 283
def self.enqueue_watchdog(interval: nil, priority: nil)
  interval ||= Postburner.configuration.default_scheduler_interval || DEFAULT_INTERVAL
  priority ||= Postburner.configuration.default_scheduler_priority || DEFAULT_PRIORITY

  payload = {
    'scheduler' => true,
    'interval' => interval
  }

  tube_name = Postburner.scheduler_tube_name
  response = nil

  Postburner.connected do |conn|
    response = conn.tubes[tube_name].put(
      JSON.generate(payload),
      pri: priority,
      delay: interval,
      ttr: 120 # 2 minutes to complete
    )
  end

  if response[:status] == "INSERTED"
    runs_at = Time.current + interval
    Rails.logger.info "[Postburner::Scheduler] Inserted watchdog (bkid: #{response[:id]}, interval: #{interval}s, runs_at: #{runs_at.iso8601}, tube: #{tube_name})"
  else
    Rails.logger.error "[Postburner::Scheduler] Failed to insert watchdog: #{response.inspect} (interval: #{interval}s, tube: #{tube_name})"
  end

  response
end
.ensure_watchdog!(connection:) ⇒ void
This method returns an undefined value.
Ensure a scheduler watchdog exists in the queue.
Uses process-level mutex coordination so only one thread checks at a time. Called by workers on reserve timeout to automatically recreate watchdog if it’s missing (e.g., after Beanstalkd restart or watchdog expiration).
Implements throttling:
- Skips check if successful check within last 60 seconds
- Skips check if failed check within last 60 seconds
- Only one thread can check at a time (mutex)
# File 'lib/postburner/scheduler.rb', line 235
def self.ensure_watchdog!(connection:)
  # Quick check without lock - skip if recently checked
  return if watchdog_last_checked_at && Time.current - watchdog_last_checked_at < 60
  return if watchdog_check_failed_at && Time.current - watchdog_check_failed_at < 60

  # Try to acquire lock, skip if another thread is checking
  return unless WATCHDOG_MUTEX.try_lock

  begin
    # Double-check after acquiring lock
    return if watchdog_last_checked_at && Time.current - watchdog_last_checked_at < 60

    if watchdog_exists?(connection: connection)
      self.watchdog_last_checked_at = Time.current
      return
    end

    enqueue_watchdog
    self.watchdog_last_checked_at = Time.current
    self.watchdog_check_failed_at = nil
  rescue => e
    self.watchdog_check_failed_at = Time.current
    Rails.logger.error "[Postburner::Scheduler] Watchdog check failed, will retry in 60s: #{e.message}"
  ensure
    WATCHDOG_MUTEX.unlock
  end
end
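The throttle-plus-`try_lock` idea can be isolated from Beanstalkd and Rails. The class below is a standalone sketch under that assumption: `ThrottledCheck` is a hypothetical name, it uses only Ruby's core `Mutex`, and it leaves out the separate failed-check timestamp that `ensure_watchdog!` also tracks.

```ruby
# Standalone sketch of the throttle-plus-try_lock pattern described above.
# ThrottledCheck is a hypothetical class, not part of Postburner.
class ThrottledCheck
  def initialize(window: 60)
    @window = window
    @mutex = Mutex.new
    @last_checked_at = nil
  end

  # Runs the block at most once per window and never concurrently.
  # Returns true if the block ran, false if the call was skipped.
  def call(now: Time.now)
    return false if recently_checked?(now)  # cheap check without the lock
    return false unless @mutex.try_lock     # another thread is already checking

    begin
      return false if recently_checked?(now) # double-check under the lock
      yield
      @last_checked_at = now
      true
    ensure
      @mutex.unlock
    end
  end

  private

  def recently_checked?(now)
    @last_checked_at && now - @last_checked_at < @window
  end
end

check = ThrottledCheck.new(window: 60)
check.call { puts 'checking watchdog' } # runs
check.call { puts 'checking watchdog' } # skipped: inside the 60s window
```

Using `try_lock` rather than `lock` means a thread that loses the race returns immediately instead of waiting, which suits a check that is called opportunistically on every reserve timeout.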
.watchdog_exists?(connection:) ⇒ Boolean
Check if scheduler watchdog exists in queue.
Peeks both the delayed and ready queues in the scheduler tube. Returns true if watchdog exists in either state.
# File 'lib/postburner/scheduler.rb', line 328
def self.watchdog_exists?(connection:)
  tube_name = Postburner.scheduler_tube_name
  tube = connection.beanstalk.tubes[tube_name]

  # Peek is lighter than stats
  delayed = tube.peek(:delayed) rescue nil
  ready = tube.peek(:ready) rescue nil

  delayed.present? || ready.present?
rescue Beaneater::NotFoundError
  false
end
Instance Method Details
#perform ⇒ void
This method returns an undefined value.
Execute the scheduler.
Processes all enabled schedules with PostgreSQL advisory lock coordination to prevent concurrent execution across multiple workers. After processing, automatically re-queues the watchdog for the next run.
The scheduler performs three main functions:
- Auto-bootstrap new schedules (create first execution if not started)
- Ensure each schedule has a future execution queued
- Enqueue any orphaned pending executions that weren’t properly queued
Instruments with ActiveSupport::Notifications:
- perform_start.scheduler.postburner: When scheduler run begins
- perform.scheduler.postburner: Around scheduler run (summary stats)
# File 'lib/postburner/scheduler.rb', line 138
def perform
  # Self-deduplicate: if another watchdog exists in the queue, exit early
  # and let that one handle scheduling. This naturally resolves duplicate
  # watchdogs that can occur from race conditions.
  @skip_requeue = false
  if another_watchdog_queued?
    logger.info "[Postburner::Scheduler] Another watchdog already queued, exiting to deduplicate"
    @skip_requeue = true
    return
  end

  logger.info "[Postburner::Scheduler] Starting scheduler run"

  ActiveSupport::Notifications.instrument('perform_start.scheduler.postburner', {
    interval: interval
  })

  # Track stats for summary event
  @schedules_processed = 0
  @schedules_failed = 0
  @executions_created = 0
  @orphans_enqueued = 0
  lock_acquired = false

  # Build payload hash that will be mutated with final stats
  payload = {
    interval: interval,
    lock_acquired: nil,
    schedules_processed: nil,
    schedules_failed: nil,
    executions_created: nil,
    orphans_enqueued: nil
  }

  # Use advisory lock to coordinate multiple workers
  ActiveSupport::Notifications.instrument('perform.scheduler.postburner', payload) do
    begin
      lock_acquired = Postburner::AdvisoryLock.with_lock(AdvisoryLock::SCHEDULER_LOCK_KEY, blocking: false) do
        process_all_schedules
        true
      end

      if lock_acquired
        logger.info "[Postburner::Scheduler] Scheduler run complete"
      else
        logger.info "[Postburner::Scheduler] Could not acquire lock, skipping"
      end
    rescue ActiveRecord::ConnectionTimeoutError => e
      # This can happen if the connection pool is exhausted
      # Log cleanly and let the watchdog retry on next interval
      logger.warn "[Postburner::Scheduler] Database connection pool exhausted. Skipping scheduler run (need advisory lock), will retry on next interval"
      logger.debug "[Postburner::Scheduler] Check database.yml pool and max_connections i.e. pool >= needed connection count from web/job workers"
      logger.debug "[Postburner::Scheduler] ActiveRecord Connection timeout: #{e.message}"
      lock_acquired = false
    end

    # Update payload with final stats (mutates the hash subscribers receive)
    payload[:lock_acquired] = lock_acquired
    payload[:schedules_processed] = @schedules_processed
    payload[:schedules_failed] = @schedules_failed
    payload[:executions_created] = @executions_created
    payload[:orphans_enqueued] = @orphans_enqueued
  end
ensure
  # Re-queue watchdog for next run (unless we're deduplicating)
  requeue_watchdog unless @skip_requeue
end