Class: MiniScheduler::Manager
- Inherits:
-
Object
- Object
- MiniScheduler::Manager
- Defined in:
- lib/mini_scheduler/manager.rb
Defined Under Namespace
Classes: Runner
Instance Attribute Summary collapse
-
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#workers ⇒ Object
Returns the value of attribute workers.
Class Method Summary collapse
- .current ⇒ Object
- .discover_queues ⇒ Object
- .discover_schedules ⇒ Object
- .lock_key(queue) ⇒ Object
- .queue_key(queue, hostname = nil) ⇒ Object
- .schedule_key(klass, hostname = nil) ⇒ Object
- .seq ⇒ Object
- .without_runner ⇒ Object
Instance Method Summary collapse
- #blocking_tick ⇒ Object
- #ensure_schedule!(klass) ⇒ Object
- #get_klass(name) ⇒ Object
- #hostname ⇒ Object
- #identity_key ⇒ Object
-
#initialize(options = nil) ⇒ Manager
constructor
A new instance of Manager.
- #keep_alive ⇒ Object
- #keep_alive_duration ⇒ Object
- #lock ⇒ Object
- #next_run(klass) ⇒ Object
- #remove(klass) ⇒ Object
- #repair_queue ⇒ Object
- #reschedule_orphans! ⇒ Object
- #reschedule_orphans_on!(hostname = nil) ⇒ Object
- #schedule_info(klass) ⇒ Object
- #schedule_next_job(hostname = nil) ⇒ Object
- #stop! ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ Manager
Returns a new instance of Manager.
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/mini_scheduler/manager.rb', line 182
# Builds a manager for a single queue.
#
# options (Hash or nil) - keys read here:
#   :queue        - queue name; "default" when absent or falsy
#   :workers      - worker count; 1 when absent or falsy
#   :skip_runner  - when truthy, no Runner is created and the manager is
#                   not registered in Manager.current
#   :hostname     - stored as-is in @hostname (may be nil)
#   :enable_stats - used verbatim when the key is present; otherwise stats
#                   are enabled only if MiniScheduler::Stat is defined
def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end
  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex
  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end
Instance Attribute Details
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
# Reader for @enable_stats.
def enable_stats
  @enable_stats
end
#queue ⇒ Object
Returns the value of attribute queue.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
# Reader for @queue.
def queue
  @queue
end
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
# Reader for @random_ratio.
def random_ratio
  @random_ratio
end
#redis ⇒ Object
Returns the value of attribute redis.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
# Reader for @redis.
def redis
  @redis
end
#workers ⇒ Object
Returns the value of attribute workers.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
# Reader for @workers.
def workers
  @workers
end
Class Method Details
.current ⇒ Object
202 203 204 |
# File 'lib/mini_scheduler/manager.rb', line 202
# Lazily created Hash stored on the class; #initialize writes managers
# into it keyed by queue name and #stop! deletes them again.
def self.current
  @current ||= {}
end
.discover_queues ⇒ Object
327 328 329 |
# File 'lib/mini_scheduler/manager.rb', line 327
# Collects the `queue` value of every live object that is a
# MiniScheduler::Schedule and returns them as a Set (duplicates collapse).
def self.discover_queues
  ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
end
.discover_schedules ⇒ Object
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/mini_scheduler/manager.rb', line 331
# Returns an Array of every live MiniScheduler::Schedule object for which
# `scheduled?` is truthy, keeping only the first one seen per `to_s` name.
def self.discover_schedules
  # The development reloader can leave several classes with the same
  # name in ObjectSpace, so de-duplicate by name.
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    next unless schedule.scheduled?
    # Set#add? returns nil when the name has already been recorded.
    schedules << schedule if unique.add?(schedule.to_s)
  end
  schedules
end
.lock_key(queue) ⇒ Object
359 360 361 |
# File 'lib/mini_scheduler/manager.rb', line 359
# Name of the lock key for +queue+; #lock passes it to DistributedMutex.
def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end
.queue_key(queue, hostname = nil) ⇒ Object
363 364 365 366 367 368 369 |
# File 'lib/mini_scheduler/manager.rb', line 363
# Redis key of the queue's sorted set. With a truthy +hostname+ the key is
# host-specific; otherwise it is the shared key for the queue.
def self.queue_key(queue, hostname = nil)
  if hostname
    "_scheduler_queue_#{queue}_#{hostname}_"
  else
    "_scheduler_queue_#{queue}_"
  end
