Class: Wurk::Scheduled::Poller
- Inherits:
- Object
- Inheritance chain: Object, Wurk::Scheduled::Poller
- Includes:
- Component
- Defined in:
- lib/wurk/scheduled.rb
Overview
A single thread that wakes on a randomized interval, drains both ZSETs, then sleeps again. The random spread keeps the processes in a cluster from dogpiling Redis at the same instant in every polling cycle.
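The page does not show how the randomized interval is computed, so the following is only a minimal sketch of one common way to jitter a polling interval. The helper name `jittered_interval`, the base of 10 seconds, and the 0.5x to 1.5x spread are all assumptions made for illustration; they are not taken from Wurk.

```ruby
# Hypothetical helper, not part of Wurk's documented API.
# Returns a delay drawn uniformly from [0.5 * base, 1.5 * base) seconds,
# so that independent processes drift apart instead of waking in lockstep.
def jittered_interval(base, rnd = Random.new)
  base * (0.5 + rnd.rand) # Random#rand with no argument yields a Float in [0, 1)
end

rnd = Random.new(42) # seeded only so that repeated runs of the demo match
samples = Array.new(5) { jittered_interval(10.0, rnd) }
samples.each { |s| puts format('%.2f', s) }
```

Passing the generator in as an argument mirrors the fact that the poller exposes its `rnd` attribute, which would presumably let a test substitute a seeded `Random`.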
Constant Summary
- INITIAL_WAIT = 10
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #rnd ⇒ Object
  Returns the value of attribute rnd.
Attributes included from Component
Instance Method Summary
- #enqueue ⇒ Object
  Called on every wake.
- #initialize(config) ⇒ Poller (constructor)
  A new instance of Poller.
- #start ⇒ Object
  Spawns the scheduler thread.
- #terminate ⇒ Object
  Idempotent.
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config) ⇒ Poller
Returns a new instance of Poller.
# File 'lib/wurk/scheduled.rb', line 80
def initialize(config)
  @config = config
  @enq = (config[:scheduled_enq] || Enq).new(config)
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
  @rnd = ::Random.new
  @last_cleanup_ms = 0
end
Instance Attribute Details
#rnd ⇒ Object
Returns the value of attribute rnd.
# File 'lib/wurk/scheduled.rb', line 78
def rnd
  @rnd
end
Instance Method Details
#enqueue ⇒ Object
Called on every wake. Any StandardError raised inside the Enq is reported through handle_exception and the loop continues, so a transient Redis blip cannot kill the scheduler.
# File 'lib/wurk/scheduled.rb', line 118
def enqueue
  @enq.enqueue_jobs
rescue StandardError => e
  handle_exception(e, { context: 'scheduler' })
end
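To illustrate the rescue-and-continue behaviour described above, here is a small self-contained sketch. `FlakyEnq` and `SketchPoller` are hypothetical stand-ins written for this example, and collecting errors in an array replaces the real `handle_exception` call; none of these names come from Wurk.

```ruby
# Hypothetical stand-in for the Enq collaborator.
# It raises on its second call to mimic a transient Redis failure.
class FlakyEnq
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def enqueue_jobs
    @calls += 1
    raise IOError, 'simulated Redis blip' if @calls == 2
  end
end

# Hypothetical poller that keeps only the error-handling shape of #enqueue.
class SketchPoller
  attr_reader :errors

  def initialize(enq)
    @enq = enq
    @errors = []
  end

  def enqueue
    @enq.enqueue_jobs
  rescue StandardError => e
    @errors << e # assumption: stands in for handle_exception
  end
end

enq = FlakyEnq.new
poller = SketchPoller.new(enq)
3.times { poller.enqueue } # the second sweep fails, the third still runs
puts "calls=#{enq.calls} errors=#{poller.errors.size}" # prints "calls=3 errors=1"
```

Because the rescue clause names StandardError, exceptions outside that hierarchy (for example Interrupt) would still propagate out of the method.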
#start ⇒ Object
Spawns the scheduler thread. INITIAL_WAIT delays the first sweep so a fleet-wide deploy doesn’t have every freshly-booted process hit Redis simultaneously.
# File 'lib/wurk/scheduled.rb', line 94
def start
  @thread ||= safe_thread('scheduler') do # rubocop:disable Naming/MemoizedInstanceVariableName
    initial_wait
    until @done
      enqueue
      wait
    end
    logger.info('Scheduler exiting...')
  end
end
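The bodies of `initial_wait` and `wait` are not shown on this page, so the sketch below reproduces only the overall shape of the loop: an initial delay, then repeated sweeps until a stop flag is set, with the thread memoized so a second `start` call is a no-op. `LoopSketch`, its `stop` method, and the shortened delays are assumptions for the demo, and plain `Thread.new` and `sleep` replace `safe_thread` and the real wait helpers.

```ruby
# Hypothetical class that mirrors the structure of Poller#start.
class LoopSketch
  INITIAL_WAIT = 0.05 # seconds; shortened for the demo (the documented constant is 10)

  attr_reader :sweeps

  def initialize
    @done = false
    @sweeps = 0
    @thread = nil
  end

  def start
    # ||= ensures that at most one scheduler thread is ever spawned.
    @thread ||= Thread.new do
      sleep INITIAL_WAIT # assumption: stands in for initial_wait
      until @done
        @sweeps += 1     # assumption: stands in for enqueue
        sleep 0.01       # assumption: stands in for wait
      end
    end
  end

  def stop
    @done = true
    @thread&.join
  end
end

sketch = LoopSketch.new
first = sketch.start
second = sketch.start # returns the memoized thread
sleep 0.2
sketch.stop
puts "same thread: #{first.equal?(second)}, sweeps: #{sketch.sweeps}"
```

The sketch uses an uninterruptible `sleep`, so stopping can lag by up to one sleep period; the real class holds a mutex and condition variable, which allows a sleeping thread to be woken early.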
#terminate ⇒ Object
Idempotent. Wakes the sleeping thread so it observes @done and exits. Also propagates the stop signal to @enq so any in-flight drain loop short-circuits instead of running to completion.
# File 'lib/wurk/scheduled.rb', line 108
def terminate
  @mutex.synchronize do
    @done = true
    @enq.terminate
    @sleeper.signal
  end
end
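As a rough illustration of why signalling the condition variable lets the thread exit promptly, the sketch below parks a thread in a 30-second timed wait and then terminates it. `WakeSketch`, its `join` method, and the 30-second timeout are assumptions for the example; the real `wait` implementation is not shown on this page and may differ.

```ruby
# Hypothetical class showing the Mutex + ConditionVariable wake-up pattern.
class WakeSketch
  def initialize
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      until @done
        @mutex.synchronize do
          # Re-check under the lock so a terminate that already ran is not missed.
          @sleeper.wait(@mutex, 30) unless @done
        end
      end
    end
  end

  # Safe to call repeatedly: setting the flag and signalling again has no further effect.
  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join
    @thread&.join
  end
end

sketch = WakeSketch.new
sketch.start
sleep 0.05 # give the thread time to enter its wait
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
sketch.terminate
sketch.terminate # second call is harmless
sketch.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('stopped in %.3fs', elapsed)
```

Setting the flag and signalling inside the same `synchronize` block matters: without the lock, the thread could check `@done`, be preempted, and then start a full-length wait after the signal had already been sent.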