Class: Philiprehberger::Scheduler
- Inherits:
-
Object
- Object
- Philiprehberger::Scheduler
show all
- Includes:
- LeaderElection, Persistence
- Defined in:
- lib/philiprehberger/scheduler.rb,
lib/philiprehberger/scheduler/job.rb,
lib/philiprehberger/scheduler/runner.rb,
lib/philiprehberger/scheduler/version.rb,
lib/philiprehberger/scheduler/cron_parser.rb,
lib/philiprehberger/scheduler/persistence.rb,
lib/philiprehberger/scheduler/leader_election.rb
Defined Under Namespace
Modules: LeaderElection, Persistence
Classes: CronParser, Job, Runner
Constant Summary
collapse
- VERSION =
'0.5.0'
Instance Method Summary
collapse
-
#cancel(name) ⇒ Object
-
#cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block) ⇒ Object
-
#every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block) ⇒ Object
-
#find_job(name) ⇒ Object
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#job_count ⇒ Object
-
#jobs ⇒ Object
-
#next_runs(limit: 10, from: Time.now) ⇒ Object
Return an array of upcoming runs across all scheduled jobs, sorted by :next_run_at ascending.
-
#on_error(&block) ⇒ Object
-
#pause(name) ⇒ Object
-
#paused?(name) ⇒ Boolean
-
#resume(name) ⇒ Object
-
#run_at(time, name: nil, &block) ⇒ Object
-
#running? ⇒ Boolean
-
#start ⇒ Object
-
#stop(timeout = 5) ⇒ Object
#acquire_leadership, #enable_leader_election, #leader?, #release_leadership
#load_state, #save_state
Constructor Details
Returns a new instance of Scheduler.
15
16
17
18
19
20
21
22
23
|
# File 'lib/philiprehberger/scheduler.rb', line 15
# Builds an empty scheduler: no jobs registered, no error handler installed,
# no runner created, and leadership flagged as not held.
def initialize
  # Job registry and the lock the other methods take around access to it.
  @mutex = Mutex.new
  @jobs = []

  # Assigned later by #start and #on_error respectively.
  @runner = nil
  @error_handler = nil

  # Leader-election state (presumably managed by the LeaderElection mixin);
  # #start and #stop only consult @leader_lock_path.
  @is_leader = false
  @leader_lock_file = nil
  @leader_lock_path = nil
end
|
Instance Method Details
#cancel(name) ⇒ Object
94
95
96
97
98
99
100
101
102
|
# File 'lib/philiprehberger/scheduler.rb', line 94
# Removes every registered job whose name equals +name+.
#
# @param name [Object] job name to match; +nil+ short-circuits to false
# @return [Boolean] true when at least one job was removed, false otherwise
def cancel(name)
  return false if name.nil?

  @mutex.synchronize do
    # Array#reject! yields nil when nothing was removed.
    removed = @jobs.reject! { |job| job.name == name }
    !removed.nil?
  end
end
|
#cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/philiprehberger/scheduler.rb', line 45
# Registers a job scheduled by a cron expression.
#
# The expression is wrapped in a CronParser and passed to Job.new together
# with the block and the keyword options, which are forwarded unchanged.
# The new job is appended to the registry under the mutex.
#
# @param expression [Object] cron expression handed to CronParser.new
# @param name [Object, nil] forwarded to Job.new as +name:+
# @param depends_on [Object, nil] forwarded to Job.new as +depends_on:+
# @param input_from [Object, nil] forwarded to Job.new as +input_from:+
# @param timezone [Object, nil] forwarded to Job.new as +timezone:+
# @param if [Object, nil] forwarded to Job.new as +if:+
# @return [Job] the job that was registered
def cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block)
  schedule = CronParser.new(expression)
  # `if` is a reserved word, so the keyword argument must be read via the binding.
  condition = binding.local_variable_get(:if)

  Job.new(
    callable: block, cron: schedule, name: name,
    depends_on: depends_on, input_from: input_from,
    timezone: timezone, if: condition
  ).tap { |job| @mutex.synchronize { @jobs << job } }
