Class: GoodJob::Bulk::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/bulk.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Buffer

Returns a new instance of Buffer.



63
64
65
# File 'lib/good_job/bulk.rb', line 63

# Builds an empty buffer.
#
# @values is the buffer's backing store; it starts out as a fresh, empty
# Array owned by this instance.
def initialize
  @values = []
end

Instance Method Details

#active_jobs ⇒ Object



120
121
122
# File 'lib/good_job/bulk.rb', line 120

# Lists the buffered jobs in insertion order, dropping the adapter half of
# each stored [job, adapter] pair.
#
# @return [Array] the first element of every entry in @values
def active_jobs
  @values.map { |job, _adapter| job }
end

#active_jobs_by_queue_adapter ⇒ Object



113
114
115
116
117
118
# File 'lib/good_job/bulk.rb', line 113

# Groups the buffered jobs by the adapter stored alongside each of them.
#
# Adapters appear as keys in the order they were first seen, and each
# value keeps its jobs in insertion order.
#
# @return [Hash{Object => Array}] adapter => jobs buffered for that adapter
def active_jobs_by_queue_adapter
  pairs_by_adapter = @values.group_by(&:last)
  pairs_by_adapter.transform_values { |pairs| pairs.map(&:first) }
end

#add(active_jobs, queue_adapter: nil) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/good_job/bulk.rb', line 75

# Adds one or more jobs to the buffer, pairing each with the adapter that
# should enqueue it.
#
# Every job is validated before the buffer is touched, so a failure part-way
# through a batch leaves the buffer unchanged.
#
# @param active_jobs [ActiveJob::Base, Array<ActiveJob::Base>, nil]
#   a single job or a list of jobs; nil adds nothing
# @param queue_adapter [Object, nil]
#   adapter to use for every job in this call; when nil, each job's class
#   queue adapter is used instead
# @return [true]
# @raise [ArgumentError] if any element is not an ActiveJob::Base instance
# @raise [Error] if no adapter can be determined for a job
def add(active_jobs, queue_adapter: nil)
  new_pairs = Array(active_jobs).map do |active_job|
    raise ArgumentError, "Expected an ActiveJob::Base instance, got #{active_job.class}" unless active_job.is_a?(ActiveJob::Base)

    adapter = queue_adapter || active_job.class.queue_adapter
    raise Error, "Jobs must have a Queue Adapter" unless adapter

    [active_job, adapter]
  end

  # concat rather than append(*new_pairs): splatting turns every pair into a
  # separate method argument, which can overflow the VM stack for very large
  # batches on some Ruby versions.
  @values.concat(new_pairs)

  true
end

#capture ⇒ Object



67
68
69
70
71
72
73
# File 'lib/good_job/bulk.rb', line 67

# Makes this buffer the value of Bulk.current_buffer while the given block
# runs, then puts back whatever was there before.
#
# The previous value is restored from the ensure clause, so it is put back
# even when the block raises.
#
# @yield runs with Bulk.current_buffer set to this buffer
# @return [Object] the block's return value
def capture
  previous_buffer = Bulk.current_buffer
  Bulk.current_buffer = self
  yield
ensure
  # Runs on both normal and exceptional exit.
  Bulk.current_buffer = previous_buffer
end

#enqueue ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/good_job/bulk.rb', line 89

# Enqueues every buffered job that does not already have a provider_job_id.
#
# Jobs are handled one adapter at a time. If the adapter responds to
# enqueue_all, jobs are split in two: those with a concurrency key and an
# :enqueue_limit or :total_limit in their class's concurrency config are
# enqueued one by one, and the rest go to enqueue_all in a single call. If the
# adapter has no enqueue_all, every job is enqueued one by one.
#
# ConcurrencyExceededError raised by an individual job.enqueue is swallowed;
# any other error propagates.
#
# NOTE(review): Bulk.unbuffer presumably suspends buffering so these enqueue
# calls reach the adapters instead of being captured again — confirm.
#
# @return [Object] whatever Bulk.unbuffer returns for the block
def enqueue
  Bulk.unbuffer do
    active_jobs_by_queue_adapter.each do |adapter, adapter_jobs|
      # Do not re-enqueue already enqueued jobs
      pending_jobs = adapter_jobs.reject(&:provider_job_id)

      individual_jobs =
        if adapter.respond_to?(:enqueue_all)
          limited_jobs, bulkable_jobs = pending_jobs.partition do |job|
            next false unless job.respond_to?(:good_job_concurrency_key) && job.good_job_concurrency_key

            job.class.good_job_concurrency_config[:enqueue_limit] || job.class.good_job_concurrency_config[:total_limit]
          end

          # The bulk call happens before any individual enqueue.
          adapter.enqueue_all(bulkable_jobs) if bulkable_jobs.any?
          limited_jobs
        else
          pending_jobs
        end

      individual_jobs.each do |job|
        job.enqueue
      rescue GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError
        # ignore
      end
    end
  end
end