Class: Kaal::Internal::ActiveRecord::DelayedJobRegistry
- Inherits: DelayedJob::Registry
  - Object
    - DelayedJob::Registry
      - Kaal::Internal::ActiveRecord::DelayedJobRegistry
- Defined in:
- lib/kaal/internal/active_record/delayed_job_registry.rb
Overview
Active Record-backed store for delayed jobs.
Class Method Summary
- .normalize(record) ⇒ Object
Instance Method Summary collapse
- #all_jobs ⇒ Object
- #claim_strategy ⇒ Object
- #enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object
- #find_job(job_id) ⇒ Object
- #initialize(connection: nil, model: DelayedJobRecord, use_skip_locked: false) ⇒ DelayedJobRegistry (constructor): A new instance of DelayedJobRegistry.
- #pop_due(now:, limit:) ⇒ Object
Methods inherited from DelayedJob::Registry
Constructor Details
#initialize(connection: nil, model: DelayedJobRecord, use_skip_locked: false) ⇒ DelayedJobRegistry
Returns a new instance of DelayedJobRegistry.
15 16 17 18 19 20 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 15
#
# Builds a registry that stores delayed jobs through +model+.
#
# @param connection [Object, nil] passed unchanged to ConnectionSupport.configure!
# @param model [Class] record class used by the query and insert methods
# @param use_skip_locked [Boolean] chooses the branch taken by #pop_due and
#   the symbol reported by #claim_strategy
def initialize(connection: nil, model: DelayedJobRecord, use_skip_locked: false)
  super()
  ConnectionSupport.configure!(connection)
  @use_skip_locked = use_skip_locked
  @model = model
end
Class Method Details
.normalize(record) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 84
#
# Converts a stored record into a plain Hash description of the job.
#
# @param record [Object, nil] object responding to job_id, run_at, job_class,
#   args, queue and created_at
# @return [Hash, nil] the job attributes with +args+ run through parse_args;
#   nil when +record+ is falsy or when JSON::ParserError is raised while
#   building the hash
def self.normalize(record)
  return unless record

  begin
    {
      job_id: record.job_id,
      run_at: record.run_at,
      job_class: record.job_class,
      args: parse_args(record.args),
      queue: record.queue,
      created_at: record.created_at
    }
  rescue JSON::ParserError
    # Unparseable args: report "no job" instead of raising.
    nil
  end
end
Instance Method Details
#all_jobs ⇒ Object
76 77 78 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 76
#
# Lists every stored job, ordered by run_at and then job_id.
#
# @return [Array<Hash>] normalized jobs; records for which .normalize yields
#   nil are left out
def all_jobs
  ordered_records = @model.order(:run_at, :job_id)
  ordered_records.filter_map { |row| self.class.normalize(row) }
end
#claim_strategy ⇒ Object
80 81 82 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 80
#
# Names the claim strategy selected by the +use_skip_locked+ constructor flag.
#
# @return [Symbol] :skip_locked when the flag is truthy, otherwise
#   :delete_confirmation
def claim_strategy
  return :skip_locked if @use_skip_locked

  :delete_confirmation
end
#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 22
#
# Stores a new delayed job and returns its normalized form.
#
# @param job_id [Object] identifier of the job
# @param run_at [Object] time at which the job becomes due
# @param job_class [Object] class (or class name) of the job
# @param args [Object] job arguments; stored as a JSON string
# @param queue [Object, nil] optional queue name
# @param connection [Object, nil] when given, the row is written via
#   insert_with_connection instead of the model's create!
# @return [Hash, nil] result of .normalize on an unsaved model instance built
#   from the same attributes
# @raise [Kaal::DelayedJob::DuplicateJobError] when the insert raises
#   ActiveRecord::RecordNotUnique
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil)
  created_at = Time.now.utc
  serialized_args = JSON.generate(args)
  row = {
    job_id: job_id,
    run_at: run_at,
    job_class: job_class,
    args: serialized_args,
    queue: queue,
    created_at: created_at
  }

  connection ? insert_with_connection(connection, row) : @model.create!(row)

  # Normalize from a fresh, unsaved instance so both insert paths return the same shape.
  self.class.normalize(@model.new(row))
rescue ::ActiveRecord::RecordNotUnique
  raise Kaal::DelayedJob::DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
end
#find_job(job_id) ⇒ Object
72 73 74 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 72
#
# Looks up a single job by its identifier.
#
# @param job_id [Object] identifier to search for
# @return [Hash, nil] the normalized job, or nil when .normalize yields nil
#   (for example, when no record matches)
def find_job(job_id)
  record = @model.find_by(job_id: job_id)
  self.class.normalize(record)
end
#pop_due(now:, limit:) ⇒ Object
44 45 46 47 48 |
# File 'lib/kaal/internal/active_record/delayed_job_registry.rb', line 44
#
# Delegates to the pop implementation that matches the configured claim
# strategy, forwarding both keywords unchanged.
#
# @param now [Object] reference time handed to the helper
# @param limit [Object] limit handed to the helper
# @return [Object] whatever the selected helper returns
def pop_due(now:, limit:)
  if @use_skip_locked
    pop_due_with_skip_locked(now: now, limit: limit)
  else
    pop_due_with_delete_confirmation(now: now, limit: limit)
  end
end