Class: Wurk::Scheduled::Poller

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/wurk/scheduled.rb

Overview

Single thread that wakes on a randomized interval, drains both ZSETs, then sleeps again. Random spread prevents the cluster from dogpiling Redis at the top of each cadence.

Constant Summary collapse

INITIAL_WAIT =
10

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary collapse

Attributes included from Component

#config

Instance Method Summary collapse

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ Poller

Returns a new instance of Poller.



80
81
82
83
84
85
86
87
88
89
# File 'lib/wurk/scheduled.rb', line 80

def initialize(config)
  @config = config
  @enq = (config[:scheduled_enq] || Enq).new(config)
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
  @rnd = ::Random.new
  @last_cleanup_ms = 0
end

Instance Attribute Details

#rndObject

Returns the value of attribute rnd.



78
79
80
# File 'lib/wurk/scheduled.rb', line 78

def rnd
  @rnd
end

Instance Method Details

#enqueueObject

Called on every wake. Any raise inside the Enq is reported and the loop continues — a transient Redis blip must not kill the scheduler.



118
119
120
121
122
# File 'lib/wurk/scheduled.rb', line 118

def enqueue
  @enq.enqueue_jobs
rescue StandardError => e
  handle_exception(e, { context: 'scheduler' })
end

#startObject

Spawns the scheduler thread. INITIAL_WAIT delays the first sweep so a fleet-wide deploy doesn’t have every freshly-booted process hit Redis simultaneously.



94
95
96
97
98
99
100
101
102
103
# File 'lib/wurk/scheduled.rb', line 94

def start
  @thread ||= safe_thread('scheduler') do # rubocop:disable Naming/MemoizedInstanceVariableName
    initial_wait
    until @done
      enqueue
      wait
    end
    logger.info('Scheduler exiting...')
  end
end

#terminateObject

Idempotent. Wakes the sleeping thread so it observes @done and exits. Also propagates the stop signal to @enq so any in-flight drain loop short-circuits instead of running to completion.



108
109
110
111
112
113
114
# File 'lib/wurk/scheduled.rb', line 108

def terminate
  @mutex.synchronize do
    @done = true
    @enq.terminate
    @sleeper.signal
  end
end