Class: Cosmo::API::Cron
Inherits: Object
- Object
- Cosmo::API::Cron
Defined in:
- lib/cosmo/api/cron.rb
- lib/cosmo/api/cron/entry.rb
Overview
Web-facing API for cron schedules. Single interface for all cron NATS operations.
Derives the schedule list entirely from NATS. Whatever is deployed in NATS is exactly what appears in the UI.
Schedule templates live in the same job stream they target (e.g. default) and are stored at subjects matching cosmo.cron.<stream>.>. NATS 2.14 fires each template by publishing its body to the subject named in the Nats-Schedule-Target header. The result is a regular JetStream message that accumulates alongside pending jobs.
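To make the subject pattern concrete, here is a minimal sketch of NATS wildcard matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The helper `subject_matches?` and the schedule name `nightly_report` are hypothetical and not part of Cosmo; the real matching is done by the NATS server.

```ruby
# Hypothetical helper: reports whether a NATS subject matches a pattern.
# "*" matches exactly one token; ">" (assumed to be the last pattern token)
# matches one or more trailing tokens.
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split(".")
  subject_tokens = subject.split(".")

  pattern_tokens.each_with_index do |token, i|
    # ">" needs at least one remaining subject token to match.
    return subject_tokens.length > i if token == ">"
    return false if i >= subject_tokens.length
    return false unless token == "*" || token == subject_tokens[i]
  end

  pattern_tokens.length == subject_tokens.length
end

pattern = "cosmo.cron.default.>"
subject_matches?(pattern, "cosmo.cron.default.nightly_report") # => true
subject_matches?(pattern, "cosmo.cron.default")                # => false
subject_matches?(pattern, "cosmo.jobs.default.nightly_report") # => false
```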
Defined Under Namespace
Classes: Entry
Class Method Summary
- .instance ⇒ Object
Instance Method Summary
- #all ⇒ Array<Hash>
  Every cron schedule currently deployed in NATS.
- #delete!(subject) ⇒ Object
  Purge the schedule message from NATS (stops future firings).
- #run_now!(schedule_subject) ⇒ Object
  Dispatch the job immediately to the target stream, bypassing the timer.
- #upsert!(class_name: nil, stream: nil, schedule: nil, args: [], timezone: nil, name: nil) ⇒ Hash?
  Publish (or replace) a schedule message in NATS.
Class Method Details
.instance ⇒ Object
# File 'lib/cosmo/api/cron.rb', line 18
def self.instance
  @instance ||= new
end
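The accessor memoizes a single object with `||=`, so every caller shares one instance. A minimal sketch of that behavior, using a hypothetical stand-in class `CronFacade` rather than the real `Cosmo::API::Cron`:

```ruby
# Hypothetical stand-in for Cosmo::API::Cron, showing only the memoized accessor.
class CronFacade
  def self.instance
    # Creates the object on the first call, then returns the cached one.
    @instance ||= new
  end
end

first = CronFacade.instance
second = CronFacade.instance
first.equal?(second) # => true, both names refer to the same object
```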
Instance Method Details
#all ⇒ Array<Hash>
Returns every cron schedule currently deployed in NATS.
# File 'lib/cosmo/api/cron.rb', line 23
def all
  Stream.jobs.flat_map { |s| schedules_from_stream(s.name) }
rescue StandardError
  []
end
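One consequence of the blanket rescue is that a failure on any single stream yields an empty list, which callers cannot tell apart from "no schedules deployed". The sketch below illustrates this with hypothetical stand-ins: `FakeStream` replaces the real stream objects and `schedules_for` replaces the private `schedules_from_stream` helper, whose actual behavior is not shown in the source.

```ruby
# Hypothetical stand-ins for the real stream objects and lookup helper.
FakeStream = Struct.new(:name)

def schedules_for(stream_name)
  raise IOError, "NATS unreachable" if stream_name == "broken"

  [{ stream: stream_name, subject: "cosmo.cron.#{stream_name}.example" }]
end

def all_schedules(streams)
  streams.flat_map { |s| schedules_for(s.name) }
rescue StandardError
  [] # one failing stream discards results already collected from the others
end

all_schedules([FakeStream.new("default"), FakeStream.new("mailers")]).length # => 2
all_schedules([FakeStream.new("default"), FakeStream.new("broken")])         # => []
```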
#delete!(subject) ⇒ Object
Purge the schedule message from NATS (stops future firings).
# File 'lib/cosmo/api/cron.rb', line 45
def delete!(subject)
  stream_name = subject.to_s.split(".")[2]
  client.purge(stream_name, subject)
rescue NATS::JetStream::Error::NotFound, NATS::IO::Timeout
  nil
end
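Both `delete!` and `run_now!` recover the stream name from the third dot-separated token of the schedule subject. A small sketch of that lookup, with a hypothetical helper name and example subjects:

```ruby
# Hypothetical helper mirroring the split(".")[2] lookup in the source.
def stream_from_subject(subject)
  subject.to_s.split(".")[2]
end

stream_from_subject("cosmo.cron.default.nightly_report") # => "default"
stream_from_subject(:"cosmo.cron.mailers.digest")        # => "mailers"
stream_from_subject("cosmo.cron")                        # => nil
stream_from_subject(nil)                                 # => nil
```

A subject with fewer than three tokens yields nil, so the stream name passed on to the client would be nil in that case.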
#run_now!(schedule_subject) ⇒ Object
Dispatch the job immediately to the target stream, bypassing the timer.
# File 'lib/cosmo/api/cron.rb', line 54
def run_now!(schedule_subject) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  stream_name = schedule_subject.to_s.split(".")[2]
  msg = client.(stream_name, subject: schedule_subject)
  return unless msg

  headers = msg.headers || {}
  body = Utils::Json.parse(msg.data) || {}
  target = headers["Nats-Schedule-Target"]
  return unless target && body[:class]

  payload = Utils::Json.dump({
    jid: SecureRandom.hex(12),
    class: body[:class],
    args: body[:args] || [],
    retry: body[:retry] || Job::Data::DEFAULTS[:retry],
    dead: body[:dead].nil? ? Job::Data::DEFAULTS[:dead] : body[:dead]
  })
  client.publish(target, payload, stream: stream_name)
rescue NATS::JetStream::Error::NotFound
  nil
end
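The payload construction treats `dead` differently from the other fields: it checks `nil?` instead of using `||`, which keeps an explicit `false` from being replaced by the default. The sketch below reproduces that logic with the standard `json` library in place of `Utils::Json`. The `DEFAULTS` values and the `ReportJob` class name are assumptions for illustration; the real defaults live in `Job::Data::DEFAULTS`.

```ruby
require "json"
require "securerandom"

# Hypothetical defaults; the real values live in Job::Data::DEFAULTS.
DEFAULTS = { retry: 3, dead: true }.freeze

# Builds the job payload from a parsed schedule body (symbol keys assumed).
def build_payload(body)
  JSON.generate({
    jid: SecureRandom.hex(12), # 12 random bytes, rendered as 24 hex characters
    class: body[:class],
    args: body[:args] || [],
    retry: body[:retry] || DEFAULTS[:retry],
    # `||` would replace an explicit false with the default, hence nil?.
    dead: body[:dead].nil? ? DEFAULTS[:dead] : body[:dead]
  })
end

json = build_payload({ class: "ReportJob", dead: false })
payload = JSON.parse(json, symbolize_names: true)
payload[:dead]       # => false
payload[:retry]      # => 3
payload[:args]       # => []
payload[:jid].length # => 24
```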
#upsert!(class_name: nil, stream: nil, schedule: nil, args: [], timezone: nil, name: nil) ⇒ Hash?
Publish (or replace) a schedule message in NATS.
# File 'lib/cosmo/api/cron.rb', line 31
def upsert!(class_name: nil, stream: nil, schedule: nil, args: [], timezone: nil, name: nil)
  e = Entry.new(class_name: class_name, stream: stream, expression: schedule,
                args: args, timezone: timezone, name: name)
  headers = {
    "Nats-Schedule" => e.expression,
    "Nats-Schedule-Target" => e.target_subject
  }
  headers["Nats-Schedule-Time-Zone"] = e.timezone if e.timezone
  client.publish(e.schedule_subject, e.job_payload, stream: e.stream, header: headers)
  build_from_nats(e.stream, e.schedule_subject)
end
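The headers are what turn an ordinary published message into a schedule: two are always set, and the time zone header is added only when a zone is given. A minimal sketch of that assembly follows. The helper name, the cron expression, and the target subject are placeholders, not values taken from Cosmo or guaranteed to be accepted by NATS.

```ruby
# Hypothetical helper mirroring how upsert! assembles its headers.
def schedule_headers(expression:, target_subject:, timezone: nil)
  headers = {
    "Nats-Schedule" => expression,
    "Nats-Schedule-Target" => target_subject
  }
  # The time zone header is omitted entirely when no zone is supplied.
  headers["Nats-Schedule-Time-Zone"] = timezone if timezone
  headers
end

# Placeholder expression and subject, for illustration only.
plain = schedule_headers(expression: "0 3 * * *", target_subject: "jobs.default.report")
plain.key?("Nats-Schedule-Time-Zone") # => false

zoned = schedule_headers(expression: "0 3 * * *", target_subject: "jobs.default.report",
                         timezone: "Europe/Berlin")
zoned["Nats-Schedule-Time-Zone"] # => "Europe/Berlin"
```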