Class: Kaal::DelayedJob::DatabaseEngine
- Defined in:
- lib/kaal/delayed_job/database_engine.rb
Overview
Sequel-backed delayed-job store persisted in kaal_delayed_jobs.
Class Method Summary
- .normalize_row(row) ⇒ Object
Instance Method Summary
- #all_jobs ⇒ Object
- #claim_strategy ⇒ Object
- #enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object
- #find_job(job_id, connection: @database.connection) ⇒ Object
- #initialize(database:, use_skip_locked: false) ⇒ DatabaseEngine (constructor): Returns a new instance of DatabaseEngine.
- #pop_due(now:, limit:) ⇒ Object
Methods inherited from Registry
Constructor Details
#initialize(database:, use_skip_locked: false) ⇒ DatabaseEngine
Returns a new instance of DatabaseEngine.
15 16 17 18 19 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 15
#
# Sets up the engine: runs the parent initializer, wraps the supplied
# database handle in Kaal::Persistence::Database, and remembers which
# claim strategy was requested.
#
# @param database [Object] handle forwarded to Kaal::Persistence::Database.new
# @param use_skip_locked [Boolean] stored as-is in @use_skip_locked; read by
#   #claim_strategy and #pop_due to pick the claiming path
# @return [DatabaseEngine]
def initialize(database:, use_skip_locked: false)
  super()
  @use_skip_locked = use_skip_locked
  @database = Kaal::Persistence::Database.new(database)
end
Class Method Details
.normalize_row(row) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 56
#
# Converts a stored row into a plain hash with the keys :job_id, :run_at,
# :job_class, :args, :queue and :created_at. The :args column is passed
# through parse_args; every other value is copied from the row unchanged.
#
# @param row [#[], nil] row indexed by symbol column names, or nil
# @return [Hash, nil] the normalized hash; nil when +row+ is nil/false or
#   when parsing the args raises JSON::ParserError
def self.normalize_row(row)
  return unless row

  decoded_args = parse_args(row[:args])

  {
    job_id: row[:job_id],
    run_at: row[:run_at],
    job_class: row[:job_class],
    args: decoded_args,
    queue: row[:queue],
    created_at: row[:created_at]
  }
rescue JSON::ParserError
  # Rows whose args cannot be parsed are reported as nil rather than raising.
  nil
end
Instance Method Details
#all_jobs ⇒ Object
48 49 50 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 48
#
# Lists every row of kaal_delayed_jobs, ordered by :run_at and then :job_id,
# after passing each one through .normalize_row.
#
# @return [Array] normalized rows; rows for which .normalize_row returns nil
#   are dropped by filter_map
def all_jobs
  ordered_rows = connection[:kaal_delayed_jobs].order(:run_at, :job_id)
  ordered_rows.filter_map { |record| self.class.normalize_row(record) }
end
#claim_strategy ⇒ Object
52 53 54 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 52
#
# Names the claiming strategy selected by the use_skip_locked flag.
#
# @return [Symbol] :skip_locked when @use_skip_locked is truthy,
#   :delete_confirmation otherwise
def claim_strategy
  return :skip_locked if @use_skip_locked

  :delete_confirmation
end
#enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 21
#
# Inserts a new delayed-job row and returns its normalized form.
#
# @param job_id [Object] identifier stored in the :job_id column
# @param run_at [Object] value stored in the :run_at column
# @param job_class [Object] value stored in the :job_class column
# @param args [Object] serialized with JSON.generate before being stored
# @param queue [Object, nil] value stored in the :queue column
# @param connection [Object, nil] forwarded to dataset_for to choose the dataset
# @return [Hash, nil] result of .normalize_row applied to the inserted payload
# @raise [DuplicateJobError] when the insert raises
#   ::Sequel::UniqueConstraintViolation
def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, connection: nil)
  created_at = Time.now.utc
  serialized_args = JSON.generate(args)
  record = { job_id:, run_at:, job_class:, args: serialized_args, queue:, created_at: }

  dataset_for(connection).insert(record)
  # Normalize the payload that was just written instead of re-reading the row.
  self.class.normalize_row(record)
rescue ::Sequel::UniqueConstraintViolation
  raise DuplicateJobError, "Delayed job #{job_id.inspect} already exists"
end
#find_job(job_id, connection: @database.connection) ⇒ Object
44 45 46 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 44
#
# Looks up the first kaal_delayed_jobs row whose :job_id matches and returns
# it in normalized form.
#
# @param job_id [Object] identifier to match against the :job_id column
# @param connection [Object] connection to query; defaults to
#   @database.connection
# @return [Hash, nil] result of .normalize_row for the matching row (nil when
#   no row is found)
def find_job(job_id, connection: @database.connection)
  matching_row = connection[:kaal_delayed_jobs].where(job_id: job_id).first
  self.class.normalize_row(matching_row)
end
#pop_due(now:, limit:) ⇒ Object
38 39 40 41 42 |
# File 'lib/kaal/delayed_job/database_engine.rb', line 38
#
# Dispatches to the claiming implementation selected by @use_skip_locked,
# forwarding +now+ and +limit+ unchanged.
#
# @param now [Object] forwarded as the now: keyword
# @param limit [Object] forwarded as the limit: keyword
# @return [Object] whatever the chosen pop_due_with_* helper returns
def pop_due(now:, limit:)
  if @use_skip_locked
    pop_due_with_skip_locked(now: now, limit: limit)
  else
    pop_due_with_delete_confirmation(now: now, limit: limit)
  end
end