Class: Pgbus::Recurring::Scheduler
Instance Attribute Summary collapse
Instance Method Summary collapse
included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ Scheduler
Returns a new instance of Scheduler.
10
11
12
13
14
15
|
# File 'lib/pgbus/recurring/scheduler.rb', line 10
# Builds a scheduler for the given configuration.
#
# Starts with the shutdown flag cleared and an empty last-run table, keeps a
# reference to the configuration, and constructs the Schedule from it.
#
# @param config [Object] configuration object; defaults to
#   Pgbus.configuration. It is stored and also passed to Schedule.new.
def initialize(config: Pgbus.configuration)
  # Loop/bookkeeping state.
  @shutting_down = false
  @last_runs = {}

  # Collaborators derived from the configuration.
  @config = config
  @schedule = Schedule.new(config: config)
end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/pgbus/recurring/scheduler.rb', line 8
# Returns the configuration object this scheduler was constructed with
# (the value stored in @config by #initialize).
def config
@config
end
|
#schedule ⇒ Object
Returns the value of attribute schedule.
8
9
10
|
# File 'lib/pgbus/recurring/scheduler.rb', line 8
# Returns the Schedule instance that #initialize built from the configuration
# (the value stored in @schedule).
def schedule
@schedule
end
|
Instance Method Details
#graceful_shutdown ⇒ Object
75
76
77
|
# File 'lib/pgbus/recurring/scheduler.rb', line 75
# Requests a shutdown by setting the @shutting_down flag. The loop in #run
# checks this flag between its steps, breaks out, and then calls shutdown.
def graceful_shutdown
@shutting_down = true
end
|
#immediate_shutdown ⇒ Object
79
80
81
|
# File 'lib/pgbus/recurring/scheduler.rb', line 79
# Requests a shutdown by setting the @shutting_down flag. As written, this
# does exactly what #graceful_shutdown does: it only flips the flag that #run
# polls and does not itself interrupt a step that is already executing.
def immediate_shutdown
@shutting_down = true
end
|
#last_run_at(key) ⇒ Object
53
54
55
|
# File 'lib/pgbus/recurring/scheduler.rb', line 53
# Looks up the value #tick recorded the last time the task with the given key
# was enqueued without raising.
#
# @param key [Object] the task key (the same value as task.key)
# @return [Object, nil] the `now` value passed to #tick for that run, or nil
#   when nothing has been recorded for this key
def last_run_at(key)
@last_runs[key]
end
|
#run ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/pgbus/recurring/scheduler.rb', line 17
# Runs the scheduler loop until a shutdown is requested.
#
# Sets up signal handling, starts the heartbeat and syncs the recurring
# tasks, logs a startup message, then repeatedly processes signals, runs
# #tick and sleeps for the configured interval. The @shutting_down flag is
# re-checked after every step, so a shutdown request is honoured before the
# next step starts. Calls shutdown once the loop has exited.
#
# @return [Object] whatever shutdown returns
def run
  setup_signals
  start_heartbeat
  sync_recurring_tasks

  # Block form keeps the message lazy: it is only built if the logger asks.
  Pgbus.logger.info do
    task_count = schedule.tasks.size
    interval = config.recurring_schedule_interval
    "[Pgbus] Scheduler started: #{task_count} recurring tasks, interval=#{interval}s"
  end

  loop do
    break if @shutting_down

    process_signals
    break if @shutting_down

    tick(Time.current)
    # When a shutdown arrived during the tick, skip the sleep; the check at
    # the top of the loop then exits immediately.
    interruptible_sleep(config.recurring_schedule_interval) unless @shutting_down
  end

  shutdown
end
|
#task_statuses ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/pgbus/recurring/scheduler.rb', line 57
# Builds a status snapshot for every task in the schedule.
#
# @return [Array<Hash>] one hash per task, with the keys :key, :class_name,
#   :command, :schedule, :human_schedule, :queue_name, :arguments, :priority,
#   :description, :next_run_at (from task.next_time) and :last_run_at (the
#   value recorded in @last_runs for the task's key, or nil if none)
def task_statuses
  schedule.tasks.map do |entry|
    # Attributes copied straight from the task.
    status = {
      key: entry.key,
      class_name: entry.class_name,
      command: entry.command,
      schedule: entry.schedule,
      human_schedule: entry.human_schedule,
      queue_name: entry.queue_name,
      arguments: entry.arguments,
      priority: entry.priority,
      description: entry.description
    }

    # Timing information: upcoming run from the task, previous run from the
    # scheduler's own bookkeeping.
    status[:next_run_at] = entry.next_time
    status[:last_run_at] = @last_runs[entry.key]
    status
  end
end
|
#tick(now) ⇒ Object
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/pgbus/recurring/scheduler.rb', line 42
# Enqueues every recurring task that the schedule reports as due.
#
# Tasks are handled independently: a StandardError raised for one task is
# logged and the remaining tasks are still processed. A task's last-run entry
# is only written after enqueue_task has returned without raising.
#
# @param now [Object] the current time; passed to schedule.due_tasks and
#   stored as the last-run value for each task that was enqueued
# @return [Object] the result of calling each on schedule.due_tasks(now)
def tick(now)
  schedule.due_tasks(now).each do |due_task, scheduled_for|
    schedule.enqueue_task(due_task, run_at: scheduled_for)
    @last_runs[due_task.key] = now
  rescue StandardError => error
    # Log and carry on so one failing task cannot block the others.
    Pgbus.logger.error do
      "[Pgbus] Error scheduling recurring task #{due_task.key}: #{error.class}: #{error.message}"
    end
  end
end
|