end
|
#every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block) ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/philiprehberger/scheduler.rb', line 30
# Registers a job that repeats on a fixed interval.
#
# The interval is first normalised by #parse_interval; the result is passed to
# Job.new as +interval:+ together with the block and the keyword options, which
# are forwarded unchanged. The new job is appended to the registry under the mutex.
#
# @param interval [Object] interval specification understood by #parse_interval
# @param name [Object, nil] forwarded to Job.new as +name:+
# @param overlap [Object] forwarded to Job.new as +overlap:+ (defaults to true)
# @param depends_on [Object, nil] forwarded to Job.new as +depends_on:+
# @param input_from [Object, nil] forwarded to Job.new as +input_from:+
# @param if [Object, nil] forwarded to Job.new as +if:+
# @return [Job] the job that was registered
def every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block)
  period = parse_interval(interval)
  # `if` is a reserved word, so the keyword argument must be read via the binding.
  condition = binding.local_variable_get(:if)

  Job.new(
    callable: block, interval: period, overlap: overlap, name: name,
    depends_on: depends_on, input_from: input_from, if: condition
  ).tap { |job| @mutex.synchronize { @jobs << job } }
end
|
#find_job(name) ⇒ Object
88
89
90
91
92
|
# File 'lib/philiprehberger/scheduler.rb', line 88
# Looks up the first registered job whose name equals +name+.
#
# @param name [Object] job name to match; +nil+ short-circuits to nil
# @return [Object, nil] the matching job, or nil when there is none
def find_job(name)
  return if name.nil?

  @mutex.synchronize do
    @jobs.find { |candidate| candidate.name == name }
  end
end
|
#job_count ⇒ Object
84
85
86
|
# File 'lib/philiprehberger/scheduler.rb', line 84
# Reports how many jobs are currently registered, read under the mutex.
#
# @return [Integer] number of entries in the job registry
def job_count
  @mutex.synchronize do
    @jobs.length
  end
end
|
#jobs ⇒ Object
80
81
82
|
# File 'lib/philiprehberger/scheduler.rb', line 80
# Returns a shallow copy of the job registry, taken under the mutex, so the
# caller can iterate or modify the result without touching the internal array.
#
# @return [Array] a new array holding the same job objects
def jobs
  @mutex.synchronize do
    snapshot = @jobs.dup
    snapshot
  end
end
|
#next_runs(limit: 10, from: Time.now) ⇒ Object
Return an array of upcoming runs across all scheduled jobs, sorted by :next_run_at ascending. Each entry is a hash of the form:
{ job_id:, job_name:, next_run_at: }
Jobs that will never fire again after `from` are excluded. When `limit` is nil, every upcoming run is returned.
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/philiprehberger/scheduler.rb', line 146
# Lists upcoming runs across all registered jobs, earliest first.
#
# Each job is asked for +next_run_at(from)+; jobs answering nil/false are
# skipped. The registry is copied under the mutex first, so jobs are queried
# without the lock held.
#
# @param limit [Integer, nil] maximum number of entries; nil returns them all
# @param from [Time] reference time passed to each job's +next_run_at+
# @return [Array<Hash>] hashes of the form
#   +{ job_id:, job_name:, next_run_at: }+ sorted ascending by +:next_run_at+,
#   where +job_id+ is the job's +object_id+
def next_runs(limit: 10, from: Time.now)
  snapshot = @mutex.synchronize { @jobs.dup }

  upcoming = snapshot.each_with_object([]) do |job, runs|
    run_time = job.next_run_at(from)
    next unless run_time

    runs << { job_id: job.object_id, job_name: job.name, next_run_at: run_time }
  end

  ordered = upcoming.sort_by { |run| run[:next_run_at] }
  return ordered if limit.nil?

  ordered.first(limit)
