Class: Philiprehberger::Scheduler

Inherits:
Object
  • Object
show all
Includes:
LeaderElection, Persistence
Defined in:
lib/philiprehberger/scheduler.rb,
lib/philiprehberger/scheduler/job.rb,
lib/philiprehberger/scheduler/runner.rb,
lib/philiprehberger/scheduler/version.rb,
lib/philiprehberger/scheduler/cron_parser.rb,
lib/philiprehberger/scheduler/persistence.rb,
lib/philiprehberger/scheduler/leader_election.rb

Defined Under Namespace

Modules: LeaderElection, Persistence Classes: CronParser, Job, Runner

Constant Summary collapse

VERSION =
'0.4.0'

Instance Method Summary collapse

Methods included from LeaderElection

#acquire_leadership, #enable_leader_election, #leader?, #release_leadership

Methods included from Persistence

#load_state, #save_state

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



15
16
17
18
19
20
21
22
23
# File 'lib/philiprehberger/scheduler.rb', line 15

# Sets up an empty scheduler: no registered jobs, no runner, no error
# handler, and leader election left unconfigured.
def initialize
  # Job registry plus the lock that guards it.
  @mutex = Mutex.new
  @jobs = []

  # Filled in later by #on_error and #start respectively.
  @error_handler = nil
  @runner = nil

  # Leader-election state; #start and #stop skip leadership handling
  # while @leader_lock_path is nil.
  @is_leader = false
  @leader_lock_path = nil
  @leader_lock_file = nil
end

Instance Method Details

#cancel(name) ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/philiprehberger/scheduler.rb', line 94

# Removes every registered job whose name equals +name+.
#
# @param name [Object, nil] job name to match; nil never matches anything
# @return [Boolean] true when at least one job was removed, false otherwise
def cancel(name)
  return false if name.nil?

  @mutex.synchronize do
    # Array#reject! hands back nil when nothing was deleted.
    removed = @jobs.reject! { |job| job.name == name }
    !removed.nil?
  end
end

#cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/philiprehberger/scheduler.rb', line 45

# Registers a job driven by a cron expression.
#
# The expression is handed to CronParser and the resulting object is passed
# to Job as +cron:+. All other keywords are forwarded to Job unchanged.
#
# @param expression [Object] cron expression accepted by CronParser.new
# @param name [Object, nil] job name, used by the name-based lookups
# @param depends_on [Object, nil] forwarded to Job as-is
# @param input_from [Object, nil] forwarded to Job as-is
# @param timezone [Object, nil] forwarded to Job as-is
# @param if [Object, nil] forwarded to Job as-is
# @return [Job] the job that was just registered
def cron(expression, name: nil, depends_on: nil, input_from: nil, timezone: nil, if: nil, &block)
  schedule = CronParser.new(expression)
  settings = {
    callable: block,
    cron: schedule,
    name: name,
    depends_on: depends_on,
    input_from: input_from,
    timezone: timezone,
    # `if` is a reserved word, so the keyword's value has to be read via the binding.
    if: binding.local_variable_get(:if)
  }
  job = Job.new(**settings)
  @mutex.synchronize { @jobs.push(job) }
  job
end

#every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/philiprehberger/scheduler.rb', line 30

# Registers a job that repeats on a fixed interval.
#
# The interval is normalised through parse_interval and passed to Job as
# +interval:+. All other keywords are forwarded to Job unchanged.
#
# @param interval [Object] any value parse_interval accepts
# @param name [Object, nil] job name, used by the name-based lookups
# @param overlap [Object] forwarded to Job as-is (defaults to true)
# @param depends_on [Object, nil] forwarded to Job as-is
# @param input_from [Object, nil] forwarded to Job as-is
# @param if [Object, nil] forwarded to Job as-is
# @return [Job] the job that was just registered
def every(interval, name: nil, overlap: true, depends_on: nil, input_from: nil, if: nil, &block)
  settings = {
    callable: block,
    interval: parse_interval(interval),
    overlap: overlap,
    name: name,
    depends_on: depends_on,
    input_from: input_from,
    # `if` is a reserved word, so the keyword's value has to be read via the binding.
    if: binding.local_variable_get(:if)
  }
  job = Job.new(**settings)
  @mutex.synchronize { @jobs.push(job) }
  job
end

#find_job(name) ⇒ Object



88
89
90
91
92
# File 'lib/philiprehberger/scheduler.rb', line 88

