Class: Wurk::Scheduled::Poller

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/wurk/scheduled.rb

Overview

Single thread that wakes on a randomized interval, drains both ZSETs, then sleeps again. Random spread prevents the cluster from dogpiling Redis at the top of each cadence.

Constant Summary collapse

INITIAL_WAIT =
10

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary collapse

Attributes included from Component

#config

Instance Method Summary collapse

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ Poller

Returns a new instance of Poller.



122
123
124
125
126
127
128
129
130
131
# File 'lib/wurk/scheduled.rb', line 122

def initialize(config)
  @config = config
  @enq = (config[:scheduled_enq] || Enq).new(config)
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
  @rnd = ::Random.new
  @last_cleanup_ms = 0
end

Instance Attribute Details

#rndObject

Returns the value of attribute rnd.



120
121
122
# File 'lib/wurk/scheduled.rb', line 120

def rnd
  @rnd
end

Instance Method Details

#enqueueObject

Called on every wake. Any raise inside the Enq is reported and the loop continues — a transient Redis blip must not kill the scheduler.



160
161
162
163
164
# File 'lib/wurk/scheduled.rb', line 160

def enqueue
  @enq.enqueue_jobs
rescue StandardError => e
  handle_exception(e, { context: 'scheduler' })
end

#startObject

Spawns the scheduler thread. INITIAL_WAIT delays the first sweep so a fleet-wide deploy doesn’t have every freshly-booted process hit Redis simultaneously.



136
137
138
139
140
141
142
143
144
145
# File 'lib/wurk/scheduled.rb', line 136

def start
  @thread ||= safe_thread('scheduler') do # rubocop:disable Naming/MemoizedInstanceVariableName
    initial_wait
    until @done
      enqueue
      wait
    end
    logger.info('Scheduler exiting...')
  end
end

#terminateObject

Idempotent. Wakes the sleeping thread so it observes @done and exits. Also propagates the stop signal to @enq so any in-flight drain loop short-circuits instead of running to completion.



150
151
152
153
154
155
156
# File 'lib/wurk/scheduled.rb', line 150

def terminate
  @mutex.synchronize do
    @done = true
    @enq.terminate
    @sleeper.signal
  end
end