end
|
#on_error(&block) ⇒ Object
25
26
27
28
|
# File 'lib/philiprehberger/scheduler.rb', line 25
# Stores the given block as the error handler, replacing any previous one.
# Calling without a block clears the handler (it is set to nil).
#
# @return [self] the receiver, so calls can be chained
def on_error(&block)
  tap { @error_handler = block }
end
|
#pause(name) ⇒ Object
117
118
119
120
121
122
123
|
# File 'lib/philiprehberger/scheduler.rb', line 117
# Marks the job found by #find_job as paused.
#
# @param name [Object] name of the job to pause
# @return [Boolean] true when a job was found and flagged, false when no job matched
def pause(name)
  target = find_job(name)

  if target
    target.paused = true
    true
  else
    false
  end
end
|
#paused?(name) ⇒ Boolean
133
134
135
136
137
138
|
# File 'lib/philiprehberger/scheduler.rb', line 133
# Reports whether the job found by #find_job is paused.
#
# @param name [Object] name of the job to inspect
# @return [Boolean] the job's own +paused?+ answer, or false when no job matched
def paused?(name)
  target = find_job(name)
  target ? target.paused? : false
end
|
#resume(name) ⇒ Object
125
126
127
128
129
130
131
|
# File 'lib/philiprehberger/scheduler.rb', line 125
# Clears the paused flag on the job found by #find_job.
#
# @param name [Object] name of the job to resume
# @return [Boolean] true when a job was found and unflagged, false when no job matched
def resume(name)
  target = find_job(name)

  if target
    target.paused = false
    true
  else
    false
  end
end
|
#run_at(time, name: nil, &block) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/philiprehberger/scheduler.rb', line 104
# Registers a job built with +run_at: time+, i.e. tied to one specific Time.
#
# @param time [Time] the time handed to Job.new as +run_at:+
# @param name [Object, nil] forwarded to Job.new as +name:+
# @return [Job] the job that was registered
# @raise [ArgumentError] when +time+ is not a Time, or when no block is given
#   (the time check runs first)
def run_at(time, name: nil, &block)
  raise ArgumentError, 'time must be a Time' unless time.is_a?(Time)
  raise ArgumentError, 'block required' if block.nil?

  Job.new(callable: block, run_at: time, name: name).tap do |job|
    @mutex.synchronize { @jobs << job }
  end
end
|
#running? ⇒ Boolean
76
77
78
|
# File 'lib/philiprehberger/scheduler.rb', line 76
# Reports whether a runner exists and says it is running.
#
# @return [Boolean] false when no runner has been created yet, or when the
#   runner's +running?+ answer is nil/false; otherwise that answer
def running?
  return false if @runner.nil?

  @runner.running? || false
end
|
#start ⇒ Object
60
61
62
63
64
65
66
67
68
|
# File 'lib/philiprehberger/scheduler.rb', line 60
# Starts executing the registered jobs.
#
# A new Runner is built around the live job array, the shared mutex and the
# current error handler, and then started. When a leader lock path is
# configured, the runner is only started if #acquire_leadership succeeds.
#
# Calling #start while a runner is already running is a no-op. Previously a
# second call built another Runner and overwrote @runner, leaving the first
# one unreachable from #stop (and presumably still executing jobs).
#
# @return [self] the receiver, whether or not a runner was started
def start
  # Already running: keep the existing runner instead of replacing it.
  return self if @runner&.running?

  # Leader election enabled but leadership not obtained: stay idle.
  return self if @leader_lock_path && !acquire_leadership

  @runner = Runner.new(@jobs, @mutex, error_handler: @error_handler)
  @runner.start
  self
end
|
#stop(timeout = 5) ⇒ Object
70
71
72
73
74
|
# File 'lib/philiprehberger/scheduler.rb', line 70
# Stops the runner, if one exists, and gives up leadership when a leader lock
# path is configured.
#
# @param timeout [Object] passed straight to the runner's +stop+ (defaults to 5)
# @return [self] the receiver
def stop(timeout = 5)
  @runner.stop(timeout) unless @runner.nil?
  release_leadership if @leader_lock_path
  self
end
|