Class: Wurk::Cron::Poller
- Inherits:
-
Object
- Object
- Wurk::Cron::Poller
- Includes:
- Wurk::Component
- Defined in:
- lib/wurk/cron.rb
Overview
Once-per-minute tick driver. Only the cluster leader iterates the LoopSet and enqueues; non-leaders return early. Missed-tick warning when wall-clock has drifted more than ‘MISSED_TICK_THRESHOLD` seconds past the expected fire.
Constant Summary
Constants included from Wurk::Component
Wurk::Component::DEFAULT_THREAD_PRIORITY, Wurk::Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Wurk::Component
Instance Method Summary collapse
- #enqueue_if_due(loop_obj) ⇒ Object
-
#fire(loop_obj) ⇒ Object
Fire one loop right now, bypassing both the leader gate and the schedule due-check, recording history + advancing the fire marks exactly like a real tick.
-
#initialize(config) ⇒ Poller
constructor
A new instance of Poller.
- #start ⇒ Object
- #terminate ⇒ Object
-
#tick ⇒ Object
Leader-gated by the single cluster lock (Component#leader? reads ‘dear-leader`): non-leaders return early and never iterate the LoopSet.
Methods included from Wurk::Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config) ⇒ Poller
Returns a new instance of Poller.
447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/wurk/cron.rb', line 447 def initialize(config) @config = config @done = false @mutex = ::Mutex.new @sleeper = ::ConditionVariable.new @client = Client.new(config: config) @thread = nil # Operators never need to touch this; integration tests shrink it so a # due loop fires within the test window instead of waiting a full minute. @tick_interval = config[:cron_tick_interval] || DEFAULT_TICK_SECONDS end |
Instance Method Details
#enqueue_if_due(loop_obj) ⇒ Object
491 492 493 494 495 496 497 498 499 500 501 502 503 504 |
# File 'lib/wurk/cron.rb', line 491 def enqueue_if_due(loop_obj) return if loop_obj.paused? now = ::Time.now.to_i prev_fire, next_fire = read_fire_marks(loop_obj.lid) next_fire ||= loop_obj.next_fire_at(prev_fire || (now - @tick_interval)) return if next_fire.nil? || next_fire > now warn_missed_tick(loop_obj, next_fire, now) jid = enqueue!(loop_obj) future = loop_obj.next_fire_after(next_fire, now) record_fire(loop_obj, jid, now, future) jid end |
#fire(loop_obj) ⇒ Object
Fire one loop right now, bypassing both the leader gate and the schedule due-check, recording history + advancing the fire marks exactly like a real tick. Powers Cron.fire! (deterministic specs / manual “run now”); the scheduled, leader-gated path stays #tick.
510 511 512 513 514 515 |
# File 'lib/wurk/cron.rb', line 510 def fire(loop_obj) now = ::Time.now.to_i jid = enqueue!(loop_obj) record_fire(loop_obj, jid, now, loop_obj.next_fire_at(now)) jid end |
#start ⇒ Object
459 460 461 462 463 464 465 466 467 468 469 470 |
# File 'lib/wurk/cron.rb', line 459 def start @poller_thread ||= safe_thread('cron-poller') do # rubocop:disable Naming/MemoizedInstanceVariableName # Wait one interval before the first tick: don't fire a catch-up burst # the instant we boot (the leader is barely settled), and let a # short-lived process exit without ticking at all. wait until @done tick wait end end end |
#terminate ⇒ Object
472 473 474 475 476 477 |
# File 'lib/wurk/cron.rb', line 472 def terminate @mutex.synchronize do @done = true @sleeper.signal end end |
#tick ⇒ Object
Leader-gated by the single cluster lock (Component#leader? reads ‘dear-leader`): non-leaders return early and never iterate the LoopSet. The Launcher owns the lock’s renewal — the poller no longer runs (or expires) its own.
483 484 485 486 487 488 489 |
# File 'lib/wurk/cron.rb', line 483 def tick return unless leader? LoopSet.new(@config).each { |lp| enqueue_if_due(lp) } rescue StandardError => e handle_exception(e, { context: 'cron-poller' }) end |