Class: Rubino::Jobs::Scheduler

Inherits:
Object
Defined in:
lib/rubino/jobs/scheduler.rb

Overview

In-process cron scheduler that wraps rufus-scheduler. It owns one rufus instance per process and is exposed as a process-wide singleton via Scheduler.instance; load_all! is called once at server boot to register every enabled job. It resolves jobs from CronJobRepository, fires runs through Run::Executor, and dispatches webhooks via WebhookDelivery.

Because rufus lives in-process, this scheduler does NOT survive a multi-process scale-out: each worker process would own its own rufus instance and fire every cron tick, so every job would run once per worker.

Lifecycle:

scheduler = Scheduler.new
scheduler.load_all!          # on server boot
scheduler.schedule(job)      # after POST /v1/jobs
scheduler.unschedule(job_id) # after DELETE
scheduler.trigger(job_id)    # one-shot
scheduler.shutdown!
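
The boot and teardown ends of this lifecycle can be sketched as follows. This is a minimal illustration, not Rubino's actual boot code: RecordingScheduler is a hypothetical stand-in that only records which lifecycle methods were called, and run_server is an assumed wrapper name.

```ruby
# Hypothetical stand-in exposing the same lifecycle method names as the
# Scheduler documented here. It records calls instead of scheduling anything.
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def load_all!
    @calls << :load_all!
  end

  def shutdown!
    @calls << :shutdown!
  end
end

# Assumed wrapper: register jobs before serving, and always shut the
# scheduler down afterwards, even when the serving block raises.
def run_server(scheduler)
  scheduler.load_all!
  yield
ensure
  scheduler.shutdown!
end

scheduler = RecordingScheduler.new
begin
  run_server(scheduler) { raise "request loop crashed" }
rescue RuntimeError
  # The crash still propagates; shutdown! has already run by this point.
end
```

Putting shutdown! in an ensure clause is one way to keep rufus threads from outliving the server loop; a real application might use an at_exit hook or its web server's shutdown callback instead.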

Constructor Details

#initialize(rufus: nil, cron_job_repository: nil, run_repository: nil, session_repository: nil, executor: nil, webhook: nil, logger: nil) ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/rubino/jobs/scheduler.rb', line 43

def initialize(rufus: nil, cron_job_repository: nil, run_repository: nil, session_repository: nil, executor: nil,
               webhook: nil, logger: nil)
  @rufus = rufus || Rufus::Scheduler.new
  @cron_repo = cron_job_repository || CronJobRepository.new
  @run_repo = run_repository || ::Rubino::Run::Repository.new
  @session_repo = session_repository || ::Rubino::Session::Repository.new
  @executor = executor || ::Rubino::Run::Executor.new
  @webhook = webhook || WebhookDelivery.new
  @logger = logger || Rubino.logger
  @handles = {}
  @mutex = Mutex.new
end
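
Every collaborator is an optional keyword argument, so a test can swap in doubles. The sketch below shows what a double for the rufus: argument might look like. FakeRufus is hypothetical and not part of Rubino or rufus-scheduler; it implements only the three calls this page shows the Scheduler making (cron, unschedule, shutdown), plus a tick helper for tests.

```ruby
# Hypothetical test double for the `rufus:` constructor argument.
class FakeRufus
  attr_reader :crons, :unscheduled

  def initialize
    @crons = {}        # handle => [cron string, block]
    @unscheduled = []  # handles passed to #unschedule, in order
    @next_id = 0
    @down = false
  end

  # Same call shape Scheduler#schedule uses: a cron string plus a block,
  # returning an opaque handle.
  def cron(expr, &block)
    handle = "fake-#{@next_id += 1}"
    @crons[handle] = [expr, block]
    handle
  end

  def unschedule(handle)
    @crons.delete(handle)
    @unscheduled << handle
  end

  def shutdown
    @down = true
  end

  def down?
    @down
  end

  # Test helper: run the registered block as if a cron tick had arrived.
  def tick(handle)
    @crons.fetch(handle).last.call
  end
end

fake = FakeRufus.new
fired = []
handle = fake.cron("*/5 * * * *") { fired << :job_1 }
fake.tick(handle)

# With Rubino loaded, the double would be injected like this:
#   scheduler = Rubino::Jobs::Scheduler.new(rufus: fake)
```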

Class Attribute Details

.instance ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 28

def instance
  @instance ||= new
end

Class Method Details

.reset! ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 34

def reset!
  @instance&.shutdown!
rescue StandardError
  # best-effort during teardown
ensure
  @instance = nil
end
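
The rescue/ensure pairing means the cached instance is cleared even when shutdown! raises. A small self-contained sketch of that behavior, using a hypothetical DemoScheduler whose shutdown! always fails:

```ruby
# Hypothetical class that copies the instance/reset! shape shown above.
class DemoScheduler
  class << self
    def instance
      @instance ||= new
    end

    def reset!
      @instance&.shutdown!
    rescue StandardError
      # best-effort during teardown
    ensure
      @instance = nil
    end
  end

  # Always fails, to exercise the rescue path in reset!.
  def shutdown!
    raise "scheduler already stopped"
  end
end

first = DemoScheduler.instance
DemoScheduler.reset!               # the error from shutdown! is swallowed
second = DemoScheduler.instance    # a fresh object, because @instance was cleared
fresh_instance = !first.equal?(second)
```

This is the property a test suite would rely on when calling reset! between examples: a failed teardown cannot leak a half-stopped singleton into the next test.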

Instance Method Details

#load_all! ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 56

def load_all!
  @cron_repo.list(include_disabled: false).each { |job| schedule(job) }
end

#resume_pending_webhooks! ⇒ Object

Replays every webhook_deliveries row that a prior process left in the pending state. Boot-only hook; safe to call multiple times because each row’s request_id is the dedup key.



# File 'lib/rubino/jobs/scheduler.rb', line 63

def resume_pending_webhooks!
  @webhook.resume_pending!
end
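
Why a dedup key makes replay safe can be illustrated with a toy consumer. This is a sketch under assumptions: the page does not say where deduplication happens, so DedupingReceiver is a hypothetical component that drops any request_id it has already seen, and the row shape is invented for the example.

```ruby
require "set"

# Hypothetical consumer that treats request_id as an idempotency key.
class DedupingReceiver
  attr_reader :processed

  def initialize
    @seen = Set.new
    @processed = []
  end

  def receive(request_id:, payload:)
    # Set#add? returns nil when the id is already present.
    return :duplicate unless @seen.add?(request_id)

    @processed << payload
    :accepted
  end
end

# Invented rows standing in for pending webhook_deliveries records.
pending_rows = [
  { request_id: "req-1", payload: "run 10 finished" },
  { request_id: "req-2", payload: "run 11 failed" }
]

receiver = DedupingReceiver.new
# Replaying the same pending rows twice has the same effect as replaying once.
2.times { pending_rows.each { |row| receiver.receive(**row) } }
```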

#schedule(job) ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 67

def schedule(job)
  return unless job[:enabled]

  unschedule(job[:id])
  handle = @rufus.cron(job[:schedule]) { fire(job[:id]) }
  @mutex.synchronize { @handles[job[:id]] = handle }
rescue ArgumentError => e
  # A persisted row with a cron string rufus/fugit cannot parse (e.g.
  # written by an older build before the API validated schedules, #164).
  # Skip it and keep going: one poison row must never abort load_all!
  # and take down server boot.
  @logger.warn(event: "cron.invalid_schedule", job_id: job[:id], schedule: job[:schedule], error: e.message)
  nil
end
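
The effect of rescuing ArgumentError per job, rather than around the whole boot loop, can be sketched without rufus. Here parse_cron_stub is a hypothetical stand-in that rejects anything other than five whitespace-separated fields (real cron parsing is much stricter), and load_rows is an assumed helper name.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for the parsing step performed when registering a cron job.
def parse_cron_stub(expr)
  raise ArgumentError, "cannot parse #{expr.inspect}" unless expr.split.size == 5

  "handle-for-#{expr}"
end

# Registers every row it can; a bad row is logged and skipped.
def load_rows(rows, logger)
  handles = {}
  rows.each do |job|
    begin
      handles[job[:id]] = parse_cron_stub(job[:schedule])
    rescue ArgumentError => e
      logger.warn("cron.invalid_schedule job_id=#{job[:id]} error=#{e.message}")
    end
  end
  handles
end

log = StringIO.new
rows = [
  { id: 1, schedule: "0 * * * *" },
  { id: 2, schedule: "not a cron" },   # poison row
  { id: 3, schedule: "*/10 * * * *" }
]
handles = load_rows(rows, Logger.new(log))
# Jobs 1 and 3 are registered; job 2 only produces a warning in the log.
```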

#scheduled_count ⇒ Object

Number of currently-registered cron handles. Reads @handles under @mutex.



# File 'lib/rubino/jobs/scheduler.rb', line 100

def scheduled_count
  @mutex.synchronize { @handles.size }
end

#shutdown! ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 93

def shutdown!
  @rufus.shutdown
  @mutex.synchronize { @handles.clear }
end

#trigger(job_id) ⇒ Hash?

Run the job now without waiting for the next cron tick.

Returns:

  • (Hash, nil)

    the created run row, or nil on failure / unknown job.



# File 'lib/rubino/jobs/scheduler.rb', line 89

def trigger(job_id)
  fire(job_id)
end

#unschedule(job_id) ⇒ Object



# File 'lib/rubino/jobs/scheduler.rb', line 82

def unschedule(job_id)
  handle = @mutex.synchronize { @handles.delete(job_id) }
  @rufus.unschedule(handle) if handle
end