Class: Postburner::Scheduler

Inherits:
Object
Defined in:
lib/postburner/scheduler.rb

Overview

Lightweight scheduler that acts as a safety net for recurring job execution.

Unlike traditional schedulers that poll for due jobs, Postburner uses **immediate enqueue** where executions are created and immediately queued to Beanstalkd’s delayed queue. The scheduler acts as a **watchdog safety net** to ensure every schedule has a future execution queued.

## Architecture

This class is NOT persisted to the database. It’s instantiated on-the-fly by workers when they reserve a watchdog job from the scheduler tube. The watchdog is ephemeral data in Beanstalkd with this payload:

{ "scheduler": true, "interval": 300 }

No dedicated scheduler process is needed - existing workers handle everything.
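A worker has to recognize a watchdog body before it can build a Scheduler. The sketch below assumes the JSON payload shown above. `parse_watchdog` is a hypothetical helper, not part of Postburner's documented API. It returns the interval for a watchdog body and nil for anything else, and falls back to the 300-second default when `interval` is missing.

```ruby
require "json"

DEFAULT_INTERVAL = 300 # seconds, mirrors Postburner::Scheduler::DEFAULT_INTERVAL

# Hypothetical helper: returns the watchdog interval for a scheduler payload,
# or nil when the body belongs to an ordinary job (or is not JSON at all).
def parse_watchdog(body)
  payload = JSON.parse(body)
  return nil unless payload.is_a?(Hash) && payload["scheduler"] == true

  Integer(payload.fetch("interval", DEFAULT_INTERVAL))
rescue JSON::ParserError
  nil
end
```

When the helper returns an interval, the worker would continue with `Postburner::Scheduler.new(interval: interval).perform`, as in the example for #initialize.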

## Watchdog Pattern

  1. Workers automatically watch the ‘scheduler’ tube

  2. On reserve timeout, workers check whether the watchdog exists and create it if it is missing

  3. When worker reserves watchdog, it instantiates Postburner::Scheduler

  4. Scheduler executes with PostgreSQL advisory lock for coordination

  5. After completion, watchdog re-queues itself with delay for next interval

## Safety Net Functions

The watchdog performs three safety net functions:

  1. **Auto-bootstrap**: Creates the first execution for schedules that haven’t been started

  2. **Future execution guarantee**: Ensures each schedule has a future execution queued

  3. **Orphan cleanup**: Enqueues any pending executions that weren’t properly queued

Note: For Postburner::Job schedules, a before_attempt callback creates the next execution when the current job runs, providing immediate pickup without waiting for the watchdog. The watchdog is just the safety net.
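The three functions reduce to a per-schedule decision. The sketch below models schedules and executions as plain Structs. The field names (`started`, `run_at`, `status`, `queued`) and the `safety_net_actions` helper are hypothetical stand-ins for Postburner's ActiveRecord models. They only show which action applies in which situation.

```ruby
# Hypothetical, simplified models (not Postburner's real schema).
Execution = Struct.new(:run_at, :status, :queued, keyword_init: true)
Schedule  = Struct.new(:name, :started, :executions, keyword_init: true)

# Returns the safety-net actions the watchdog would take for one schedule.
def safety_net_actions(schedule, now: Time.now)
  # 1. Auto-bootstrap: a schedule that was never started gets its first execution.
  return [:bootstrap] unless schedule.started

  actions = []

  # 2. Future execution guarantee: some pending execution must lie in the future.
  has_future = schedule.executions.any? { |e| e.status == :pending && e.run_at > now }
  actions << :create_future unless has_future

  # 3. Orphan cleanup: pending executions that never reached Beanstalkd.
  orphans = schedule.executions.count { |e| e.status == :pending && !e.queued }
  actions.concat([:enqueue_orphan] * orphans)

  actions
end
```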

## Configuration

Add to config/postburner.yml:

production:
  scheduler_interval: 300  # Check every 5 minutes (default)
  scheduler_priority: 100  # Watchdog priority (default)
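The way these keys resolve can be sketched with Ruby's standard YAML library. The sketch assumes the file is keyed by environment, as above, and that a missing key falls back to the documented defaults. `scheduler_settings` is a hypothetical helper, not Postburner's configuration loader, which may also handle ERB and other keys.

```ruby
require "yaml"

# Hypothetical loader: reads the two scheduler keys for one environment and
# falls back to the documented defaults (300 seconds, priority 100).
def scheduler_settings(yaml_text, env)
  config = YAML.safe_load(yaml_text) || {}
  section = config.fetch(env, {}) || {}
  {
    interval: Integer(section.fetch("scheduler_interval", 300)),
    priority: Integer(section.fetch("scheduler_priority", 100))
  }
end
```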

The interval primarily affects:

  • How quickly new schedules are auto-bootstrapped (if you don’t call start!)

  • Recovery time if an execution somehow fails to enqueue

  • How often last_audit_at is updated for monitoring

Since executions are enqueued immediately to Beanstalkd’s delayed queue, the watchdog interval doesn’t affect execution timing - only the safety net checks.

## Execution Flow

  1. Worker reserves watchdog from scheduler tube

  2. Instantiates Scheduler with interval from payload

  3. Scheduler#perform acquires PostgreSQL advisory lock

  4. Processes all enabled schedules:

    • Auto-bootstraps unstarted schedules (creates + enqueues first execution)

    • Ensures future execution exists (creates + enqueues if missing)

    • Enqueues any orphaned pending executions

    • Updates last_audit_at timestamp

  5. Re-queues watchdog with delay for next interval
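Step 4 can be pictured as a loop that isolates failures per schedule and tallies the counters that the `perform.scheduler.postburner` event reports. This is a sketch with assumptions. `process_schedules` and its block protocol are hypothetical: the block returns how many executions it created and how many orphans it enqueued. Rescuing per schedule is inferred from the existence of a `schedules_failed` counter; the source does not confirm it.

```ruby
# Hypothetical loop: the block handles one schedule and returns
# [executions_created, orphans_enqueued] for it.
def process_schedules(schedules)
  stats = { schedules_processed: 0, schedules_failed: 0,
            executions_created: 0, orphans_enqueued: 0 }

  schedules.each do |schedule|
    created, orphans = yield(schedule)
    stats[:executions_created] += created
    stats[:orphans_enqueued] += orphans
    stats[:schedules_processed] += 1
  rescue StandardError => e
    # One broken schedule must not stop the others: count it and move on.
    warn "schedule #{schedule.inspect} failed: #{e.message}"
    stats[:schedules_failed] += 1
  end

  stats
end
```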

## Observability

Monitor scheduler health via last_audit_at:

# Enabled schedules whose last audit is older than 15 minutes, or that were never audited
stale = Postburner::Schedule.enabled
  .where('last_audit_at < ?', 15.minutes.ago)
  .or(Postburner::Schedule.enabled.where(last_audit_at: nil))
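The intended rule selects enabled schedules that were never audited or were last audited before a cutoff. It can also be expressed over in-memory records, which makes the threshold explicit. Fifteen minutes is three default intervals, so a schedule is flagged only after several consecutive watchdog runs have not touched it. `Record` and `stale_schedules` are hypothetical. They would suit, for example, a health check that already has the rows loaded.

```ruby
Record = Struct.new(:name, :enabled, :last_audit_at, keyword_init: true)

# Hypothetical in-memory version of the rule:
# enabled AND (never audited OR audited before the cutoff).
def stale_schedules(records, now: Time.now, max_age: 15 * 60)
  cutoff = now - max_age
  records.select do |r|
    r.enabled && (r.last_audit_at.nil? || r.last_audit_at < cutoff)
  end
end
```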

Examples:

Watchdog payload in Beanstalkd

{
  "scheduler": true,
  "interval": 300
}

Manual watchdog creation (usually automatic)

Postburner::Scheduler.enqueue_watchdog(interval: 300, priority: 100)

Constant Summary

SCHEDULER_TUBE_NAME = 'scheduler'

DEFAULT_INTERVAL = 300
  5 minutes, in seconds

DEFAULT_PRIORITY = 100
  Lower number = higher priority

WATCHDOG_MUTEX = Mutex.new
  Mutex for coordinating watchdog checks across threads

Constructor Details

#initialize(interval: DEFAULT_INTERVAL, logger: nil) ⇒ Scheduler

Initialize a new scheduler instance.

The scheduler is instantiated by workers when they reserve a watchdog job from the scheduler tube. It’s not persisted - each run creates a new instance.

Examples:

scheduler = Postburner::Scheduler.new(interval: 300)
scheduler.perform

Parameters:

  • interval (Integer) (defaults to: DEFAULT_INTERVAL)

    Seconds until next run (default: 300)

  • logger (Logger, nil) (defaults to: nil)

    Logger instance (default: Rails.logger or stdout)

# File 'lib/postburner/scheduler.rb', line 110

def initialize(interval: DEFAULT_INTERVAL, logger: nil)
  @interval = interval
  @logger = logger || (defined?(Rails) ? Rails.logger : Logger.new($stdout))
end

Class Attribute Details

.watchdog_check_failed_at ⇒ Object

Returns the value of attribute watchdog_check_failed_at.

# File 'lib/postburner/scheduler.rb', line 213

def watchdog_check_failed_at
  @watchdog_check_failed_at
end

.watchdog_last_checked_at ⇒ Object

Returns the value of attribute watchdog_last_checked_at.

# File 'lib/postburner/scheduler.rb', line 213

def watchdog_last_checked_at
  @watchdog_last_checked_at
end

Instance Attribute Details

#interval ⇒ Object (readonly)

Returns the value of attribute interval.

# File 'lib/postburner/scheduler.rb', line 96

def interval
  @interval
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.

# File 'lib/postburner/scheduler.rb', line 96

def logger
  @logger
end

Class Method Details

.enqueue_watchdog(interval: nil, priority: nil) ⇒ Hash

Enqueue a new scheduler watchdog job.

Creates a watchdog job in the scheduler tube with a delay equal to the interval. The watchdog will execute after the delay, process all schedules, and re-queue itself.

Reads interval and priority from configuration if not provided:

  • scheduler_interval (default: 300 seconds)

  • scheduler_priority (default: 100)

Examples:

Enqueue with defaults

Postburner::Scheduler.enqueue_watchdog
# => { status: "INSERTED", id: 12345 }

Enqueue with custom interval

Postburner::Scheduler.enqueue_watchdog(interval: 60, priority: 0)

Parameters:

  • interval (Integer, nil) (defaults to: nil)

    Seconds until next run (default: from config)

  • priority (Integer, nil) (defaults to: nil)

    Beanstalkd priority (default: from config)

Returns:

  • (Hash)

    Beanstalkd response with :status and :id keys

# File 'lib/postburner/scheduler.rb', line 283

def self.enqueue_watchdog(interval: nil, priority: nil)
  interval ||= Postburner.configuration.default_scheduler_interval || DEFAULT_INTERVAL
  priority ||= Postburner.configuration.default_scheduler_priority || DEFAULT_PRIORITY

  payload = {
    'scheduler' => true,
    'interval' => interval
  }

  tube_name = Postburner.scheduler_tube_name
  response = nil

  Postburner.connected do |conn|
    response = conn.tubes[tube_name].put(
      JSON.generate(payload),
      pri: priority,
      delay: interval,
      ttr: 120 # 2 minutes to complete
    )
  end

  if response[:status] == "INSERTED"
    runs_at = Time.current + interval
    Rails.logger.info "[Postburner::Scheduler] Inserted watchdog (bkid: #{response[:id]}, interval: #{interval}s, runs_at: #{runs_at.iso8601}, tube: #{tube_name})"
  else
    Rails.logger.error "[Postburner::Scheduler] Failed to insert watchdog: #{response.inspect} (interval: #{interval}s, tube: #{tube_name})"
  end

  response
end
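The method above comes down to one Beanstalkd `put` whose delay equals the interval, so the watchdog becomes ready one interval after insertion. The sketch below isolates the argument-building step in a hypothetical `watchdog_put_args` helper, so it can be checked without a Beanstalkd server. The 120-second TTR and the fallback values are copied from the method above. The configuration lookup is left out.

```ruby
require "json"

# Hypothetical helper mirroring the arguments enqueue_watchdog passes to put.
def watchdog_put_args(interval: nil, priority: nil)
  interval ||= 300 # DEFAULT_INTERVAL
  priority ||= 100 # DEFAULT_PRIORITY
  body = JSON.generate({ "scheduler" => true, "interval" => interval })
  # delay == interval: the job stays in the delayed state for one full interval.
  [body, { pri: priority, delay: interval, ttr: 120 }]
end
```

With a Beaneater tube, the pair would be passed as `tube.put(body, **opts)`.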

.ensure_watchdog!(connection:) ⇒ void

This method returns an undefined value.

Ensure a scheduler watchdog exists in the queue.

Uses process-level mutex coordination so only one thread checks at a time. Called by workers on reserve timeout to recreate the watchdog automatically if it is missing (e.g., after a Beanstalkd restart or watchdog expiration).

Implements throttling:

  • Skips check if successful check within last 60 seconds

  • Skips check if failed check within last 60 seconds

  • Only one thread can check at a time (mutex)

Examples:

Called by worker on timeout

Postburner.connected do |conn|
  Postburner::Scheduler.ensure_watchdog!(connection: conn)
end

Parameters:

  • connection

    Connection yielded by Postburner.connected (see the example above)

# File 'lib/postburner/scheduler.rb', line 235

def self.ensure_watchdog!(connection:)
  # Quick check without lock - skip if recently checked
  return if watchdog_last_checked_at && Time.current - watchdog_last_checked_at < 60
  return if watchdog_check_failed_at && Time.current - watchdog_check_failed_at < 60

  # Try to acquire lock, skip if another thread is checking
  return unless WATCHDOG_MUTEX.try_lock

  begin
    # Double-check after acquiring lock
    return if watchdog_last_checked_at && Time.current - watchdog_last_checked_at < 60

    if watchdog_exists?(connection: connection)
      self.watchdog_last_checked_at = Time.current
      return
    end

    enqueue_watchdog
    self.watchdog_last_checked_at = Time.current
    self.watchdog_check_failed_at = nil
  rescue => e
    self.watchdog_check_failed_at = Time.current
    Rails.logger.error "[Postburner::Scheduler] Watchdog check failed, will retry in 60s: #{e.message}"
  ensure
    WATCHDOG_MUTEX.unlock
  end
end
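The throttling rules can be exercised in isolation. The sketch below, a hypothetical `WatchdogThrottle` class with an injectable clock, keeps the same shape as the method above. It has a lock-free fast path, uses `Mutex#try_lock` so concurrent callers skip instead of waiting, and applies a 60-second window to both successes and failures. The block stands in for `watchdog_exists?` plus `enqueue_watchdog`.

```ruby
# Hypothetical stand-in for the class-level throttling in ensure_watchdog!.
class WatchdogThrottle
  WINDOW = 60 # seconds

  attr_reader :last_checked_at, :failed_at

  def initialize(clock: -> { Time.now })
    @clock = clock
    @mutex = Mutex.new
    @last_checked_at = nil
    @failed_at = nil
  end

  # Runs the block at most once per WINDOW; returns true if it ran.
  def run
    return false if recent?(@last_checked_at) || recent?(@failed_at)
    return false unless @mutex.try_lock # another thread is already checking

    begin
      # Double-check under the lock: another thread may have just finished.
      return false if recent?(@last_checked_at) || recent?(@failed_at)
      yield
      @last_checked_at = @clock.call
      @failed_at = nil
      true
    rescue StandardError
      @failed_at = @clock.call # back off for WINDOW seconds after a failure
      true
    ensure
      @mutex.unlock
    end
  end

  private

  def recent?(time)
    time && (@clock.call - time) < WINDOW
  end
end
```

Unlike the method above, the sketch also re-checks the failure window after taking the lock. A thread that passed the fast path just before another thread recorded a failure therefore does not retry immediately.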

.watchdog_exists?(connection:) ⇒ Boolean

Check if scheduler watchdog exists in queue.

Peeks both the delayed and ready queues in the scheduler tube and returns true if a watchdog exists in either state. A watchdog currently reserved by a worker is visible to neither peek, so it is not counted.

Examples:

Check for watchdog

Postburner.connected do |conn|
  exists = Postburner::Scheduler.watchdog_exists?(connection: conn)
  puts "Watchdog present" if exists
end

Parameters:

  • connection

    Connection yielded by Postburner.connected, used to peek the scheduler tube

Returns:

  • (Boolean)

    true if watchdog exists (delayed or ready), false otherwise

# File 'lib/postburner/scheduler.rb', line 328

def self.watchdog_exists?(connection:)
  tube_name = Postburner.scheduler_tube_name
  tube = connection.beanstalk.tubes[tube_name]

  # Peek is lighter than stats
  delayed = tube.peek(:delayed) rescue nil
  ready = tube.peek(:ready) rescue nil

  delayed.present? || ready.present?
rescue Beaneater::NotFoundError
  false
end
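Beanstalkd offers peeks only for ready, delayed and buried jobs, so the check can be modelled with a fake tube. The sketch uses a hypothetical `FakeTube` whose `peek(state)` returns a job body or nil, the same shape as the peeks above. It uses `nil?` in place of ActiveSupport's `present?`. It also shows the consequence noted above: while a worker holds the watchdog reserved, neither peek finds it.

```ruby
# Hypothetical in-memory tube: each job is a mutable [state, body] pair.
class FakeTube
  def initialize
    @jobs = []
  end

  def put(body, state: :ready)
    @jobs << [state, body]
  end

  # Moves the first ready job to :reserved, as a worker's reserve would.
  def reserve!
    job = @jobs.find { |state, _| state == :ready }
    job[0] = :reserved if job
    job
  end

  # Mirrors peek(:ready) / peek(:delayed): first job body in that state, or nil.
  def peek(state)
    job = @jobs.find { |s, _| s == state }
    job && job[1]
  end
end

# Same logic as watchdog_exists?, minus the Beanstalkd connection.
def watchdog_in_tube?(tube)
  !tube.peek(:delayed).nil? || !tube.peek(:ready).nil?
end
```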

Instance Method Details

#perform ⇒ void

This method returns an undefined value.

Execute the scheduler.

Processes all enabled schedules with PostgreSQL advisory lock coordination to prevent concurrent execution across multiple workers. After processing, automatically re-queues the watchdog for the next run.

The scheduler performs three safety net functions:

  1. Auto-bootstrap new schedules (create first execution if not started)

  2. Ensure each schedule has a future execution queued

  3. Enqueue any orphaned pending executions that weren’t properly queued

Instruments with ActiveSupport::Notifications:

  • perform_start.scheduler.postburner: When scheduler run begins

  • perform.scheduler.postburner: Around scheduler run (summary stats)

Examples:

Called by worker

# Worker reserves watchdog job with payload:
# { "scheduler" => true, "interval" => 300 }
scheduler = Postburner::Scheduler.new(interval: 300)
scheduler.perform

# File 'lib/postburner/scheduler.rb', line 138

def perform
  # Self-deduplicate: if another watchdog exists in the queue, exit early
  # and let that one handle scheduling. This naturally resolves duplicate
  # watchdogs that can occur from race conditions.
  @skip_requeue = false
  if another_watchdog_queued?
    logger.info "[Postburner::Scheduler] Another watchdog already queued, exiting to deduplicate"
    @skip_requeue = true
    return
  end

  logger.info "[Postburner::Scheduler] Starting scheduler run"

  ActiveSupport::Notifications.instrument('perform_start.scheduler.postburner', {
    interval: interval
  })

  # Track stats for summary event
  @schedules_processed = 0
  @schedules_failed = 0
  @executions_created = 0
  @orphans_enqueued = 0
  lock_acquired = false

  # Build payload hash that will be mutated with final stats
  payload = {
    interval: interval,
    lock_acquired: nil,
    schedules_processed: nil,
    schedules_failed: nil,
    executions_created: nil,
    orphans_enqueued: nil
  }

  # Use advisory lock to coordinate multiple workers
  ActiveSupport::Notifications.instrument('perform.scheduler.postburner', payload) do
    begin
      lock_acquired = Postburner::AdvisoryLock.with_lock(Postburner::AdvisoryLock::SCHEDULER_LOCK_KEY, blocking: false) do
        process_all_schedules
        true
      end

      if lock_acquired
        logger.info "[Postburner::Scheduler] Scheduler run complete"
      else
        logger.info "[Postburner::Scheduler] Could not acquire lock, skipping"
      end
    rescue ActiveRecord::ConnectionTimeoutError => e
      # This can happen if the connection pool is exhausted
      # Log cleanly and let the watchdog retry on next interval
      logger.warn "[Postburner::Scheduler] Database connection pool exhausted. Skipping scheduler run (need advisory lock), will retry on next interval"
      logger.debug "[Postburner::Scheduler] Check database.yml pool and max_connections i.e. pool >= needed connection count from web/job workers"
      logger.debug "[Postburner::Scheduler] ActiveRecord Connection timeout: #{e.message}"
      lock_acquired = false
    end

    # Update payload with final stats (mutates the hash subscribers receive)
    payload[:lock_acquired] = lock_acquired
    payload[:schedules_processed] = @schedules_processed
    payload[:schedules_failed] = @schedules_failed
    payload[:executions_created] = @executions_created
    payload[:orphans_enqueued] = @orphans_enqueued
  end
ensure
  # Re-queue watchdog for next run (unless we're deduplicating)
  requeue_watchdog unless @skip_requeue
end
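The control flow of `#perform` can be reduced to plain Ruby: deduplicate first, try a non-blocking lock, and re-queue from `ensure` unless deduplicating. In this sketch a `Mutex#try_lock` stands in for the PostgreSQL advisory lock. Unlike a Mutex, the advisory lock coordinates across processes. The hypothetical `perform_once` method takes lambdas for the dedup check, the work and the re-queue, so each exit path can be observed.

```ruby
# Hypothetical reduction of Scheduler#perform's control flow.
# A process-local Mutex stands in for the cross-process advisory lock.
LOCK = Mutex.new

def perform_once(another_queued:, work:, requeue:)
  skip_requeue = false
  if another_queued.call
    skip_requeue = true # another watchdog will carry on; do not add a duplicate
    return :deduplicated
  end

  return :lock_busy unless LOCK.try_lock # another worker is already processing

  begin
    work.call
    :completed
  ensure
    LOCK.unlock
  end
ensure
  # Runs on every exit path, including exceptions raised by the work.
  requeue.call unless skip_requeue
end
```

Re-queuing from `ensure` keeps the chain alive even when processing raises or the lock is busy. Only the deduplication path deliberately lets this watchdog end.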