Class: MiniScheduler::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/mini_scheduler/manager.rb

Defined Under Namespace

Classes: Runner

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Manager

Returns a new instance of Manager.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/mini_scheduler/manager.rb', line 182

def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end

Instance Attribute Details

#enable_statsObject

Returns the value of attribute enable_stats.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def enable_stats
  @enable_stats
end

#queueObject

Returns the value of attribute queue.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def queue
  @queue
end

#random_ratioObject

Returns the value of attribute random_ratio.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def random_ratio
  @random_ratio
end

#redisObject

Returns the value of attribute redis.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def redis
  @redis
end

#workersObject

Returns the value of attribute workers.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def workers
  @workers
end

Class Method Details

.currentObject



202
203
204
# File 'lib/mini_scheduler/manager.rb', line 202

def self.current
  @current ||= {}
end

.discover_queuesObject



327
328
329
# File 'lib/mini_scheduler/manager.rb', line 327

def self.discover_queues
  ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
end

.discover_schedulesObject



331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/mini_scheduler/manager.rb', line 331

def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end

.lock_key(queue) ⇒ Object



359
360
361
# File 'lib/mini_scheduler/manager.rb', line 359

def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end

.queue_key(queue, hostname = nil) ⇒ Object



363
364
365
366
367
368
369
# File 'lib/mini_scheduler/manager.rb', line 363

def self.queue_key(queue, hostname = nil)
  if hostname
    "_scheduler_queue_#{queue}_#{hostname}_"
  else
    "_scheduler_queue_#{queue}_"
  end
end

.schedule_key(klass, hostname = nil) ⇒ Object



371
372
373
374
375
376
377
# File 'lib/mini_scheduler/manager.rb', line 371

def self.schedule_key(klass, hostname = nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end

.seqObject



348
349
350
351
352
353
# File 'lib/mini_scheduler/manager.rb', line 348

def self.seq
  @mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end

.without_runnerObject



178
179
180
# File 'lib/mini_scheduler/manager.rb', line 178

def self.without_runner
  self.new(skip_runner: true)
end

Instance Method Details

#blocking_tickObject



303
304
305
306
# File 'lib/mini_scheduler/manager.rb', line 303

def blocking_tick
  tick
  @runner.wait_till_done
end

#ensure_schedule!(klass) ⇒ Object



222
223
224
225
226
# File 'lib/mini_scheduler/manager.rb', line 222

def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end
end

#get_klass(name) ⇒ Object



256
257
258
259
260
# File 'lib/mini_scheduler/manager.rb', line 256

def get_klass(name)
  name.constantize
rescue NameError
  nil
end

#hostnameObject



206
207
208
209
210
211
212
# File 'lib/mini_scheduler/manager.rb', line 206

def hostname
  @hostname ||= begin
                  `hostname`.strip
                rescue
                  "unknown"
                end
end

#identity_keyObject



355
356
357
# File 'lib/mini_scheduler/manager.rb', line 355

def identity_key
  @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end

#keep_aliveObject



317
318
319
# File 'lib/mini_scheduler/manager.rb', line 317

def keep_alive
  redis.setex identity_key, keep_alive_duration, ""
end

#keep_alive_durationObject



313
314
315
# File 'lib/mini_scheduler/manager.rb', line 313

def keep_alive_duration
  60
end

#lockObject



321
322
323
324
325
# File 'lib/mini_scheduler/manager.rb', line 321

def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end

#next_run(klass) ⇒ Object



218
219
220
# File 'lib/mini_scheduler/manager.rb', line 218

def next_run(klass)
  schedule_info(klass).next_run
end

#remove(klass) ⇒ Object



228
229
230
231
232
# File 'lib/mini_scheduler/manager.rb', line 228

def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end

#repair_queueObject



262
263
264
265
266
267
268
269
# File 'lib/mini_scheduler/manager.rb', line 262

def repair_queue
  return if redis.exists?(self.class.queue_key(queue)) ||
    redis.exists?(self.class.queue_key(queue, hostname))

  self.class.discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end

#reschedule_orphans!Object



234
235
236
237
238
239
# File 'lib/mini_scheduler/manager.rb', line 234

def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end

#reschedule_orphans_on!(hostname = nil) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/mini_scheduler/manager.rb', line 241

def reschedule_orphans_on!(hostname = nil)
  redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
      (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end

#schedule_info(klass) ⇒ Object



214
215
216
# File 'lib/mini_scheduler/manager.rb', line 214

def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end

#schedule_next_job(hostname = nil) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/mini_scheduler/manager.rb', line 278

def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key

  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    if !klass || (
      (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
    )
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(queue, hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end

#stop!Object



308
309
310
311
# File 'lib/mini_scheduler/manager.rb', line 308

def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end

#tickObject



271
272
273
274
275
276
# File 'lib/mini_scheduler/manager.rb', line 271

def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end