Class: MarjAdapter
- Inherits:
-
Object
- Object
- MarjAdapter
- Defined in:
- lib/marj_adapter.rb
Overview
ActiveJob queue adapter for Marj.
In addition to the standard ActiveJob
queue adapter API, this adapter provides a query
method which can be used to query enqueued jobs and a discard
method which can be used to discard enqueued jobs.
Although it is possible to access the adapter directly in order to query or discard, it is recommended to use the Marj module.
Instance Method Summary collapse
-
#discard(job) ⇒ ActiveJob::Base
Discards the specified job.
-
#enqueue(job) ⇒ ActiveJob::Base
Enqueue a job for immediate execution.
-
#enqueue_at(job, timestamp = nil) ⇒ ActiveJob::Base
Enqueue a job for execution at the specified time.
-
#initialize(record_class = 'Marj::Record') ⇒ MarjAdapter
constructor
Creates a new adapter which will enqueue jobs using the given
ActiveRecord
class. -
#query(*args, **kwargs) ⇒ Object
Queries enqueued jobs.
Constructor Details
#initialize(record_class = 'Marj::Record') ⇒ MarjAdapter
Creates a new adapter which will enqueue jobs using the given ActiveRecord
class.
19 20 21 |
# File 'lib/marj_adapter.rb', line 19 def initialize(record_class = 'Marj::Record') @record_class = record_class end |
Instance Method Details
#discard(job) ⇒ ActiveJob::Base
Discards the specified job.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/marj_adapter.rb', line 130 def discard(job) record = job.singleton_class.instance_variable_get(:@record) record ||= Marj::Record.find_by(job_id: job.job_id)&.tap { _1.send(:register_callbacks, job) } record&.destroy # Copied from ActiveJob::Exceptions#run_after_discard_procs exceptions = [] job.after_discard_procs.each do |blk| instance_exec(job, nil, &blk) rescue StandardError => e exceptions << e end raise exceptions.last if exceptions.any? job end |
#enqueue(job) ⇒ ActiveJob::Base
Enqueue a job for immediate execution.
27 28 29 |
# File 'lib/marj_adapter.rb', line 27 def enqueue(job) enqueue_at(job) end |
#enqueue_at(job, timestamp = nil) ⇒ ActiveJob::Base
Enqueue a job for execution at the specified time.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/marj_adapter.rb', line 36 def enqueue_at(job, = nil) job.scheduled_at = ? Time.at().utc : nil # Argument serialization is done by ActiveJob. ActiveRecord expects deserialized arguments. serialized = job.serialize.symbolize_keys!.without(:provider_job_id).merge(arguments: job.arguments) # Serialize sets locale to I18n.locale.to_s and enqueued_at to Time.now.utc.iso8601(9). # Update the job to reflect what is being enqueued. job.locale = serialized[:locale] job.enqueued_at = Time.iso8601(serialized[:enqueued_at]).utc # When a job is enqueued, we must create/update the corresponding database record. We also must ensure callbacks # are registered on the job instance so that when the job is executed, the database record is deleted or updated # (depending on the result). # # We keep track of whether callbacks have been registered by setting the @record instance variable on the job's # singleton class. This holds a reference to the record. This ensures that if execute is called on a record # instance, any updates to the database are reflected on that record instance. if (existing_record = job.singleton_class.instance_variable_get(:@record)) # This job instance has already been associated with a database row. if record_class.exists?(job_id: job.job_id) # The database row still exists, we simply need to update it. existing_record.update!(serialized) else # Someone else deleted the database row, we need to recreate and reload the existing record instance. We don't # want to register the new instance because someone might still have a reference to the existing one. record_class.create!(serialized) existing_record.reload end else # This job instance has not been associated with a database row. if (new_record = record_class.find_by(job_id: job.job_id)) # The database row already exists. Update it. new_record.update!(serialized) else # The database row does not exist. Create it. new_record = record_class.create!(serialized) end new_record.send(:register_callbacks, job) end job end |
#query(*args, **kwargs) ⇒ Object
Queries enqueued jobs. Similar to ActiveRecord.where
with a few additional features:
-
Leading symbol arguments are treated as
ActiveRecord
scopes. -
If only a job ID is specified, the corresponding job is returned.
-
If
:limit
is specified, the maximum number of jobs is limited.
Example usage:
query(:all) # Delegates to Marj::Record.all
query(:due) # Delegates to Marj::Record.due
query(:all, limit: 10) # Returns a maximum of 10 jobs
query(job_class: Foo) # Returns all jobs with job_class Foo
query('123') # Returns the job with id '123' or nil if no such job exists
query(id: '123') # Same as above
query(job_id: '123') # Same as above
query(queue: 'foo') # Returns all jobs in the 'foo' queue
query(job_queue: 'foo') # Same as above
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/marj_adapter.rb', line 96 def query(*args, **kwargs) args, kwargs = args.dup, kwargs.dup.symbolize_keys kwargs = kwargs.merge(queue_name: kwargs.delete(:queue)) if kwargs.key?(:queue) kwargs = kwargs.merge(job_id: kwargs.delete(:id)) if kwargs.key?(:id) kwargs[:job_id] = args.shift if args.size == 1 && args.first.is_a?(String) && args.first.match(JOB_ID_REGEX) if args.empty? && kwargs.size == 1 && kwargs.key?(:job_id) return record_class.find_by(job_id: kwargs[:job_id])&.to_job end symbol_args = [] symbol_args << args.shift while args.first.is_a?(Symbol) order_by = kwargs.delete(:order) order_by = :queue_name if [:queue, 'queue'].include?(order_by) limit = kwargs.delete(:limit) symbol_args.shift if symbol_args.first == :all relation = record_class.all relation = relation.order(order_by) if order_by relation = relation.by_due_date unless relation.order_values.any? relation = relation.where(*args, **kwargs) if args.any? || kwargs.any? relation = relation.limit(limit) if limit relation = relation.send(symbol_args.shift) while symbol_args.any? if relation.is_a?(Enumerable) relation.map(&:to_job) elsif relation.is_a?(record_class) relation.to_job else relation end end |