Class: Pgbus::Recurring::Schedule
- Inherits:
-
Object
- Object
- Pgbus::Recurring::Schedule
- Defined in:
- lib/pgbus/recurring/schedule.rb
Instance Attribute Summary collapse
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
Instance Method Summary collapse
- #build_payload(task) ⇒ Object
- #due_tasks(time = Time.current) ⇒ Object
- #enqueue_task(task, run_at:) ⇒ Object
-
#initialize(config: Pgbus.configuration) ⇒ Schedule
constructor
A new instance of Schedule.
Constructor Details
#initialize(config: Pgbus.configuration) ⇒ Schedule
Returns a new instance of Schedule.
8 9 10 11 |
# File 'lib/pgbus/recurring/schedule.rb', line 8 def initialize(config: Pgbus.configuration) @config = config @tasks = load_tasks end |
Instance Attribute Details
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
6 7 8 |
# File 'lib/pgbus/recurring/schedule.rb', line 6 def tasks @tasks end |
Instance Method Details
#build_payload(task) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/pgbus/recurring/schedule.rb', line 53
# Builds the job payload hash for a recurring task.
#
# A task with a truthy +command+ is wrapped in Pgbus::Recurring::CommandJob,
# with the command as its only argument and no priority. Any other task is
# described by its own class name, arguments and priority. In both cases the
# queue falls back to @config.default_queue when the task names none.
#
# @param task [#command, #class_name, #arguments, #queue_name, #priority]
# @return [Hash{String => Object}] payload with the keys "job_class",
#   "arguments", "queue_name" and "priority"
def build_payload(task)
  queue_name = task.queue_name || @config.default_queue

  if task.command
    {
      "job_class" => "Pgbus::Recurring::CommandJob",
      "arguments" => [task.command],
      "queue_name" => queue_name,
      "priority" => nil
    }
  else
    {
      "job_class" => task.class_name,
      "arguments" => task.arguments,
      "queue_name" => queue_name,
      # Zero means "no priority" and is sent as nil. The safe navigation also
      # lets a task whose priority is nil through instead of raising
      # NoMethodError on nil.zero?.
      "priority" => task.priority&.nonzero?
    }
  end
end
#due_tasks(time = Time.current) ⇒ Object
13 14 15 16 17 18 |
# File 'lib/pgbus/recurring/schedule.rb', line 13
# Pairs every task with its canonical run time and keeps only the pairs for
# which canonical_run_at produced a truthy value.
#
# @param time [Object] reference time handed to canonical_run_at; defaults to
#   Time.current
# @return [Array<Array>] list of [task, run_at] pairs
def due_tasks(time = Time.current)
  candidates = tasks.map { |entry| [entry, canonical_run_at(entry, time)] }
  candidates.select { |_entry, scheduled_at| scheduled_at }
end
#enqueue_task(task, run_at:) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pgbus/recurring/schedule.rb', line 20 def enqueue_task(task, run_at:) queue = resolve_queue(task) acquired_key = acquire_uniqueness_lock(task) return if acquired_key == :already_locked RecurringExecution.record(task.key, run_at) do payload = build_payload(task) headers = build_headers(task, run_at) payload = (task, payload) Pgbus.client.ensure_queue(queue) Pgbus.client.(queue, payload, headers: headers) Pgbus.logger.info do "[Pgbus] Enqueued recurring task #{task.key} (#{task.class_name || task.command}) " \ "for run_at=#{run_at.iso8601}" end end rescue AlreadyRecorded # AlreadyRecorded means this (task_key, run_at) was already enqueued. # If we acquired a NEW lock (prior lock was already released because the # job completed), release it — no message will use it. If we didn't # acquire a lock (nil or :already_locked), there's nothing to release. # In either case, we are NOT opening a race window because the job for # this run_at already ran or is running. release_uniqueness_lock(acquired_key) Pgbus.logger.debug { "[Pgbus] Recurring task #{task.key} already enqueued for #{run_at.iso8601}" } rescue StandardError release_uniqueness_lock(acquired_key) raise end |