Class: Bulkrax::ImportWorkJob
- Inherits: ApplicationJob
- Inheritance chain: Bulkrax::ImportWorkJob < ApplicationJob < ActiveJob::Base < Object
- Defined in: app/jobs/bulkrax/import_work_job.rb
Instance Method Summary
-
#perform(entry_id, run_id, time_to_live = 3) ⇒ Object
rubocop:disable Rails/SkipsModelValidations.
-
#reschedule(entry_id, run_id, time_to_live) ⇒ Object
rubocop:enable Rails/SkipsModelValidations.
Instance Method Details
#perform(entry_id, run_id, time_to_live = 3) ⇒ Object
Note:
Yes, we call ImporterRun.find each time. This is deliberate: it prevents race conditions on the database update. If you do not re-find (or at least reload) the object before each increment, the count can drift. Suppose two jobs, A and B, run concurrently and a counter starts at 2:
- A grabs its own copy of the importer_run.
- B grabs its own copy of the importer_run.
- A finishes the build and does the increment (the counter is now 3).
- B finishes the build and does the increment from its stale copy (the counter is 3 again, not 4), and thus a count is lost.
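The lost update described in the note can be reproduced without a database. The following is a minimal sketch, not Bulkrax code: `FakeCounter`, `lost_update`, and `atomic_update` are hypothetical names, and the in-memory object merely stands in for one counter column on an ImporterRun row. It contrasts writing back a value computed from a stale read with an increment that reads and writes in one step, which is roughly what a single `UPDATE ... SET col = col + 1` statement gives you.

```ruby
# Hypothetical in-memory stand-in for one ImporterRun counter column.
class FakeCounter
  attr_reader :value

  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  # Unsafe: stores whatever the caller computed from an earlier read.
  def write(new_value)
    @value = new_value
  end

  # Safe: the read and the write happen as one step.
  def increment!
    @lock.synchronize { @value += 1 }
  end
end

# Jobs A and B both read the counter before either one writes.
def lost_update(counter)
  a_copy = counter.value      # A grabs the value (2)
  b_copy = counter.value      # B grabs the value (2)
  counter.write(a_copy + 1)   # A writes 3
  counter.write(b_copy + 1)   # B writes 3 again; one count is lost
  counter.value
end

# Each job asks the store to increment in place instead.
def atomic_update(counter)
  2.times { counter.increment! }
  counter.value
end

puts lost_update(FakeCounter.new(2))   # prints 3
puts atomic_update(FakeCounter.new(2)) # prints 4
```

In the job itself, the counters are changed through `ImporterRun.increment_counter` and `ImporterRun.decrement_counter` with the run id, so no job holds a copy of the row while it builds the entry.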
rubocop:disable Rails/SkipsModelValidations
# File 'app/jobs/bulkrax/import_work_job.rb', line 22

def perform(entry_id, run_id, time_to_live = 3, *)
  entry = Entry.find(entry_id)
  entry.build
  if entry.status == "Complete"
    ImporterRun.increment_counter(:processed_records, run_id)
    ImporterRun.increment_counter(:processed_works, run_id)
  else
    # do not retry here because whatever parse error kept you from creating a work will likely
    # keep preventing you from doing so.
    ImporterRun.increment_counter(:failed_records, run_id)
    ImporterRun.increment_counter(:failed_works, run_id)
  end
  # Regardless of completion or not, we want to decrement the enqueued records.
  ImporterRun.decrement_counter(:enqueued_records, run_id) unless ImporterRun.find(run_id).enqueued_records <= 0

  entry.save!
  entry.importer.current_run = ImporterRun.find(run_id)
  entry.importer.record_status
rescue Bulkrax::CollectionsCreatedError => e
  Rails.logger.warn("#{self.class} entry_id: #{entry_id}, run_id: #{run_id} encountered #{e.class}: #{e.message}")
  # You get 3 attempts at the above perform before we have the import exception cascade into
  # the Sidekiq retry ecosystem.
  # rubocop:disable Style/IfUnlessModifier
  if time_to_live <= 1
    raise "Exhauted reschedule limit for #{self.class} entry_id: #{entry_id}, run_id: #{run_id}. Attemping retries"
  end
  # rubocop:enable Style/IfUnlessModifier
  reschedule(entry_id, run_id, time_to_live)
end
#reschedule(entry_id, run_id, time_to_live) ⇒ Object
rubocop:enable Rails/SkipsModelValidations
# File 'app/jobs/bulkrax/import_work_job.rb', line 53

def reschedule(entry_id, run_id, time_to_live)
  ImportWorkJob.set(wait: 1.minute).perform_later(entry_id, run_id, time_to_live - 1)
end
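The interplay between `time_to_live` in #perform and the decrement in #reschedule can be traced with a small stand-alone sketch. This is an illustration under stated assumptions, not the Bulkrax implementation: `RetryableError` and `run_with_countdown` are hypothetical names, and the retry happens immediately by recursion, whereas the real job re-enqueues itself with a one-minute wait.

```ruby
# Hypothetical error standing in for Bulkrax::CollectionsCreatedError.
class RetryableError < StandardError; end

# Runs the block, retrying with a decremented time_to_live on RetryableError.
# Records the time_to_live value seen by each attempt in `attempts`.
def run_with_countdown(time_to_live, attempts = [], &work)
  attempts << time_to_live
  work.call
  attempts
rescue RetryableError
  # Same guard as in #perform: give up once the countdown reaches 1.
  raise "Exhausted reschedule limit" if time_to_live <= 1
  # Same decrement as in #reschedule, minus the one-minute wait.
  run_with_countdown(time_to_live - 1, attempts, &work)
end

attempts = []
begin
  run_with_countdown(3, attempts) { raise RetryableError }
rescue RuntimeError => e
  puts "#{e.message} after #{attempts.length} attempts"
end
```

With the default `time_to_live` of 3, a block that always fails is attempted with the values 3, 2, and 1 before the plain error is raised, which matches the "3 attempts" mentioned in the source comment.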