Class: Pgbus::ActiveJob::Adapter
- Inherits: Object
- Inheritance chain: Object → Pgbus::ActiveJob::Adapter
- Defined in: lib/pgbus/active_job/adapter.rb
Instance Method Summary
- #enqueue(active_job) ⇒ Object
- #enqueue_all(active_jobs) ⇒ Object
- #enqueue_at(active_job, timestamp) ⇒ Object
Instance Method Details
#enqueue(active_job) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 |
# Enqueues +active_job+ for immediate execution.
#
# The target queue is +active_job.queue_name+, falling back to
# +Pgbus.configuration.default_queue+ when the job has none. The job is
# serialized with +Serializer.serialize_job_hash+ and the payload is then
# passed through Concurrency, Uniqueness and one more local step before
# being handed to +enqueue_with_concurrency+.
#
# Returns +active_job+ unchanged when +uniqueness_rejected?+ is truthy;
# otherwise returns whatever +enqueue_with_concurrency+ returns.
#
# NOTE(review): this listing appears to have lost identifiers during
# extraction — the method names after `Concurrency.` and `Uniqueness.`, and
# the helper name in `payload_hash = (payload_hash)`. Confirm against
# lib/pgbus/active_job/adapter.rb before relying on this text.
# File 'lib/pgbus/active_job/adapter.rb', line 8 def enqueue(active_job) queue = active_job.queue_name || Pgbus.configuration.default_queue payload_hash = Serializer.serialize_job_hash(active_job) payload_hash = Concurrency.(active_job, payload_hash) payload_hash = Uniqueness.(active_job, payload_hash) payload_hash = (payload_hash) return active_job if uniqueness_rejected?(active_job, payload_hash) enqueue_with_concurrency(active_job, queue, payload_hash) end |
#enqueue_all(active_jobs) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pgbus/active_job/adapter.rb', line 33
#
# Enqueues a collection of jobs.
#
# Jobs for which +Uniqueness.uniqueness_config+ is truthy are enqueued one at
# a time (via +enqueue_at+ when scheduled in the future, +enqueue+ otherwise).
# The remaining jobs are grouped by queue name; within each queue the
# not-yet-scheduled jobs are passed together to +enqueue_immediate+ and the
# future-scheduled ones go through +enqueue_at+ individually.
#
# @param active_jobs [Enumerable] jobs to enqueue
# @return [Integer] +active_jobs.count+ — the number of jobs passed in
def enqueue_all(active_jobs)
  # Jobs with uniqueness must be enqueued individually so they can acquire locks.
  with_uniqueness, batchable = active_jobs.partition { |job| Uniqueness.uniqueness_config(job) }

  with_uniqueness.each do |job|
    scheduled_in_future?(job) ? enqueue_at(job, job.scheduled_at.to_f) : enqueue(job)
  end

  by_queue = batchable.group_by { |job| job.queue_name || Pgbus.configuration.default_queue }
  by_queue.each do |queue, queued_jobs|
    later, now = queued_jobs.partition { |job| scheduled_in_future?(job) }
    enqueue_immediate(queue, now)
    later.each { |job| enqueue_at(job, job.scheduled_at.to_f) }
  end

  active_jobs.count
end
#enqueue_at(active_job, timestamp) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/pgbus/active_job/adapter.rb', line 20
#
# Enqueues +active_job+ to run no earlier than +timestamp+.
#
# The target queue is +active_job.queue_name+, falling back to
# +Pgbus.configuration.default_queue+. The delay handed to
# +enqueue_with_concurrency+ is the whole number of seconds (rounded up)
# between +Time.current+ and +timestamp+, clamped to zero for timestamps
# that are already in the past.
#
# @param active_job [Object] the job to enqueue
# @param timestamp [Numeric] target run time, compared against +Time.current.to_f+
# @return [Object] +active_job+ when +uniqueness_rejected?+ is truthy,
#   otherwise the result of +enqueue_with_concurrency+
def enqueue_at(active_job, timestamp)
  queue = active_job.queue_name || Pgbus.configuration.default_queue
  payload_hash = Serializer.serialize_job_hash(active_job)

  # NOTE(review): the next three lines appear to have lost identifiers during
  # extraction (the method names after `Concurrency.` / `Uniqueness.` and the
  # helper name before `(payload_hash)`). They are reproduced as found —
  # confirm against lib/pgbus/active_job/adapter.rb.
  payload_hash = Concurrency.(active_job, payload_hash)
  payload_hash = Uniqueness.(active_job, payload_hash)
  payload_hash = (payload_hash)

  # Round up so the job never becomes visible before the requested time.
  delay = [(timestamp - Time.current.to_f).ceil, 0].max

  return active_job if uniqueness_rejected?(active_job, payload_hash)

  enqueue_with_concurrency(active_job, queue, payload_hash, delay: delay)
end