Class: MarjAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/marj_adapter.rb

Overview

ActiveJob queue adapter for Marj.

In addition to the standard ActiveJob queue adapter API, this adapter provides a query method which can be used to query enqueued jobs and a discard method which can be used to discard enqueued jobs.

Although it is possible to access the adapter directly in order to query or discard, it is recommended to use the Marj module.

See github.com/nicholasdower/marj

Instance Method Summary collapse

Constructor Details

#initialize(record_class = 'Marj::Record') ⇒ MarjAdapter

Creates a new adapter which will enqueue jobs using the given ActiveRecord class.

Parameters:

  • record_class (Class, String) (defaults to: 'Marj::Record')

    the ActiveRecord class (or its name) to use to store jobs



19
20
21
# File 'lib/marj_adapter.rb', line 19

def initialize(record_class = 'Marj::Record')
  @record_class = record_class
end

Instance Method Details

#discard(job) ⇒ ActiveJob::Base

Discards the specified job.

Returns:

  • (ActiveJob::Base)

    the discarded job



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/marj_adapter.rb', line 130

def discard(job)
  record = job.singleton_class.instance_variable_get(:@record)
  record ||= Marj::Record.find_by(job_id: job.job_id)&.tap { _1.send(:register_callbacks, job) }
  record&.destroy

  # Copied from ActiveJob::Exceptions#run_after_discard_procs
  exceptions = []
  job.after_discard_procs.each do |blk|
    instance_exec(job, nil, &blk)
  rescue StandardError => e
    exceptions << e
  end
  raise exceptions.last if exceptions.any?

  job
end

#enqueue(job) ⇒ ActiveJob::Base

Enqueue a job for immediate execution.

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

Returns:

  • (ActiveJob::Base)

    the enqueued job



27
28
29
# File 'lib/marj_adapter.rb', line 27

def enqueue(job)
  enqueue_at(job)
end

#enqueue_at(job, timestamp = nil) ⇒ ActiveJob::Base

Enqueue a job for execution at the specified time.

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • timestamp (Numeric, NilClass) (defaults to: nil)

    optional number of seconds since Unix epoch at which to execute the job

Returns:

  • (ActiveJob::Base)

    the enqueued job



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/marj_adapter.rb', line 36

def enqueue_at(job, timestamp = nil)
  job.scheduled_at = timestamp ? Time.at(timestamp).utc : nil

  # Argument serialization is done by ActiveJob. ActiveRecord expects deserialized arguments.
  serialized = job.serialize.symbolize_keys!.without(:provider_job_id).merge(arguments: job.arguments)

  # Serialize sets locale to I18n.locale.to_s and enqueued_at to Time.now.utc.iso8601(9).
  # Update the job to reflect what is being enqueued.
  job.locale = serialized[:locale]
  job.enqueued_at = Time.iso8601(serialized[:enqueued_at]).utc

  # When a job is enqueued, we must create/update the corresponding database record. We also must ensure callbacks
  # are registered on the job instance so that when the job is executed, the database record is deleted or updated
  # (depending on the result).
  #
  # We keep track of whether callbacks have been registered by setting the @record instance variable on the job's
  # singleton class. This holds a reference to the record. This ensures that if execute is called on a record
  # instance, any updates to the database are reflected on that record instance.
  if (existing_record = job.singleton_class.instance_variable_get(:@record))
    # This job instance has already been associated with a database row.
    if record_class.exists?(job_id: job.job_id)
      # The database row still exists, we simply need to update it.
      existing_record.update!(serialized)
    else
      # Someone else deleted the database row, we need to recreate and reload the existing record instance. We don't
      # want to register the new instance because someone might still have a reference to the existing one.
      record_class.create!(serialized)
      existing_record.reload
    end
  else
    # This job instance has not been associated with a database row.
    if (new_record = record_class.find_by(job_id: job.job_id))
      # The database row already exists. Update it.
      new_record.update!(serialized)
    else
      # The database row does not exist. Create it.
      new_record = record_class.create!(serialized)
    end
    new_record.send(:register_callbacks, job)
  end
  job
end

#query(*args, **kwargs) ⇒ Object

Queries enqueued jobs. Similar to ActiveRecord.where with a few additional features:

  • Leading symbol arguments are treated as ActiveRecord scopes.

  • If only a job ID is specified, the corresponding job is returned.

  • If :limit is specified, the maximum number of jobs is limited.

Example usage:

query(:all)             # Delegates to Marj::Record.all
query(:due)             # Delegates to Marj::Record.due
query(:all, limit: 10)  # Returns a maximum of 10 jobs
query(job_class: Foo)   # Returns all jobs with job_class Foo

query('123')            # Returns the job with id '123' or nil if no such job exists
query(id: '123')        # Same as above
query(job_id: '123')    # Same as above

query(queue: 'foo')     # Returns all jobs in the 'foo' queue
query(job_queue: 'foo') # Same as above


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/marj_adapter.rb', line 96

def query(*args, **kwargs)
  args, kwargs = args.dup, kwargs.dup.symbolize_keys
  kwargs = kwargs.merge(queue_name: kwargs.delete(:queue)) if kwargs.key?(:queue)
  kwargs = kwargs.merge(job_id: kwargs.delete(:id)) if kwargs.key?(:id)
  kwargs[:job_id] = args.shift if args.size == 1 && args.first.is_a?(String) && args.first.match(JOB_ID_REGEX)

  if args.empty? && kwargs.size == 1 && kwargs.key?(:job_id)
    return record_class.find_by(job_id: kwargs[:job_id])&.to_job
  end

  symbol_args = []
  symbol_args << args.shift while args.first.is_a?(Symbol)
  order_by = kwargs.delete(:order)
  order_by = :queue_name if [:queue, 'queue'].include?(order_by)
  limit = kwargs.delete(:limit)
  symbol_args.shift if symbol_args.first == :all
  relation = record_class.all
  relation = relation.order(order_by) if order_by
  relation = relation.by_due_date unless relation.order_values.any?
  relation = relation.where(*args, **kwargs) if args.any? || kwargs.any?
  relation = relation.limit(limit) if limit
  relation = relation.send(symbol_args.shift) while symbol_args.any?
  if relation.is_a?(Enumerable)
    relation.map(&:to_job)
  elsif relation.is_a?(record_class)
    relation.to_job
  else
    relation
  end
end