Class: Philiprehberger::Scheduler

Inherits:
Object
  • Object
show all
Includes:
LeaderElection, Persistence
Defined in:
lib/philiprehberger/scheduler.rb,
lib/philiprehberger/scheduler/job.rb,
lib/philiprehberger/scheduler/runner.rb,
lib/philiprehberger/scheduler/version.rb,
lib/philiprehberger/scheduler/cron_parser.rb,
lib/philiprehberger/scheduler/persistence.rb,
lib/philiprehberger/scheduler/leader_election.rb

Defined Under Namespace

Modules: LeaderElection, Persistence Classes: CronParser, Job, Runner

Constant Summary collapse

VERSION =
'0.5.0'

Instance Method Summary collapse

Methods included from LeaderElection

#acquire_leadership, #enable_leader_election, #leader?, #release_leadership

Methods included from Persistence

#load_state, #save_state

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



15
16
17
18
19
20
21
22
23
# File 'lib/philiprehberger/scheduler.rb', line 15

# Builds an empty scheduler: no registered jobs, no error handler, no runner,
# and all leader-election state cleared.
def initialize
  @mutex = Mutex.new
  @jobs = []
  @error_handler = @runner = nil
  @leader_lock_path = @leader_lock_file = nil
  @is_leader = false
end

Instance Method Details

#cancel(name) ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/philiprehberger/scheduler.rb', line 94

# Removes every registered job whose name equals +name+.
#
# @param name [Object, nil] the job name to match; +nil+ never matches anything
# @return [Boolean] true when at least one job was removed, false otherwise
def cancel(name)
  return false if name.nil?

  @mutex.synchronize do
    # reject! yields nil when no element was removed, so non-nil means a removal happened.
    removed = @jobs.reject! { |job| job.name == name }
    !removed.nil?
  end
end

#cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/philiprehberger/scheduler.rb', line 45

# Registers a job driven by a cron expression and returns it.
#
# The expression is handed to CronParser.new; every keyword argument is
# forwarded unchanged to Job.new, and the block becomes the job's callable.
#
# @param expression [Object] cron expression passed to CronParser.new
# @return [Job] the newly registered job
def cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block)
  schedule = CronParser.new(expression)
  # `if` is a reserved word, so the keyword's value can only be read through the binding.
  condition = binding.local_variable_get(:if)

  Job.new(
    callable: block, cron: schedule, name: name, depends_on: depends_on,
    input_from: input_from, timezone: timezone, if: condition
  ).tap { |job| @mutex.synchronize { @jobs.push(job) } }
end

#every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/philiprehberger/scheduler.rb', line 30

# Registers a job that runs on a fixed interval and returns it.
#
# The interval is converted by parse_interval and the result is passed to
# Job.new as +interval:+; the remaining keyword arguments are forwarded
# unchanged, and the block becomes the job's callable.
#
# @param interval [Object] value accepted by parse_interval
# @return [Job] the newly registered job
def every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block)
  period = parse_interval(interval)
  # `if` is a reserved word, so the keyword's value can only be read through the binding.
  condition = binding.local_variable_get(:if)

  Job.new(
    callable: block, interval: period, overlap: overlap, name: name,
    depends_on: depends_on, input_from: input_from, if: condition
  ).tap { |job| @mutex.synchronize { @jobs.push(job) } }
end

#find_job(name) ⇒ Object



88
89
90
91
92
# File 'lib/philiprehberger/scheduler.rb', line 88

# Looks up the first registered job whose name equals +name+.
#
# @param name [Object, nil] the job name to search for
# @return [Object, nil] the matching job, or nil when +name+ is nil or nothing matches
def find_job(name)
  return if name.nil?

  @mutex.synchronize do
    @jobs.detect { |job| job.name == name }
  end
end

#job_count ⇒ Object



84
85
86
# File 'lib/philiprehberger/scheduler.rb', line 84

# Number of currently registered jobs, read while holding the scheduler mutex.
#
# @return [Integer]
def job_count
  @mutex.synchronize do
    @jobs.length
  end
end

#jobs ⇒ Object



80
81
82
# File 'lib/philiprehberger/scheduler.rb', line 80

# Returns a shallow copy of the job list, taken while holding the scheduler
# mutex, so callers can add or remove entries without affecting the scheduler.
#
# @return [Array] a new array containing the same job objects
def jobs
  @mutex.synchronize do
    @jobs.dup
  end
end

#next_runs(limit: 10, from: Time.now) ⇒ Object

Return an array of upcoming runs across all scheduled jobs, sorted by :next_run_at ascending. Each entry is a hash:

