Class: Pgbus::ActiveJob::Adapter

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/active_job/adapter.rb

Instance Method Summary collapse

Instance Method Details

#enqueue(active_job) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/pgbus/active_job/adapter.rb', line 8

def enqueue(active_job)
  queue = active_job.queue_name || Pgbus.configuration.default_queue
  payload_hash = Serializer.serialize_job_hash(active_job)
  payload_hash = Concurrency.(active_job, payload_hash)
  payload_hash = Uniqueness.(active_job, payload_hash)
  payload_hash = (payload_hash)

  return active_job if uniqueness_rejected?(active_job, payload_hash)

  enqueue_with_concurrency(active_job, queue, payload_hash)
end

#enqueue_all(active_jobs) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/pgbus/active_job/adapter.rb', line 33

def enqueue_all(active_jobs)
  # Jobs with uniqueness must go through individual enqueue to acquire locks
  unique, bulk = active_jobs.partition { |j| Uniqueness.uniqueness_config(j) }
  unique.each do |j|
    if scheduled_in_future?(j)
      enqueue_at(j, j.scheduled_at.to_f)
    else
      enqueue(j)
    end
  end

  bulk.group_by { |j| j.queue_name || Pgbus.configuration.default_queue }.each do |queue, jobs|
    immediate, scheduled = jobs.partition { |j| !scheduled_in_future?(j) }
    enqueue_immediate(queue, immediate)
    scheduled.each { |j| enqueue_at(j, j.scheduled_at.to_f) }
  end

  active_jobs.count
end

#enqueue_at(active_job, timestamp) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pgbus/active_job/adapter.rb', line 20

def enqueue_at(active_job, timestamp)
  queue = active_job.queue_name || Pgbus.configuration.default_queue
  payload_hash = Serializer.serialize_job_hash(active_job)
  payload_hash = Concurrency.(active_job, payload_hash)
  payload_hash = Uniqueness.(active_job, payload_hash)
  payload_hash = (payload_hash)
  delay = [(timestamp - Time.current.to_f).ceil, 0].max

  return active_job if uniqueness_rejected?(active_job, payload_hash)

  enqueue_with_concurrency(active_job, queue, payload_hash, delay: delay)
end