Class: Honker::Scheduler

Inherits:
Object
  • Object
Defined in:
lib/honker/scheduler.rb

Overview

Time-trigger scheduler. Register tasks with `add`; `tick` fires all boundaries that have elapsed since the last tick and enqueues the resulting jobs. `run(owner:, stop:)` drives the loop under a leader-elected advisory lock.

Constant Summary

LEADER_LOCK =

Lock name used for leader election in `run`. Constant so all processes contending for leader share a single lock row.

"honker-scheduler"
LOCK_TTL_S =

TTL on the leader lock. Refreshed from `run` every HEARTBEAT_S; a leader whose refresh fails drops out of the loop so a standby can pick up without waiting the full TTL.

60
HEARTBEAT_S =

Refresh cadence. Balance: too small and every tick is a lock write; too large and a standby waits longer than necessary after a crash. Matches the Rust binding.

20
UPDATE_POLL_S =
0.05
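
As a rough illustration of how LOCK_TTL_S and HEARTBEAT_S interact, the sketch below works out how long a standby might wait after a leader dies without releasing the lock. It assumes the TTL is counted from the last successful refresh and that standbys retry every 5 seconds, as described under `run`. The `LeaderTiming` module and its method are hypothetical names used only for this example.

```ruby
# Hypothetical helper module; the values are copied from the constants above.
module LeaderTiming
  LOCK_TTL_S      = 60
  HEARTBEAT_S     = 20
  STANDBY_RETRY_S = 5 # standby retry interval described under #run

  # Rough bounds, in seconds, on how long a standby waits after a hard
  # leader crash (one where the lock is never released).
  def self.takeover_window_s
    # Crash just before a refresh was due: the lock is already HEARTBEAT_S old.
    earliest = LOCK_TTL_S - HEARTBEAT_S
    # Crash just after a refresh, plus one full standby retry sleep.
    latest = LOCK_TTL_S + STANDBY_RETRY_S
    earliest..latest
  end
end

window = LeaderTiming.takeover_window_s
puts "standby takes over after roughly #{window.min}s to #{window.max}s"
```

Under these assumptions, a shorter heartbeat narrows the window only at the lower end; the upper end is governed by the TTL.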

Instance Method Summary

Constructor Details

#initialize(db) ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/honker/scheduler.rb', line 33

def initialize(db)
  @db = db
end

Instance Method Details

#add(name:, queue:, cron: nil, schedule: nil, payload:, priority: 0, expires_s: nil) ⇒ Object

Register a scheduled task. `cron:` is kept for backward compatibility; `schedule:` is the clearer name and takes precedence when both are given. The expression can be:

  • 5-field cron

  • 6-field cron

  • `@every <n><unit>` like `@every 1s`

Idempotent by `name`; registering the same name twice replaces the previous row.

Raises:

  • (ArgumentError) if neither `cron:` nor `schedule:` is given, or the chosen expression is empty


# File 'lib/honker/scheduler.rb', line 46

def add(name:, queue:, cron: nil, schedule: nil, payload:, priority: 0, expires_s: nil)
  expr = schedule || cron
  raise ArgumentError, "must provide cron: or schedule:" if expr.nil? || expr.empty?

  @db.db.get_first_row(
    "SELECT honker_scheduler_register(?, ?, ?, ?, ?, ?)",
    [name, queue, expr, JSON.dump(payload), priority, expires_s],
  )
  @db.mark_updated
  nil
end
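
To make the argument handling concrete, the sketch below copies the first two lines of `add` into a standalone method so the precedence rule and the error case can be tried without a database. `resolve_schedule` is a hypothetical name for illustration, not part of the class.

```ruby
# Hypothetical standalone copy of the expression handling at the top of #add.
def resolve_schedule(cron: nil, schedule: nil)
  expr = schedule || cron
  raise ArgumentError, "must provide cron: or schedule:" if expr.nil? || expr.empty?
  expr
end

# Either keyword works on its own.
resolve_schedule(cron: "*/5 * * * *")
resolve_schedule(schedule: "@every 1s")

# When both are given, schedule: wins.
resolve_schedule(cron: "0 * * * *", schedule: "@every 30s")

# An empty schedule: does not fall back to cron:, because "" is truthy in Ruby.
begin
  resolve_schedule(cron: "0 * * * *", schedule: "")
rescue ArgumentError => e
  puts e.message
end
```

The last case is worth noting: since only `nil` and `false` are falsy in Ruby, an empty `schedule:` string is selected by `||` and then rejected by the `empty?` check.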

#remove(name) ⇒ Object

Remove a registered task by name. Returns the count deleted (0 or 1).



# File 'lib/honker/scheduler.rb', line 60

def remove(name)
  @db.db.get_first_row(
    "SELECT honker_scheduler_unregister(?)",
    [name],
  )[0].tap { @db.mark_updated }
end

#run(owner:, stop:) ⇒ Object

Run the scheduler loop with leader election. Blocks until `stop` signals. `stop` is any object that responds to `call` (returning truthy to stop); a common choice is a lambda backed by a Mutex-guarded flag, or an `AtomicBoolean`-like wrapper.

Only the process holding the `"honker-scheduler"` advisory lock fires. Standbys sleep 5s and retry. The leader heartbeats every 20s; if the refresh fails (returns 0), we break out of the leader loop immediately so we don’t double-fire alongside a new leader that acquired the lock after our TTL elapsed.

`owner` distinguishes processes, typically a hostname + pid. On tick error, the lock is released before re-raising so a standby can pick up without waiting the full TTL.



# File 'lib/honker/scheduler.rb', line 96

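def run(owner:, stop:)
  stop_fn = normalize_stop(stop)
  until stop_fn.call
    # Contend for leadership; only the lock holder may fire tasks.
    acquired = lock_try_acquire(LEADER_LOCK, owner, LOCK_TTL_S)
    unless acquired
      # Standby: wait up to 5s, then retry.
      wait_for_update_or_timeout(5, stop_fn)
      next
    end

    begin
      leader_loop(owner, stop_fn)
    ensure
      # Release on every exit path, including errors, so a standby
      # does not have to wait out the TTL.
      lock_release(LEADER_LOCK, owner)
    end
  end
  nil
end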
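
One way to build the `stop:` and `owner:` arguments described above is sketched here. `StopFlag` is a hypothetical helper, not something the library ships, and the `host:pid` owner format is only an assumption based on the "hostname + pid" suggestion. A plain polling thread stands in for the actual `run` call so the example runs on its own.

```ruby
require "socket"

# Hypothetical helper: a Mutex-guarded flag that responds to #call,
# which is all that #run asks of its stop: argument.
class StopFlag
  def initialize
    @mutex = Mutex.new
    @stopped = false
  end

  # Request shutdown; safe to call from any thread or a signal-handling thread.
  def stop!
    @mutex.synchronize { @stopped = true }
  end

  # Truthy once stop! has been called.
  def call
    @mutex.synchronize { @stopped }
  end
end

# Assumed owner format: hostname and pid joined with a colon.
owner = "#{Socket.gethostname}:#{Process.pid}"

stop = StopFlag.new
worker = Thread.new do
  # Stand-in for: scheduler.run(owner: owner, stop: stop)
  sleep 0.01 until stop.call
  :stopped
end

stop.stop!
puts "#{owner} finished with #{worker.value.inspect}"
```

A lambda such as `-> { flag.call }` would work equally well, since the only requirement stated is that the object responds to `call`.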

#soonest ⇒ Object

Soonest `next_fire_at` across all tasks, or 0 if no tasks.



# File 'lib/honker/scheduler.rb', line 78

def soonest
  @db.db.get_first_row("SELECT honker_scheduler_soonest()")[0]
end
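
Because 0 is a sentinel rather than a timestamp, callers need to treat it separately. The sketch below shows one possible way to turn the return value into a sleep duration. It assumes `next_fire_at` is a Unix timestamp in seconds (consistent with the `Time.now.to_i` default of `tick`, but not confirmed here); `seconds_until_next_fire` and its `idle_s` fallback are hypothetical.

```ruby
# Hypothetical helper: how long to sleep before the next tick is worth running.
# `soonest` is the value returned by Scheduler#soonest, assumed to be a Unix
# timestamp in seconds, or 0 when no tasks are registered.
def seconds_until_next_fire(soonest, now: Time.now.to_i, idle_s: 5)
  return idle_s if soonest.zero? # no tasks: fall back to an idle interval

  # Never return a negative duration when a fire time is already in the past.
  [soonest - now, 0].max
end

puts seconds_until_next_fire(0, now: 100)   # no tasks registered
puts seconds_until_next_fire(130, now: 100) # next fire 30 seconds away
puts seconds_until_next_fire(90, now: 100)  # already overdue
```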

#tick(now = Time.now.to_i) ⇒ Object

Fire all due boundaries at `now`. Returns an array of ScheduledFire, one per enqueued job.



# File 'lib/honker/scheduler.rb', line 69

def tick(now = Time.now.to_i)
  rows_json = @db.db.get_first_row(
    "SELECT honker_scheduler_tick(?)",
    [now],
  )[0]
  JSON.parse(rows_json).map { |r| ScheduledFire.from_row(r) }
end