Class: Wurk::Cron::Poller

Inherits:
Object
  • Object
show all
Includes:
Wurk::Component
Defined in:
lib/wurk/cron.rb

Overview

Once-per-minute tick driver. Only the cluster leader iterates the LoopSet and enqueues; non-leaders return early. Missed-tick warning when wall-clock has drifted more than ‘MISSED_TICK_THRESHOLD` seconds past the expected fire.

Constant Summary

Constants included from Wurk::Component

Wurk::Component::DEFAULT_THREAD_PRIORITY, Wurk::Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Wurk::Component

#config

Instance Method Summary collapse

Methods included from Wurk::Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ Poller

Returns a new instance of Poller.



447
448
449
450
451
452
453
454
455
456
457
# File 'lib/wurk/cron.rb', line 447

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @client = Client.new(config: config)
  @thread = nil
  # Operators never need to touch this; integration tests shrink it so a
  # due loop fires within the test window instead of waiting a full minute.
  @tick_interval = config[:cron_tick_interval] || DEFAULT_TICK_SECONDS
end

Instance Method Details

#enqueue_if_due(loop_obj) ⇒ Object



491
492
493
494
495
496
497
498
499
500
501
502
503
504
# File 'lib/wurk/cron.rb', line 491

def enqueue_if_due(loop_obj)
  return if loop_obj.paused?

  now = ::Time.now.to_i
  prev_fire, next_fire = read_fire_marks(loop_obj.lid)
  next_fire ||= loop_obj.next_fire_at(prev_fire || (now - @tick_interval))
  return if next_fire.nil? || next_fire > now

  warn_missed_tick(loop_obj, next_fire, now)
  jid = enqueue!(loop_obj)
  future = loop_obj.next_fire_after(next_fire, now)
  record_fire(loop_obj, jid, now, future)
  jid
end

#fire(loop_obj) ⇒ Object

Fire one loop right now, bypassing both the leader gate and the schedule due-check, recording history + advancing the fire marks exactly like a real tick. Powers Cron.fire! (deterministic specs / manual “run now”); the scheduled, leader-gated path stays #tick.



510
511
512
513
514
515
# File 'lib/wurk/cron.rb', line 510

def fire(loop_obj)
  now = ::Time.now.to_i
  jid = enqueue!(loop_obj)
  record_fire(loop_obj, jid, now, loop_obj.next_fire_at(now))
  jid
end

#startObject



459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/wurk/cron.rb', line 459

def start
  @poller_thread ||= safe_thread('cron-poller') do # rubocop:disable Naming/MemoizedInstanceVariableName
    # Wait one interval before the first tick: don't fire a catch-up burst
    # the instant we boot (the leader is barely settled), and let a
    # short-lived process exit without ticking at all.
    wait
    until @done
      tick
      wait
    end
  end
end

#terminateObject



472
473
474
475
476
477
# File 'lib/wurk/cron.rb', line 472

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end

#tickObject

Leader-gated by the single cluster lock (Component#leader? reads ‘dear-leader`): non-leaders return early and never iterate the LoopSet. The Launcher owns the lock’s renewal — the poller no longer runs (or expires) its own.



483
484
485
486
487
488
489
# File 'lib/wurk/cron.rb', line 483

def tick
  return unless leader?

  LoopSet.new(@config).each { |lp| enqueue_if_due(lp) }
rescue StandardError => e
  handle_exception(e, { context: 'cron-poller' })
end