Class: Pgbus::Recurring::Scheduler

Inherits:
Object
  • Object
show all
Includes:
Process::SignalHandler
Defined in:
lib/pgbus/recurring/scheduler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Process::SignalHandler

included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals

Constructor Details

#initialize(config: Pgbus.configuration) ⇒ Scheduler

Returns a new instance of Scheduler.



10
11
12
13
14
15
# File 'lib/pgbus/recurring/scheduler.rb', line 10

def initialize(config: Pgbus.configuration)
  @config = config
  @schedule = Schedule.new(config: config)
  @shutting_down = false
  @last_runs = {}
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/pgbus/recurring/scheduler.rb', line 8

def config
  @config
end

#scheduleObject (readonly)

Returns the value of attribute schedule.



8
9
10
# File 'lib/pgbus/recurring/scheduler.rb', line 8

def schedule
  @schedule
end

Instance Method Details

#graceful_shutdownObject



75
76
77
# File 'lib/pgbus/recurring/scheduler.rb', line 75

def graceful_shutdown
  @shutting_down = true
end

#immediate_shutdownObject



79
80
81
# File 'lib/pgbus/recurring/scheduler.rb', line 79

def immediate_shutdown
  @shutting_down = true
end

#last_run_at(key) ⇒ Object



53
54
55
# File 'lib/pgbus/recurring/scheduler.rb', line 53

def last_run_at(key)
  @last_runs[key]
end

#runObject



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pgbus/recurring/scheduler.rb', line 17

def run
  setup_signals
  start_heartbeat
  sync_recurring_tasks

  Pgbus.logger.info do
    "[Pgbus] Scheduler started: #{schedule.tasks.size} recurring tasks, " \
      "interval=#{config.recurring_schedule_interval}s"
  end

  loop do
    break if @shutting_down

    process_signals
    break if @shutting_down

    tick(Time.current)
    break if @shutting_down

    interruptible_sleep(config.recurring_schedule_interval)
  end

  shutdown
end

#task_statusesObject



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pgbus/recurring/scheduler.rb', line 57

def task_statuses
  schedule.tasks.map do |task|
    {
      key: task.key,
      class_name: task.class_name,
      command: task.command,
      schedule: task.schedule,
      human_schedule: task.human_schedule,
      queue_name: task.queue_name,
      arguments: task.arguments,
      priority: task.priority,
      description: task.description,
      next_run_at: task.next_time,
      last_run_at: @last_runs[task.key]
    }
  end
end

#tick(now) ⇒ Object



42
43
44
45
46
47
48
49
50
51
# File 'lib/pgbus/recurring/scheduler.rb', line 42

def tick(now)
  schedule.due_tasks(now).each do |task, run_at|
    schedule.enqueue_task(task, run_at: run_at)
    @last_runs[task.key] = now
  rescue StandardError => e
    Pgbus.logger.error do
      "[Pgbus] Error scheduling recurring task #{task.key}: #{e.class}: #{e.message}"
    end
  end
end