Class: Cosmo::API::Cron

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/api/cron.rb,
lib/cosmo/api/cron/entry.rb

Overview

Web-facing API for cron schedules. Single interface for all cron NATS operations.

Derives the schedule list entirely from NATS. Whatever is deployed in NATS is exactly what appears in the UI.

Schedule templates live in the same job stream they target (e.g. default), stored at subjects matching cosmo.cron.<stream>.>. NATS 2.14 fires each template by publishing the body to Nats-Schedule-Target as a regular JetStream message that accumulates alongside pending jobs.

Defined Under Namespace

Classes: Entry

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.instanceObject



18
19
20
# File 'lib/cosmo/api/cron.rb', line 18

def self.instance
  @instance ||= new
end

Instance Method Details

#allArray<Hash>

Returns every cron schedule currently deployed in NATS.

Returns:

  • (Array<Hash>)

    every cron schedule currently deployed in NATS



23
24
25
26
27
# File 'lib/cosmo/api/cron.rb', line 23

def all
  Stream.jobs.flat_map { |s| schedules_from_stream(s.name) }
rescue StandardError
  []
end

#delete!(subject) ⇒ Object

Purge the schedule message from NATS (stops future firings).

Parameters:

  • subject (String)


45
46
47
48
49
50
# File 'lib/cosmo/api/cron.rb', line 45

def delete!(subject)
  stream_name = subject.to_s.split(".")[2]
  client.purge(stream_name, subject)
rescue NATS::JetStream::Error::NotFound, NATS::IO::Timeout
  nil
end

#run_now!(schedule_subject) ⇒ Object

Dispatch the job immediately to the target stream, bypassing the timer.

Parameters:

  • schedule_subject (String)

    e.g. “cosmo.cron.default.report_job.daily”



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/cosmo/api/cron.rb', line 54

def run_now!(schedule_subject) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  stream_name = schedule_subject.to_s.split(".")[2]
  msg = client.get_message(stream_name, subject: schedule_subject)
  return unless msg

  headers = msg.headers || {}
  body = Utils::Json.parse(msg.data) || {}
  target = headers["Nats-Schedule-Target"]
  return unless target && body[:class]

  payload = Utils::Json.dump({
                               jid: SecureRandom.hex(12),
                               class: body[:class],
                               args: body[:args] || [],
                               retry: body[:retry] || Job::Data::DEFAULTS[:retry],
                               dead: body[:dead].nil? ? Job::Data::DEFAULTS[:dead] : body[:dead]
                             })
  client.publish(target, payload, stream: stream_name)
rescue NATS::JetStream::Error::NotFound
  nil
end

#upsert!(class_name: nil, stream: nil, schedule: nil, args: [], timezone: nil, name: nil) ⇒ Hash?

Publish (or replace) a schedule message in NATS.

Returns:

  • (Hash, nil)

    the persisted schedule as a hash, or nil on failure



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/cosmo/api/cron.rb', line 31

def upsert!(class_name: nil, stream: nil, schedule: nil, args: [], timezone: nil, name: nil)
  e = Entry.new(class_name: class_name, stream: stream, expression: schedule,
                args: args, timezone: timezone, name: name)
  headers = {
    "Nats-Schedule" => e.expression,
    "Nats-Schedule-Target" => e.target_subject
  }
  headers["Nats-Schedule-Time-Zone"] = e.timezone if e.timezone
  client.publish(e.schedule_subject, e.job_payload, stream: e.stream, header: headers)
  build_from_nats(e.stream, e.schedule_subject)
end