Class: Rubino::Jobs::Scheduler
- Inherits:
-
Object
- Object
- Rubino::Jobs::Scheduler
- Defined in:
- lib/rubino/jobs/scheduler.rb
Overview
In-process cron scheduler wrapping rufus-scheduler. Owns one rufus instance per process and is exposed as a process-wide singleton via Scheduler.instance; load_all! is called once at server boot to register every enabled job. Resolves jobs from CronJobRepository, fires runs through Run::Executor, dispatches webhooks via WebhookDelivery.
Because rufus lives in-process, this scheduler does NOT survive a multi-process scale-out: each worker would run every cron tick.
Lifecycle:
scheduler = Scheduler.new
scheduler.load_all! # on server boot
scheduler.schedule(job) # after POST /v1/jobs
scheduler.unschedule(job_id) # after DELETE
scheduler.trigger(job_id) # one-shot
scheduler.shutdown!
Class Attribute Summary collapse
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(rufus: nil, cron_job_repository: nil, run_repository: nil, session_repository: nil, executor: nil, webhook: nil, logger: nil) ⇒ Scheduler
constructor
A new instance of Scheduler.
- #load_all! ⇒ Object
-
#resume_pending_webhooks! ⇒ Object
Replays any
webhook_deliveriesrow left inpendingby a prior process. - #schedule(job) ⇒ Object
-
#scheduled_count ⇒ Object
Number of currently-registered cron handles.
- #shutdown! ⇒ Object
-
#trigger(job_id) ⇒ Hash?
Run the job now without waiting for the next cron tick.
- #unschedule(job_id) ⇒ Object
Constructor Details
#initialize(rufus: nil, cron_job_repository: nil, run_repository: nil, session_repository: nil, executor: nil, webhook: nil, logger: nil) ⇒ Scheduler
Returns a new instance of Scheduler.
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/rubino/jobs/scheduler.rb', line 43 def initialize(rufus: nil, cron_job_repository: nil, run_repository: nil, session_repository: nil, executor: nil, webhook: nil, logger: nil) @rufus = rufus || Rufus::Scheduler.new @cron_repo = cron_job_repository || CronJobRepository.new @run_repo = run_repository || ::Rubino::Run::Repository.new @session_repo = session_repository || ::Rubino::Session::Repository.new @executor = executor || ::Rubino::Run::Executor.new @webhook = webhook || WebhookDelivery.new @logger = logger || Rubino.logger @handles = {} @mutex = Mutex.new end |
Class Attribute Details
.instance ⇒ Object
28 29 30 |
# File 'lib/rubino/jobs/scheduler.rb', line 28 def instance @instance ||= new end |
Class Method Details
.reset! ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/rubino/jobs/scheduler.rb', line 34 def reset! @instance&.shutdown! rescue StandardError # best-effort during teardown ensure @instance = nil end |
Instance Method Details
#load_all! ⇒ Object
56 57 58 |
# File 'lib/rubino/jobs/scheduler.rb', line 56 def load_all! @cron_repo.list(include_disabled: false).each { |job| schedule(job) } end |
#resume_pending_webhooks! ⇒ Object
Replays any webhook_deliveries row left in pending by a prior process. Boot-only hook; safe to call multiple times because each row’s request_id is the dedup key.
63 64 65 |
# File 'lib/rubino/jobs/scheduler.rb', line 63 def resume_pending_webhooks! @webhook.resume_pending! end |
#schedule(job) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/rubino/jobs/scheduler.rb', line 67 def schedule(job) return unless job[:enabled] unschedule(job[:id]) handle = @rufus.cron(job[:schedule]) { fire(job[:id]) } @mutex.synchronize { @handles[job[:id]] = handle } rescue ArgumentError => e # A persisted row with a cron string rufus/fugit cannot parse (e.g. # written by an older build before the API validated schedules, #164). # Skip it and keep going: one poison row must never abort load_all! # and take down server boot. @logger.warn(event: "cron.invalid_schedule", job_id: job[:id], schedule: job[:schedule], error: e.) nil end |
#scheduled_count ⇒ Object
Number of currently-registered cron handles. Reads @handles under
100 101 102 |
# File 'lib/rubino/jobs/scheduler.rb', line 100 def scheduled_count @mutex.synchronize { @handles.size } end |
#shutdown! ⇒ Object
93 94 95 96 |
# File 'lib/rubino/jobs/scheduler.rb', line 93 def shutdown! @rufus.shutdown @mutex.synchronize { @handles.clear } end |
#trigger(job_id) ⇒ Hash?
Run the job now without waiting for the next cron tick.
89 90 91 |
# File 'lib/rubino/jobs/scheduler.rb', line 89 def trigger(job_id) fire(job_id) end |
#unschedule(job_id) ⇒ Object
82 83 84 85 |
# File 'lib/rubino/jobs/scheduler.rb', line 82 def unschedule(job_id) handle = @mutex.synchronize { @handles.delete(job_id) } @rufus.unschedule(handle) if handle end |