# Looks up the first registered job whose name equals +name+.
#
# @param name [Object, nil] job name to match; nil never matches anything
# @return [Job, nil] the matching job, or nil when there is none
def find_job(name)
  return if name.nil?

  @mutex.synchronize do
    @jobs.find { |job| job.name == name }
  end
end

#job_count ⇒ Object



84
85
86
# File 'lib/philiprehberger/scheduler.rb', line 84

# Number of jobs currently registered, read while holding the jobs mutex.
#
# @return [Integer]
def job_count
  @mutex.synchronize do
    @jobs.length
  end
end

#jobs ⇒ Object



80
81
82
# File 'lib/philiprehberger/scheduler.rb', line 80

# Snapshot of the registered jobs.
#
# The returned array is a shallow copy made under the jobs mutex, so
# mutating it does not touch the scheduler's own list; the job objects
# themselves are shared.
#
# @return [Array] copy of the internal job list
def jobs
  @mutex.synchronize do
    snapshot = @jobs.dup
    snapshot
  end
end

#on_error(&block) ⇒ Object



25
26
27
28
# File 'lib/philiprehberger/scheduler.rb', line 25

# Stores the block as the scheduler's error handler; #start later hands it
# to the Runner. Calling without a block resets the handler to nil.
#
# @return [self] so calls can be chained
def on_error(&block)
  tap { @error_handler = block }
end

#pause(name) ⇒ Object



117
118
119
120
121
122
123
# File 'lib/philiprehberger/scheduler.rb', line 117

# Marks the job called +name+ as paused.
#
# @param name [Object, nil] job name, resolved through #find_job
# @return [Boolean] true when a job was found and flagged, false when no
#   job has that name
def pause(name)
  if (target = find_job(name))
    target.paused = true
    true
  else
    false
  end
end

#paused?(name) ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
136
137
138
# File 'lib/philiprehberger/scheduler.rb', line 133

# Reports whether the job called +name+ is paused.
#
# @param name [Object, nil] job name, resolved through #find_job
# @return [Boolean] the job's own paused? answer, or false when no job has
#   that name
def paused?(name)
  target = find_job(name)
  target ? target.paused? : false
end

#resume(name) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/philiprehberger/scheduler.rb', line 125

# Clears the paused flag on the job called +name+.
#
# @param name [Object, nil] job name, resolved through #find_job
# @return [Boolean] true when a job was found and un-flagged, false when no
#   job has that name
def resume(name)
  if (target = find_job(name))
    target.paused = false
    true
  else
    false
  end
end

#run_at(time, name: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/philiprehberger/scheduler.rb', line 104

# Registers a job tied to a specific point in time.
#
# @param time [Time] passed to Job as +run_at:+
# @param name [Object, nil] job name, used by the name-based lookups
# @return [Job] the job that was just registered
# @raise [ArgumentError] when +time+ is not a Time or no block is given
def run_at(time, name: nil, &block)
  raise ArgumentError, 'time must be a Time' unless time.is_a?(Time)
  raise ArgumentError, 'block required' if block.nil?

  Job.new(callable: block, run_at: time, name: name).tap do |job|
    @mutex.synchronize { @jobs << job }
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/philiprehberger/scheduler.rb', line 76

# Whether a runner exists and reports itself as running.
#
# @return [Boolean] false when no runner has been created yet; otherwise
#   the runner's own running? answer (nil/false coerced to false)
def running?
  return false if @runner.nil?

  @runner.running? || false
end

#start ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/philiprehberger/scheduler.rb', line 60

# Starts executing the registered jobs.
#
# Does nothing when a runner is already live, or when leader election is
# configured (@leader_lock_path set) and leadership cannot be acquired.
# Otherwise builds a Runner over the job list, the jobs mutex and the
# current error handler, and starts it.
#
# @return [self] so calls can be chained
def start
  # A second start would overwrite @runner and orphan the live one: #stop
  # could no longer reach it and both runners would work the same jobs.
  return self if @runner&.running?
  return self if @leader_lock_path && !acquire_leadership

  @runner = Runner.new(@jobs, @mutex, error_handler: @error_handler)
  @runner.start
  self
end

#stop(timeout = 5) ⇒ Object



70
71
72
73
74
# File 'lib/philiprehberger/scheduler.rb', line 70

# Stops the runner, if one exists, and releases leadership when leader
# election is configured (@leader_lock_path set).
#
# @param timeout [Object] handed straight to the runner's #stop; presumably
#   a number of seconds — confirm against Runner
# @return [self] so calls can be chained
def stop(timeout = 5)
  @runner.stop(timeout) unless @runner.nil?
  release_leadership if @leader_lock_path
  self
end