Class: Honker::Scheduler
- Inherits: Object
- Hierarchy: Object > Honker::Scheduler
- Defined in: lib/honker/scheduler.rb
Overview
Cron-style scheduler. Register tasks with `add`; `tick` fires all boundaries that have elapsed since the last tick and enqueues the resulting jobs. `run(owner:, stop:)` drives the loop under a leader-elected advisory lock.
Constant Summary
- LEADER_LOCK = "honker-scheduler"
  Lock name used for leader election in `run`. A constant, so that all processes contending for leader share a single lock row.
- LOCK_TTL_S = 60
  TTL on the leader lock, in seconds. Refreshed from `run` every HEARTBEAT_S; a leader whose refresh fails drops out of the loop so a standby can pick up without waiting the full TTL.
- HEARTBEAT_S = 20
  Refresh cadence, in seconds. A balance: too small and every tick is a lock write; too large and a standby waits longer than necessary after a crash. Matches the Rust binding.
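
The interplay between LOCK_TTL_S and HEARTBEAT_S can be illustrated with a small in-memory model. This is only a sketch: `FakeLeaderLock` and its methods are hypothetical stand-ins, not Honker's lock implementation, and the clock is passed in explicitly so the behavior is deterministic. The "returns 0 on a failed refresh" convention follows the description of `run`.

```ruby
# Hypothetical in-memory model of a TTL-based leader lock.
# Not Honker's implementation; it only illustrates the timing rules.
class FakeLeaderLock
  def initialize
    @owner = nil
    @expires_at = 0
  end

  # Take the lock if it is free, expired, or already ours.
  def try_acquire(owner, ttl_s, now)
    return false if @owner && @owner != owner && now < @expires_at
    @owner = owner
    @expires_at = now + ttl_s
    true
  end

  # Extend the TTL; returns 1 on success, 0 if we no longer hold the lock.
  def refresh(owner, ttl_s, now)
    return 0 unless @owner == owner && now < @expires_at
    @expires_at = now + ttl_s
    1
  end
end

ttl_s = 60        # mirrors LOCK_TTL_S
heartbeat_s = 20  # mirrors HEARTBEAT_S

lock = FakeLeaderLock.new
lock.try_acquire("a", ttl_s, 0)                 # "a" leads until t=60
lock.refresh("a", ttl_s, heartbeat_s)           # heartbeat at t=20 extends to t=80
standby_early = lock.try_acquire("b", ttl_s, 70) # still held by "a"
standby_late  = lock.try_acquire("b", ttl_s, 81) # TTL elapsed, "b" takes over
stale_refresh = lock.refresh("a", ttl_s, 82)     # "a" has lost the lock
```

In this model a leader that misses its heartbeats finds out on its next refresh (the `0` result) and should stop firing, which is the situation the leader loop guards against.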
Instance Method Summary
- #add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil) ⇒ Object
  Register a cron-scheduled task.
- #initialize(db) ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #remove(name) ⇒ Object
  Remove a registered task by name.
- #run(owner:, stop:) ⇒ Object
  Run the scheduler loop with leader election.
- #soonest ⇒ Object
  Soonest `next_fire_at` across all tasks, or 0 if no tasks.
- #tick(now = Time.now.to_i) ⇒ Object
  Fire all due boundaries at `now`.
Constructor Details
#initialize(db) ⇒ Scheduler
Returns a new instance of Scheduler.
    # File 'lib/honker/scheduler.rb', line 32
    def initialize(db)
      @db = db
    end
Instance Method Details
#add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil) ⇒ Object
Register a cron-scheduled task. Idempotent by `name`; registering the same name twice replaces the previous row.
    # File 'lib/honker/scheduler.rb', line 38
    def add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil)
      @db.db.get_first_row(
        "SELECT honker_scheduler_register(?, ?, ?, ?, ?, ?)",
        [name, queue, cron, JSON.dump(payload), priority, expires_s],
      )
      nil
    end
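
A registration call might look like the sketch below. The helper `register_nightly_report`, the task and queue names, and the five-field cron string are illustrative assumptions; this page does not specify the accepted cron syntax. `RecordingScheduler` is a hypothetical stand-in that lets the sketch run without a database and mimics only the replace-by-name behavior described above.

```ruby
# Hypothetical helper: registers one task on any object that exposes the
# documented `add` keyword signature (for example a Honker::Scheduler).
def register_nightly_report(scheduler)
  scheduler.add(
    name: "nightly-report",            # unique key; re-adding replaces the row
    queue: "reports",                  # assumed queue name
    cron: "0 3 * * *",                 # assumed five-field cron syntax
    payload: { "kind" => "nightly" },  # the real `add` serializes this with JSON.dump
    priority: 0,
    expires_s: nil,
  )
end

# Stand-in so the sketch runs without a database.
class RecordingScheduler
  attr_reader :tasks

  def initialize
    @tasks = {}
  end

  def add(name:, queue:, cron:, payload:, priority: 0, expires_s: nil)
    @tasks[name] = { queue: queue, cron: cron, payload: payload,
                     priority: priority, expires_s: expires_s }
    nil
  end
end

fake = RecordingScheduler.new
2.times { register_nightly_report(fake) }  # second call replaces the first
task_count = fake.tasks.size
```

Because registration is keyed by `name`, it should be safe to call a helper like this on every process start.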
#remove(name) ⇒ Object
Remove a registered task by name. Returns the count deleted (0 or 1).
    # File 'lib/honker/scheduler.rb', line 48
    def remove(name)
      @db.db.get_first_row(
        "SELECT honker_scheduler_unregister(?)",
        [name],
      )[0]
    end
#run(owner:, stop:) ⇒ Object
Run the scheduler loop with leader election. Blocks until `stop` signals. `stop` is any object that responds to `call` (returning truthy to stop); a common choice is a lambda backed by a Mutex-guarded flag, or an `AtomicBoolean`-like wrapper.
Only the process holding the `"honker-scheduler"` advisory lock fires. Standbys sleep 5s and retry. The leader heartbeats every 20s; if the refresh fails (returns 0), we break out of the leader loop immediately so we don't double-fire alongside a new leader that acquired the lock after our TTL elapsed.
`owner` distinguishes processes, typically a hostname + pid. On tick error, the lock is released before re-raising so a standby can pick up without waiting the full TTL.
    # File 'lib/honker/scheduler.rb', line 84
    def run(owner:, stop:)
      stop_fn = normalize_stop(stop)
      until stop_fn.call
        acquired = lock_try_acquire(LEADER_LOCK, owner, LOCK_TTL_S)
        unless acquired
          sleep_with_stop(5, stop_fn)
          next
        end
        begin
          leader_loop(owner, stop_fn)
        ensure
          lock_release(LEADER_LOCK, owner)
        end
      end
      nil
    end
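
One way to build a `stop` object that satisfies the "responds to `call`, truthy to stop" contract is a small Mutex-guarded flag. `StopFlag` is a hypothetical name used for illustration only; it is not part of Honker.

```ruby
# Hypothetical thread-safe stop flag. `call` returns true once `stop!` has
# been invoked, so the object can be passed as `stop:` to a loop that polls it.
class StopFlag
  def initialize
    @mutex = Mutex.new
    @stopped = false
  end

  # Request shutdown; safe to call from another thread.
  def stop!
    @mutex.synchronize { @stopped = true }
  end

  # Polled by the loop; truthy means "stop now".
  def call
    @mutex.synchronize { @stopped }
  end
end

stop = StopFlag.new
before = stop.call
stop.stop!
after = stop.call
```

With a real scheduler, the flag would presumably be passed as `scheduler.run(owner: "#{Socket.gethostname}:#{Process.pid}", stop: stop)` (after `require "socket"`), with `stop.stop!` called from a different thread when the process should shut down.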
#soonest ⇒ Object
Soonest `next_fire_at` across all tasks, or 0 if no tasks.
    # File 'lib/honker/scheduler.rb', line 66
    def soonest
      @db.db.get_first_row("SELECT honker_scheduler_soonest()")[0]
    end
#tick(now = Time.now.to_i) ⇒ Object
Fire all due boundaries at `now`. Returns an array of ScheduledFire, one per enqueued job.
    # File 'lib/honker/scheduler.rb', line 57
    def tick(now = Time.now.to_i)
      rows_json = @db.db.get_first_row(
        "SELECT honker_scheduler_tick(?)",
        [now],
      )[0]
      JSON.parse(rows_json).map { |r| ScheduledFire.from_row(r) }
    end
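
The idea of firing every boundary that elapsed since the previous tick can be sketched with a simplified model. This is not how `honker_scheduler_tick` is implemented: `due_boundaries` is a hypothetical helper, it handles a fixed interval instead of a cron expression, and treating the window as exclusive of the last tick and inclusive of `now` is an assumption.

```ruby
# Simplified model: a task that fires every `interval_s` seconds, aligned to
# multiples of the interval. Real cron expressions are more general.
# Returns every boundary after `last_tick` and up to `now`, oldest first.
def due_boundaries(last_tick, now, interval_s)
  first = (last_tick / interval_s + 1) * interval_s  # first boundary after last_tick
  first.step(now, interval_s).to_a
end

caught_up = due_boundaries(100, 250, 60)  # => [120, 180, 240]
nothing   = due_boundaries(240, 250, 60)  # => []
```

A tick that arrives late therefore yields several boundaries at once, which matches `tick` returning one ScheduledFire per enqueued job.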