Class: Gouda::Adapter
- Inherits: Object
- Includes: BulkAdapterExtension
- Defined in: lib/gouda/adapter.rb
Overview
Acts as an ActiveJob adapter
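As a minimal sketch, the adapter is installed through the standard ActiveJob configuration hook (consult the Gouda README for the canonical setup):

  # config/application.rb
  # Make Gouda the queue backend for all ActiveJob classes.
  config.active_job.queue_adapter = Gouda::Adapter.new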
Constant Summary
- ENQUEUE_ERROR_MESSAGE =
  <<~ERR
    The job has been rejected due to a matching enqueue concurrency key
  ERR
Instance Method Summary
- #enqueue(active_job) ⇒ String?
  Enqueues the ActiveJob job to be performed.
- #enqueue_after_transaction_commit? ⇒ Boolean
  The whole point of Gouda is actually co-committing jobs with the business objects they use.
- #enqueue_all(active_jobs) ⇒ Integer
  Enqueues multiple ActiveJobs.
- #enqueue_at(active_job, timestamp_int) ⇒ String?
  Enqueues an ActiveJob job to be run at a specific time.
Instance Method Details
#enqueue(active_job) ⇒ String?
Enqueues the ActiveJob job to be performed. For use by Rails; you should generally not call this directly.
# File 'lib/gouda/adapter.rb', line 16

def enqueue(active_job)
  # This is the method that gets called by ActiveJob internally (from inside the ActiveJob::Base instance
  # method). This is also when ActiveJob runs the enqueue callbacks. After this method returns,
  # ActiveJob will set @successfully_enqueued inside the job to `true` as long as no
  # EnqueueError has been raised. This is, of course, incompatible with bulk-enqueueing (which we want
  # to use by default). What we can do is verify the value of the property set by our `enqueue_all` method
  # and raise the exception based on that.
  enqueue_all([active_job])
  if active_job.enqueue_error
    Gouda.logger.warn { "Error #{active_job.enqueue_error.inspect} for Gouda workload (#{active_job.job_id})" }
    raise active_job.enqueue_error
  end
  active_job.provider_job_id
end
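To sketch how that error surfaces to application code (the job class and its concurrency key are hypothetical; the block form of perform_later requires Rails 7.1+):

  # DeduplicatedJob is a hypothetical job configured with an enqueue concurrency key.
  DeduplicatedJob.perform_later("tenant-1") do |job|
    unless job.successfully_enqueued?
      # The adapter raised ActiveJob::EnqueueError because a workload with the
      # same enqueue concurrency key was already pending; ActiveJob captured it
      # into job.enqueue_error and perform_later returned false.
      Rails.logger.info("Enqueue rejected: #{job.enqueue_error&.message}")
    end
  end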
#enqueue_after_transaction_commit? ⇒ Boolean
The whole point of Gouda is actually co-committing jobs with the business objects they use. The changes in Rails are directed towards shifting job enqueues into an after_commit hook, so that the jobs - when they start executing - will always find the committed business objects in the database. It is their attempt at ensuring read-after-write consistency in the face of two separate data stores.

However, with a DB-based job queue that uses the same database as the rest of the application, we actually want the opposite: if a transaction commits, we want it to commit both the business objects and the jobs to be done on them. Folding the job enqueues into the same transaction can also be a great improvement to performance.

Some of our jobs also imply that a job was generated as a result of a business model change. With after_commit, there is a subtle race condition: your application may crash between the COMMIT of your transaction and the execution of the after_commit hooks. We want to avoid this in Gouda and always have a guarantee that if our main models committed, so did the jobs that use them. So: tell ActiveJob that we prefer the jobs to be co-committed.
# File 'lib/gouda/adapter.rb', line 130

def enqueue_after_transaction_commit?
  false
end
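A minimal sketch of the co-commit pattern this enables (the model and job names are hypothetical):

  ApplicationRecord.transaction do
    invoice = Invoice.create!(total_cents: 120_00)  # hypothetical model
    SendInvoiceJob.perform_later(invoice)           # hypothetical job; the Gouda workload row
                                                    # is inserted inside the same transaction
  end
  # If the transaction rolls back, neither the invoice nor its job is persisted.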
#enqueue_all(active_jobs) ⇒ Integer
Enqueues multiple ActiveJobs. For use by Rails; you should generally not call this directly.
# File 'lib/gouda/adapter.rb', line 51

def enqueue_all(active_jobs)
  t_now = Time.now.utc
  bulk_insert_attributes = active_jobs.map.with_index do |active_job, i|
    # We can't pregenerate an ID because we want to INSERT .. ON CONFLICT DO NOTHING
    # and we want Postgres to use _all_ unique indexes for it, which would include a conflict of IDs -
    # so some jobs could get silently rejected because of a duplicate ID. However unlikely, this is better prevented.
    # We can't tell Postgres to ignore conflicts on _both_ the scheduler key and the enqueue concurrency key but not on
    # the ID - it is either "all indexes" or "just one", but never "this index and that index".
    # MERGE https://www.postgresql.org/docs/current/sql-merge.html is in theory capable of solving this,
    # but let's not complicate things all too hastily, the hour is getting late.
    {
      active_job_id: active_job.job_id, # Multiple jobs can have the same ID due to retries, job-iteration etc.
      scheduled_at: active_job.scheduled_at || t_now,
      scheduler_key: active_job.scheduler_key, # So that the scheduler_key gets retained between retries
      priority: active_job.priority,
      execution_concurrency_key: extract_execution_concurrency_key(active_job),
      enqueue_concurrency_key: extract_enqueue_concurrency_key(active_job),
      queue_name: active_job.queue_name || "default",
      active_job_class_name: active_job.class.to_s,
      serialized_params: active_job.serialize.except("provider_job_id"), # For when a job gets retried
      interrupted_at: active_job.interrupted_at, # So that an exception can be raised when this job gets executed
      position_in_bulk: i,
      state: "enqueued"
    }
  end

  # Filter out all the jobs with the same (and present) concurrency key and scheduler key
  bulk_insert_attributes = filter_by_unique_not_nil_hash_key(bulk_insert_attributes, :enqueue_concurrency_key)
  bulk_insert_attributes = filter_by_unique_not_nil_hash_key(bulk_insert_attributes, :scheduler_key)

  # Do a bulk insert. For jobs with an enqueue concurrency key there will be no enqueue,
  # as the default for insert_all is to DO NOTHING. An exception would be nice but we are after performance here.
  # Use batches of 500 so that we neither exceed the maximum statement size nor create a transaction
  # for the insert that times out.
  inserted_ids_and_positions = bulk_insert_attributes.each_slice(500).flat_map do |chunk|
    ActiveSupport::Notifications.instrument("insert_all.gouda", {n_rows: chunk.size}) do |payload|
      rows = Gouda::Workload.insert_all(chunk, returning: [:id, :position_in_bulk])
      payload[:inserted_jobs] = rows.length
      payload[:rejected_jobs] = chunk.size - rows.length
      rows
    end
  end

  # Mark all the jobs we ended up not enqueuing as such. If these jobs are getting enqueued "one by one",
  # their callbacks have already run and they are already set to `successfully_enqueued = true`. If
  # they are enqueued using `enqueue_all` directly there are no guarantees, as `enqueue_all` is a fairly new
  # Rails feature. Now is the moment we need to "fish out" our bulk enqueue position and use it to detect
  # which jobs did get enqueued and which didn't. Yes, this is a bit roundabout - but otherwise we could
  # have a unique index and DO NOTHING just on the enqueue concurrency key.
  inserted_ids_and_positions.each do |row|
    i = row.fetch("position_in_bulk")
    active_jobs[i].provider_job_id = row.fetch("id")
    active_jobs[i].successfully_enqueued = true
  end
  _, failed_enqueue = active_jobs.partition(&:successfully_enqueued?)
  failed_enqueue.each do |active_job|
    active_job.successfully_enqueued = false
    active_job.enqueue_error = ActiveJob::EnqueueError.new(ENQUEUE_ERROR_MESSAGE)
  end

  # And return how many jobs we _did_ enqueue
  inserted_ids_and_positions.length
end
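In application code the bulk path is normally reached through ActiveJob.perform_all_later (Rails 7.1+) rather than by calling this method; the job class here is hypothetical:

  jobs = rows.map { |row| ImportRowJob.new(row) }  # hypothetical job class
  ActiveJob.perform_all_later(jobs)                # routes into Gouda::Adapter#enqueue_all
  enqueued, rejected = jobs.partition(&:successfully_enqueued?)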
#enqueue_at(active_job, timestamp_int) ⇒ String?
Enqueues an ActiveJob job to be run at a specific time. For use by Rails; you should generally not call this directly.
# File 'lib/gouda/adapter.rb', line 36

def enqueue_at(active_job, timestamp_int)
  active_job.scheduled_at = Time.at(timestamp_int).utc
  enqueue_all([active_job])
  if active_job.enqueue_error
    Gouda.logger.warn { "Error #{active_job.enqueue_error.inspect} for Gouda workload (#{active_job.job_id})" }
    raise active_job.enqueue_error
  end
  active_job.provider_job_id
end
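This path is normally reached through ActiveJob's scheduling helpers rather than called directly (the job class is hypothetical):

  SendReminderJob.set(wait: 2.hours).perform_later(user_id)        # hypothetical job
  SendReminderJob.set(wait_until: Date.tomorrow.noon).perform_later(user_id)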