end
.schedule_key(klass, hostname = nil) ⇒ Object
371 372 373 374 375 376 377 |
# File 'lib/mini_scheduler/manager.rb', line 371
# Key name for +klass+ (interpolated via to_s). With a truthy +hostname+
# the host name is appended; note there is no trailing underscore here,
# unlike queue_key.
def self.schedule_key(klass, hostname = nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end
.seq ⇒ Object
348 349 350 351 352 353 |
# File 'lib/mini_scheduler/manager.rb', line 348
# Returns the next value of a class-level counter: 1 on the first call,
# then 2, 3, ... The increment runs inside @mutex.synchronize.
# NOTE(review): @mutex is not assigned in this listing - presumably it is
# set up at class-body level in the source file; confirm.
def self.seq
  @mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end
.without_runner ⇒ Object
178 179 180 |
# File 'lib/mini_scheduler/manager.rb', line 178
# Convenience constructor: same as new(skip_runner: true), which makes
# #initialize skip Runner creation and registration in Manager.current.
def self.without_runner
  self.new(skip_runner: true)
end
Instance Method Details
#blocking_tick ⇒ Object
303 304 305 306 |
# File 'lib/mini_scheduler/manager.rb', line 303
# Runs one #tick, then calls @runner.wait_till_done and returns its result.
# @runner is only assigned by #initialize when :skip_runner is not set.
def blocking_tick
  tick
  @runner.wait_till_done
end
#ensure_schedule!(klass) ⇒ Object
222 223 224 225 226 |
# File 'lib/mini_scheduler/manager.rb', line 222
# While holding the queue lock, calls schedule! on the ScheduleInfo
# for +klass+.
def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end
end
#get_klass(name) ⇒ Object
256 257 258 259 260 |
# File 'lib/mini_scheduler/manager.rb', line 256
# Resolves +name+ through name.constantize and returns the result, or nil
# when that raises NameError.
# NOTE(review): `constantize` is not core Ruby - presumably supplied by
# ActiveSupport; confirm it is loaded wherever this runs.
def get_klass(name)
  name.constantize
rescue NameError
  nil
end
#hostname ⇒ Object
206 207 208 209 210 211 212 |
# File 'lib/mini_scheduler/manager.rb', line 206
# Memoized host name. Uses @hostname when already set (e.g. from the
# :hostname option); otherwise shells out to `hostname`, strips the
# output, and falls back to "unknown" if that raises a StandardError.
def hostname
  @hostname ||=
    begin
      `hostname`.strip
    rescue
      "unknown"
    end
end
#identity_key ⇒ Object
355 356 357 |
# File 'lib/mini_scheduler/manager.rb', line 355
# Memoized string identifying this manager instance, built from the host
# name, the process id, the next class-level sequence number and a random
# hex token. #keep_alive writes it to redis as a key and
# #schedule_next_job stores it as a job's current_owner.
def identity_key
  @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
#keep_alive ⇒ Object
317 318 319 |
# File 'lib/mini_scheduler/manager.rb', line 317
# Writes identity_key to redis with an empty value and a TTL of
# keep_alive_duration seconds. #reschedule_orphans_on! reads owner keys
# back with redis.get to decide whether a job's owner is still alive.
def keep_alive
  redis.setex identity_key, keep_alive_duration, ""
end
#keep_alive_duration ⇒ Object
313 314 315 |
# File 'lib/mini_scheduler/manager.rb', line 313
# TTL, in seconds, that #keep_alive passes to redis.setex.
def keep_alive_duration
  60
end
#lock ⇒ Object
321 322 323 324 325 |
# File 'lib/mini_scheduler/manager.rb', line 321
# Yields to the caller's block inside
# MiniScheduler::DistributedMutex.synchronize, keyed by this queue's
# lock key. Note it passes MiniScheduler.redis, not the #redis attribute.
def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end
#next_run(klass) ⇒ Object
218 219 220 |
# File 'lib/mini_scheduler/manager.rb', line 218
# Returns next_run from the ScheduleInfo for +klass+ (no lock is taken).
def next_run(klass)
  schedule_info(klass).next_run
end
#remove(klass) ⇒ Object
228 229 230 231 232 |
# File 'lib/mini_scheduler/manager.rb', line 228
# While holding the queue lock, calls del! on the ScheduleInfo
# for +klass+.
def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end
#repair_queue ⇒ Object
262 263 264 265 266 267 268 269 |
# File 'lib/mini_scheduler/manager.rb', line 262
# Re-creates schedule entries when both queue keys are missing from redis.
# Does nothing if either the shared queue key or this host's queue key
# exists; otherwise calls ensure_schedule! for every discovered schedule
# whose queue matches this manager's queue.
def repair_queue
  return if redis.exists?(self.class.queue_key(queue)) ||
    redis.exists?(self.class.queue_key(queue, hostname))

  self.class.discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end
#reschedule_orphans! ⇒ Object
234 235 236 237 238 239 |
# File 'lib/mini_scheduler/manager.rb', line 234
# Under the queue lock, runs the orphan sweep first on the shared queue
# (no hostname) and then on this host's queue.
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end
#reschedule_orphans_on!(hostname = nil) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/mini_scheduler/manager.rb', line 241
# Walks every member of the queue's sorted set (shared queue when
# +hostname+ is nil, host queue otherwise). Members that do not resolve to
# a class are skipped. A job whose prev_result is 'QUEUED' or 'RUNNING'
# and whose current_owner is blank, or no longer has a value in redis, is
# marked 'ORPHAN', given next_run = now, and written back.
# Does not take the lock itself; #reschedule_orphans! wraps it in one.
def reschedule_orphans_on!(hostname = nil)
  redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
       (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end
#schedule_info(klass) ⇒ Object
214 215 216 |
# File 'lib/mini_scheduler/manager.rb', line 214
# Builds a new MiniScheduler::ScheduleInfo for +klass+ bound to this
# manager. A fresh object is created on every call.
def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end
#schedule_next_job(hostname = nil) ⇒ Object
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/mini_scheduler/manager.rb', line 278
# Looks at the first member (rank 0) of the queue's sorted set - the
# shared queue when +hostname+ is nil, the host queue otherwise - and, if
# its score is not in the future, marks it QUEUED and hands it to the
# runner. Returns early when the queue is empty, and does nothing when
# the first entry is not yet due.
def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key

  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    # Drop the entry when the class no longer resolves, or when a per-host
    # job sits in the shared queue / a shared job sits in a host queue.
    if !klass || (
      (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
    )
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(queue, hostname), key
      return
    end
    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    # Record this manager as owner; the orphan sweep checks this key.
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end
#stop! ⇒ Object
308 309 310 311 |
# File 'lib/mini_scheduler/manager.rb', line 308
# Stops the runner and removes this queue's entry from Manager.current.
# @runner is only assigned by #initialize when :skip_runner is not set.
def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end
#tick ⇒ Object
271 272 273 274 275 276 |
# File 'lib/mini_scheduler/manager.rb', line 271
# One scheduling pass under the queue lock: first the shared queue, then
# this host's queue.
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end