Class: Pgbus::Recurring::Scheduler
Instance Attribute Summary
Instance Method Summary
Methods included from a mixin module: included, #interruptible_sleep, #process_signals, #restore_signals, #setup_signals
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ Scheduler
Returns a new instance of Scheduler.
10
11
12
13
14
15
|
# File 'lib/pgbus/recurring/scheduler.rb', line 10
# Builds a scheduler bound to the given configuration.
#
# Stores the configuration, constructs a Schedule from it, and starts with
# the shutdown flag cleared and no recorded runs.
#
# @param config [Object] configuration object; defaults to Pgbus.configuration
def initialize(config: Pgbus.configuration)
  @config = config
  @schedule = Schedule.new(config: @config)
  @shutting_down = false
  @last_runs = {}
end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/pgbus/recurring/scheduler.rb', line 8
# Reader for the configuration object given to the constructor.
#
# @return [Object] the value held in @config
def config
  @config
end
|
#schedule ⇒ Object
Returns the value of attribute schedule.
8
9
10
|
# File 'lib/pgbus/recurring/scheduler.rb', line 8
# Reader for the schedule object built by the constructor.
#
# @return [Object] the value held in @schedule
def schedule
  @schedule
end
|
Instance Method Details
#graceful_shutdown ⇒ Object
83
84
85
|
# File 'lib/pgbus/recurring/scheduler.rb', line 83
# Requests shutdown by raising the @shutting_down flag, which the #run loop
# checks between steps.
#
# @return [true]
def graceful_shutdown
  @shutting_down = true
end
|
#immediate_shutdown ⇒ Object
87
88
89
|
# File 'lib/pgbus/recurring/scheduler.rb', line 87
# Requests shutdown by raising the @shutting_down flag. As written here it
# sets the same flag as #graceful_shutdown and nothing more.
#
# @return [true]
def immediate_shutdown
  @shutting_down = true
end
|
#last_run_at(key) ⇒ Object
61
62
63
|
# File 'lib/pgbus/recurring/scheduler.rb', line 61
# Looks up the value recorded by #tick for the given task key.
#
# @param key [Object] task key used as the @last_runs hash key
# @return [Object, nil] the recorded value, or nil when none is stored
def last_run_at(key)
  @last_runs[key]
end
|
#run ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/pgbus/recurring/scheduler.rb', line 17
# Main loop of the scheduler.
#
# Installs signal handling, starts the heartbeat, syncs recurring tasks and
# logs a start-up line. It then repeats: process signals, tick with the
# current time, sleep for the configured interval. The @shutting_down flag is
# checked before each step, and #shutdown is called once the loop exits.
def run
  setup_signals
  start_heartbeat
  sync_recurring_tasks

  Pgbus.logger.info do
    "[Pgbus] Scheduler started: #{schedule.tasks.size} recurring tasks, " \
      "interval=#{config.recurring_schedule_interval}s"
  end

  loop do
    break if @shutting_down

    process_signals
    # Signal processing may have requested shutdown; skip the tick if so.
    tick(Time.current) unless @shutting_down
    break if @shutting_down

    interruptible_sleep(config.recurring_schedule_interval)
  end

  shutdown
end
|
#task_statuses ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/pgbus/recurring/scheduler.rb', line 65
# Builds one status hash per task in the schedule.
#
# Each hash copies the task's descriptive attributes under their own names,
# then adds :next_run_at (from the task's next_time) and :last_run_at (the
# value stored in @last_runs for the task key, or nil).
#
# @return [Array<Hash>] status hashes in the order of schedule.tasks
def task_statuses
  copied_attributes = %i[
    key class_name command schedule human_schedule
    queue_name arguments priority description
  ]

  schedule.tasks.map do |task|
    status = copied_attributes.to_h { |attribute| [attribute, task.public_send(attribute)] }
    status[:next_run_at] = task.next_time
    status[:last_run_at] = @last_runs[task.key]
    status
  end
end
|
#tick(now) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/pgbus/recurring/scheduler.rb', line 42
# Enqueues every task the schedule reports as due at +now+.
#
# Each enqueue runs inside a "pgbus.recurring.enqueue" instrumentation event
# and, on success, records +now+ in @last_runs under the task key. A
# StandardError raised for one task is logged and does not stop the
# remaining tasks from being processed.
#
# @param now [Object] time passed to the schedule's due_tasks and stored as
#   the last-run value
def tick(now)
  schedule.due_tasks(now).each do |task, run_at|
    payload = {
      task: task.key,
      class_name: task.class_name,
      queue: task.queue_name,
      run_at: run_at
    }

    Pgbus::Instrumentation.instrument("pgbus.recurring.enqueue", **payload) do
      schedule.enqueue_task(task, run_at: run_at)
      @last_runs[task.key] = now
    end
  rescue StandardError => e
    # Per-task rescue: one failing task must not abort the rest of the tick.
    Pgbus.logger.error do
      "[Pgbus] Error scheduling recurring task #{task.key}: #{e.class}: #{e.message}"
    end
  end
end
|