{ job_id:, job_name:, next_run_at: }

Jobs that will never fire again after from are excluded. When limit is nil, returns every upcoming run.



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/philiprehberger/scheduler.rb', line 146

# Lists upcoming runs across all registered jobs, earliest first.
#
# Each entry is a hash of the form { job_id:, job_name:, next_run_at: },
# where job_id is the job's object_id. Jobs whose next_run_at(from) is
# nil/false are left out.
#
# @param limit [Integer, nil] maximum number of entries; nil returns all of them
# @param from [Time] reference time passed to each job's next_run_at
# @return [Array<Hash>] entries sorted by :next_run_at ascending
def next_runs(limit: 10, from: Time.now)
  # Work on a copy so the mutex is not held while jobs compute their next run.
  snapshot = @mutex.synchronize { @jobs.dup }

  upcoming = snapshot.each_with_object([]) do |job, acc|
    run_time = job.next_run_at(from)
    acc << { job_id: job.object_id, job_name: job.name, next_run_at: run_time } if run_time
  end

  ordered = upcoming.sort_by { |run| run[:next_run_at] }
  return ordered if limit.nil?

  ordered.first(limit)
end

#on_error(&block) ⇒ Object



25
26
27
28
# File 'lib/philiprehberger/scheduler.rb', line 25

# Stores the given block as the scheduler's error handler (nil when no block
# is given) and returns the scheduler so calls can be chained.
#
# @return [self]
def on_error(&block)
  tap { @error_handler = block }
end

#pause(name) ⇒ Object



117
118
119
120
121
122
123
# File 'lib/philiprehberger/scheduler.rb', line 117

# Marks the job found by find_job(name) as paused.
#
# @param name [Object] name of the job to pause
# @return [Boolean] true when a job was found and flagged, false when none exists
def pause(name)
  target = find_job(name)
  if target
    target.paused = true
    true
  else
    false
  end
end

#paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
136
137
138
# File 'lib/philiprehberger/scheduler.rb', line 133

# Reports whether the job found by find_job(name) is paused.
#
# @param name [Object] name of the job to inspect
# @return [Boolean] the job's own paused? answer, or false when no such job exists
def paused?(name)
  target = find_job(name)
  target ? target.paused? : false
end

#resume(name) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/philiprehberger/scheduler.rb', line 125

# Clears the paused flag on the job found by find_job(name).
#
# @param name [Object] name of the job to resume
# @return [Boolean] true when a job was found and un-paused, false when none exists
def resume(name)
  target = find_job(name)
  if target
    target.paused = false
    true
  else
    false
  end
end

#run_at(time, name: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/philiprehberger/scheduler.rb', line 104

# Registers a job built with +run_at: time+ and returns it.
#
# @param time [Time] the time passed to Job.new as +run_at:+
# @param name [Object, nil] optional job name
# @return [Job] the newly registered job
# @raise [ArgumentError] when +time+ is not a Time, or when no block is given
def run_at(time, name: nil, &block)
  raise ArgumentError, 'time must be a Time' unless time.is_a?(Time)
  raise ArgumentError, 'block required' if block.nil?

  Job.new(callable: block, run_at: time, name: name).tap do |job|
    @mutex.synchronize { @jobs.push(job) }
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/philiprehberger/scheduler.rb', line 76

# Reports whether a runner exists and says it is running.
#
# @return [Boolean] false when no runner has been created; otherwise the
#   runner's running? answer, with nil/false normalized to false
def running?
  return false if @runner.nil?

  @runner.running? || false
end

#start ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/philiprehberger/scheduler.rb', line 60

# Starts the scheduler's runner.
#
# Does nothing when a runner is already active, so repeated calls do not
# replace (and orphan) the Runner that is currently running. When a leader
# lock path is configured and acquire_leadership returns a falsy value, no
# runner is started.
#
# @return [self]
def start
  # A second Runner would overwrite @runner and leave the first one unreachable by #stop.
  return self if @runner&.running?
  return self if @leader_lock_path && !acquire_leadership

  @runner = Runner.new(@jobs, @mutex, error_handler: @error_handler)
  @runner.start
  self
end

#stop(timeout = 5) ⇒ Object



70
71
72
73
74
# File 'lib/philiprehberger/scheduler.rb', line 70

# Stops the runner (when one exists), forwarding +timeout+ to it, then calls
# release_leadership when a leader lock path is configured.
#
# @param timeout [Object] value forwarded to the runner's stop; defaults to 5
# @return [self]
def stop(timeout = 5)
  @runner.stop(timeout) unless @runner.nil?
  release_leadership if @leader_lock_path
  self
end