Class: Honker::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/honker/scheduler.rb

Overview

Cron-style scheduler. Register tasks with ‘add`; `tick` fires all boundaries that have elapsed since the last tick and enqueues the resulting jobs. `run(owner:, stop:)` drives the loop under a leader-elected advisory lock.

Constant Summary collapse

LEADER_LOCK =

Lock name used for leader election in ‘run`. Constant so all processes contending for leader share a single lock row.

"honker-scheduler"
LOCK_TTL_S =

TTL on the leader lock. Refreshed from ‘run` every HEARTBEAT_S; a leader whose refresh fails drops out of the loop so a standby can pick up without waiting the full TTL.

60
HEARTBEAT_S =

Refresh cadence. Balance: too small and every tick is a lock write; too large and a standby waits longer than necessary after a crash. Matches the Rust binding.

20

Instance Method Summary collapse

Constructor Details

#initialize(db) ⇒ Scheduler

Returns a new instance of Scheduler.



32
33
34
# File 'lib/honker/scheduler.rb', line 32

def initialize(db)
  @db = db
end

Instance Method Details

#add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil) ⇒ Object

Register a cron-scheduled task. Idempotent by ‘name`; registering the same name twice replaces the previous row.



38
39
40
41
42
43
44
# File 'lib/honker/scheduler.rb', line 38

def add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil)
  @db.db.get_first_row(
    "SELECT honker_scheduler_register(?, ?, ?, ?, ?, ?)",
    [name, queue, cron, JSON.dump(payload), priority, expires_s],
  )
  nil
end

#remove(name) ⇒ Object

Remove a registered task by name. Returns the count deleted (0 or 1).



48
49
50
51
52
53
# File 'lib/honker/scheduler.rb', line 48

def remove(name)
  @db.db.get_first_row(
    "SELECT honker_scheduler_unregister(?)",
    [name],
  )[0]
end

#run(owner:, stop:) ⇒ Object

Run the scheduler loop with leader election. Blocks until ‘stop` signals. `stop` is any object that responds to `call` (returning truthy to stop) — a common choice is a lambda backed by a Mutex- guarded flag, or an `AtomicBoolean`-like wrapper.

Only the process holding the ‘“honker-scheduler”` advisory lock fires. Standbys sleep 5s and retry. The leader heartbeats every 20s; if the refresh fails (returns 0), we break out of the leader loop immediately so we don’t double-fire alongside a new leader that acquired the lock after our TTL elapsed.

‘owner` distinguishes processes — typically a hostname + pid. On tick error, the lock is released before re-raising so a standby can pick up without waiting the full TTL.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/honker/scheduler.rb', line 84

def run(owner:, stop:)
  stop_fn = normalize_stop(stop)
  until stop_fn.call
    acquired = lock_try_acquire(LEADER_LOCK, owner, LOCK_TTL_S)
    unless acquired
      sleep_with_stop(5, stop_fn)
      next
    end

    begin
      leader_loop(owner, stop_fn)
    ensure
      lock_release(LEADER_LOCK, owner)
    end
  end
  nil
end

#soonestObject

Soonest ‘next_fire_at` across all tasks, or 0 if no tasks.



66
67
68
# File 'lib/honker/scheduler.rb', line 66

def soonest
  @db.db.get_first_row("SELECT honker_scheduler_soonest()")[0]
end

#tick(now = Time.now.to_i) ⇒ Object

Fire all due boundaries at ‘now`. Returns an array of ScheduledFire — one per enqueued job.



57
58
59
60
61
62
63
# File 'lib/honker/scheduler.rb', line 57

def tick(now = Time.now.to_i)
  rows_json = @db.db.get_first_row(
    "SELECT honker_scheduler_tick(?)",
    [now],
  )[0]
  JSON.parse(rows_json).map { |r| ScheduledFire.from_row(r) }
end