Class: Gouda::Adapter

Inherits:
Object
Includes:
BulkAdapterExtension
Defined in:
lib/gouda/adapter.rb

Overview

Acts as an ActiveJob adapter

Direct Known Subclasses

ActiveJob::QueueAdapters::GoudaAdapter

Constant Summary

ENQUEUE_ERROR_MESSAGE =
<<~ERR
  The job has been rejected due to a matching enqueue concurrency key
ERR

Instance Method Details

#enqueue(active_job) ⇒ String?

Enqueues the ActiveJob job to be performed. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

Returns:

  • (String, nil)

    the ID of the inserted workload or nil if the insert did not go through (due to concurrency)



# File 'lib/gouda/adapter.rb', line 16

def enqueue(active_job)
  # ActiveJob calls this method internally (from inside an ActiveJob::Base instance
  # method). This is also when ActiveJob runs the enqueue callbacks. After this method returns,
  # ActiveJob sets @successfully_enqueued inside the job to `true` as long as no
  # EnqueueError has been raised. This is, of course, incompatible with bulk enqueueing (which we want
  # to use by default). What we can do is check the property set by our `enqueue_all` method
  # and raise the exception based on that.
  enqueue_all([active_job])
  if active_job.enqueue_error
    Gouda.logger.warn { "Error #{active_job.enqueue_error.inspect} for Gouda workload (#{active_job.job_id})" }
    raise active_job.enqueue_error
  end
  active_job.provider_job_id
end
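
The pattern described in the comment above (the bulk path records a failure on the job, the single-job path re-raises it) can be sketched without Rails. This is a minimal illustration only: `FakeJob`, `FakeEnqueueError` and `FakeAdapter` are hypothetical stand-ins, not Gouda or ActiveJob classes, and the in-memory key table stands in for the database's unique index.

```ruby
# Hypothetical stand-ins for illustration only; not Gouda or ActiveJob classes.
FakeEnqueueError = Class.new(StandardError)
FakeJob = Struct.new(:job_id, :concurrency_key, :provider_job_id, :enqueue_error)

class FakeAdapter
  def initialize
    @taken_keys = {} # stands in for the unique index on the concurrency key
    @next_id = 0
  end

  # Bulk path: never raises, records the outcome on each job instead.
  # Returns the number of jobs that were accepted.
  def enqueue_all(jobs)
    jobs.count do |job|
      key = job.concurrency_key
      if key && @taken_keys[key]
        job.enqueue_error = FakeEnqueueError.new("rejected: key #{key} already enqueued")
        false
      else
        @taken_keys[key] = true if key
        job.provider_job_id = (@next_id += 1).to_s
        true
      end
    end
  end

  # Single-job path: delegates to the bulk path, then raises what was recorded.
  def enqueue(job)
    enqueue_all([job])
    raise job.enqueue_error if job.enqueue_error
    job.provider_job_id
  end
end

adapter = FakeAdapter.new
first_id = adapter.enqueue(FakeJob.new("a", "user-1"))

# A second job with the same key is rejected, which surfaces as an exception.
rejected = begin
  adapter.enqueue(FakeJob.new("b", "user-1"))
  false
rescue FakeEnqueueError
  true
end
```

In this sketch the caller of the single-job path sees an exception, while a caller of the bulk path would inspect `enqueue_error` on each job instead.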

#enqueue_after_transaction_commit? ⇒ Boolean

The whole point of Gouda is to co-commit jobs with the business objects they use. The changes in Rails are directed towards shifting the job enqueues into an after_commit hook, so that the jobs, when they start executing, will always find the committed business objects in the database. It is their attempt at ensuring read-after-write consistency in the face of two separate data stores.

However, with a DB-based job queue which uses the same database as the rest of the application, we want the opposite: if a transaction commits, it should commit both the business objects themselves and the jobs to be done on them. Folding the job enqueues into the same transaction can also improve performance considerably. Some of our jobs also imply that a job was generated as a result of a business model change.

With after_commit, there is a subtle race condition: your application may crash between the COMMIT of your transaction and the execution of the after_commit hooks. We want to avoid this in Gouda and always guarantee that if our main models committed, so did the jobs that use them. So: tell ActiveJob that we prefer the jobs to be co-committed.

See github.com/rails/rails/pull/51426

Returns:

  • (Boolean)


# File 'lib/gouda/adapter.rb', line 130

def enqueue_after_transaction_commit?
  false
end
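
The difference between co-committing and an after_commit-style enqueue can be illustrated with a toy in-memory model. This is a sketch under loud assumptions: `ToyDb` and `commit_then_enqueue` are hypothetical and only imitate transaction semantics; no real database, ActiveRecord or Gouda code is involved.

```ruby
# Toy in-memory "database" (hypothetical, for illustration only).
class ToyDb
  attr_reader :orders, :jobs

  def initialize
    @orders = []
    @jobs = []
  end

  # Applies staged writes together: if the block raises, nothing is kept.
  def transaction
    staged = {orders: [], jobs: []}
    yield staged
    @orders.concat(staged[:orders])
    @jobs.concat(staged[:jobs])
  end
end

# Co-committed: the job row is written inside the same transaction
# as the business object, so both persist or neither does.
co = ToyDb.new
co.transaction do |tx|
  tx[:orders] << "order-1"
  tx[:jobs] << "ship order-1"
end

# after_commit style: the job is written in a separate step after COMMIT.
def commit_then_enqueue(db, crash_before_hook:)
  db.transaction { |tx| tx[:orders] << "order-2" }
  raise "process crashed before the after_commit hook ran" if crash_before_hook
  db.transaction { |tx| tx[:jobs] << "ship order-2" }
end

later = ToyDb.new
begin
  commit_then_enqueue(later, crash_before_hook: true)
rescue RuntimeError
  # The order is committed, but its job was never written.
end
```

After this runs, `co` holds both the order and its job, while `later` holds an order with no job, which is the gap that returning `false` here is meant to avoid.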

#enqueue_all(active_jobs) ⇒ Integer

Enqueues multiple ActiveJobs. For use by Rails; you should generally not call this directly.

Parameters:

  • active_jobs (Array<ActiveJob::Base>)

    the jobs to be enqueued

Returns:

  • (Integer)

    the number of jobs which were successfully sent to the queue



# File 'lib/gouda/adapter.rb', line 51

def enqueue_all(active_jobs)
  t_now = Time.now.utc
  bulk_insert_attributes = active_jobs.map.with_index do |active_job, i|
    # We can't pregenerate an ID because we want to INSERT .. ON CONFLICT DO NOTHING
    # and we want Postgres to use _all_ unique indexes for it, which would include a conflict of IDs -
    # so some jobs could get silently rejected because of a duplicate ID. However unlikely, this is better prevented.
    # We can't tell Postgres to ignore conflicts on _both_ the scheduler key and the enqueue concurrency key but not on
    # the ID - it is either "all indexes" or "just one", but never "this index and that index". MERGE (https://www.postgresql.org/docs/current/sql-merge.html)
    # is in theory capable of solving this, but let's not complicate things all too hastily, the hour is getting late
    {
      active_job_id: active_job.job_id, # Multiple jobs can have the same ID due to retries, job-iteration etc.
      scheduled_at: active_job.scheduled_at || t_now,
      scheduler_key: active_job.scheduler_key, # So that the scheduler_key gets retained between retries
      priority: active_job.priority,
      execution_concurrency_key: extract_execution_concurrency_key(active_job),
      enqueue_concurrency_key: extract_enqueue_concurrency_key(active_job),
      queue_name: active_job.queue_name || "default",
      active_job_class_name: active_job.class.to_s,
      serialized_params: active_job.serialize.except("provider_job_id"), # For when a job gets retried
      interrupted_at: active_job.interrupted_at, # So that an exception can be raised when this job gets executed
      position_in_bulk: i,
      state: "enqueued"
    }
  end

  # Filter out all the jobs with the same (and present) concurrency key and scheduler key
  bulk_insert_attributes = filter_by_unique_not_nil_hash_key(bulk_insert_attributes, :enqueue_concurrency_key)
  bulk_insert_attributes = filter_by_unique_not_nil_hash_key(bulk_insert_attributes, :scheduler_key)

  # Do a bulk insert. For jobs with a conflicting enqueue concurrency key there will be no enqueue,
  # as the default for insert_all is to DO NOTHING. An exception would be nice but we are after performance here.
  # Use batches of 500 so that we neither exceed the maximum statement size nor create an insert
  # transaction which times out
  inserted_ids_and_positions = bulk_insert_attributes.each_slice(500).flat_map do |chunk|
    ActiveSupport::Notifications.instrument("insert_all.gouda", {n_rows: chunk.size}) do |payload|
      rows = Gouda::Workload.insert_all(chunk, returning: [:id, :position_in_bulk])
      payload[:inserted_jobs] = rows.length
      payload[:rejected_jobs] = chunk.size - rows.length
      rows
    end
  end

  # Mark all the jobs we ended up not enqueuing as such. If these jobs are getting enqueued "one by one"
  # then their callbacks have already run, and they are already set to `successfully_enqueued = true`. If
  # they are enqueued using `enqueue_all` directly there are no guarantees, as `enqueue_all` is a fairly new
  # Rails feature. Now is the moment we need to "fish out" our bulk enqueue position and use it to detect
  # which jobs did get enqueued and which didn't. Yes, this is a bit roundabout - but otherwise we could
  # have a unique index and DO NOTHING just on the enqueue concurrency key
  inserted_ids_and_positions.each do |row|
    i = row.fetch("position_in_bulk")
    active_jobs[i].provider_job_id = row.fetch("id")
    active_jobs[i].successfully_enqueued = true
  end
  _, failed_enqueue = active_jobs.partition(&:successfully_enqueued?)
  failed_enqueue.each do |active_job|
    active_job.successfully_enqueued = false
    active_job.enqueue_error = ActiveJob::EnqueueError.new(ENQUEUE_ERROR_MESSAGE)
  end

  # And return how many jobs we _did_ enqueue
  inserted_ids_and_positions.length
end
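
Two steps of the method above can be sketched in plain Ruby: deduplicating rows by a key that may be nil, and mapping inserted rows back onto the original job list by position. The implementation of `filter_by_unique_not_nil_hash_key` is not shown on this page, so the helper below (deliberately given a `_sketch` suffix) is an assumption about its behaviour, namely that it keeps every row with a nil key and only the first row per non-nil key value. The "insert" is simulated and accepts every remaining row.

```ruby
# Assumed behaviour of the filter helper (hypothetical re-implementation):
# keep every row whose key is nil, and only the first row for each non-nil value.
def filter_by_unique_not_nil_hash_key_sketch(rows, key)
  seen = {}
  rows.select do |row|
    value = row[key]
    next true if value.nil?
    next false if seen[value]
    seen[value] = true
  end
end

rows = [
  {position_in_bulk: 0, enqueue_concurrency_key: "a"},
  {position_in_bulk: 1, enqueue_concurrency_key: nil},
  {position_in_bulk: 2, enqueue_concurrency_key: "a"}, # duplicate of position 0
  {position_in_bulk: 3, enqueue_concurrency_key: nil}
]
kept = filter_by_unique_not_nil_hash_key_sketch(rows, :enqueue_concurrency_key)

# Simulated batched insert: every remaining row is "accepted" and comes back
# with an ID and its original position, like `returning: [:id, :position_in_bulk]`.
returned = kept.each_slice(2).flat_map do |chunk|
  chunk.map do |row|
    {"id" => "id-#{row[:position_in_bulk]}", "position_in_bulk" => row[:position_in_bulk]}
  end
end

# Map the results back onto the original list by position.
enqueued = Array.new(rows.size, false)
returned.each { |row| enqueued[row.fetch("position_in_bulk")] = true }
```

Because `position_in_bulk` is assigned before filtering, positions still index into the original array even after rows have been dropped.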

#enqueue_at(active_job, timestamp_int) ⇒ String?

Enqueues an ActiveJob job to be run at a specific time. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

  • timestamp_int (Integer, nil)

    the epoch time to perform the job

Returns:

  • (String, nil)

    the ID of the inserted workload or nil if the insert did not go through (due to concurrency)



# File 'lib/gouda/adapter.rb', line 36

def enqueue_at(active_job, timestamp_int)
  active_job.scheduled_at = Time.at(timestamp_int).utc
  enqueue_all([active_job])
  if active_job.enqueue_error
    Gouda.logger.warn { "Error #{active_job.enqueue_error.inspect} for Gouda workload (#{active_job.job_id})" }
    raise active_job.enqueue_error
  end
  active_job.provider_job_id
end
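
The first line of the method converts the epoch integer into a UTC `Time` before handing the job to `enqueue_all`. A minimal sketch of that conversion, using an arbitrary example timestamp that does not come from the source:

```ruby
timestamp_int = 1_700_000_000 # arbitrary example epoch value (seconds)

# Same conversion as in enqueue_at: epoch seconds to a UTC Time.
scheduled_at = Time.at(timestamp_int).utc

# Formatted only for display in this example.
iso = scheduled_at.strftime("%Y-%m-%dT%H:%M:%SZ")
```

Here `iso` is "2023-11-14T22:13:20Z", regardless of the local time zone of the process.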