Class: SidekiqBatch

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
app/models/sidekiq_batch.rb,
app/models/sidekiq_batch/middleware.rb,
app/models/sidekiq_batch/client_middleware.rb,
app/models/sidekiq_batch/batch_enrollment_context.rb

Overview

Schema Information

Table name: sidekiq_batches

id                :bigint           not null, primary key
callback_fired_at :datetime
callbacks         :jsonb            not null
completed_at      :datetime
context           :jsonb            not null
description       :string
status            :integer          default("pending"), not null
total_jobs        :integer          default(0), not null
created_at        :datetime         not null
updated_at        :datetime         not null

Defined Under Namespace

Classes: BatchEnrollmentContext, ClientMiddleware, Middleware

Constant Summary collapse

EVENTS =
%w[complete failure].freeze

Instance Method Summary collapse

Instance Method Details

#attempt_completion!Object

Atomic completion check. If all enrolled jobs are terminal, transition the batch to ‘complete` or `failed`, fire the registered callback, and stamp `callback_fired_at`. Returns the resulting status (String) or nil if the batch is not yet ready to transition.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'app/models/sidekiq_batch.rb', line 73

def attempt_completion!
  sql = self.class.sanitize_sql_array([completion_sql, id])

  result = self.class.connection.exec_query(sql)

  return nil if result.rows.empty?

  reload

  event = failed_status? ? "failure" : "complete"

  fire_callback(event)

  status
end

#completed_jobsObject



65
66
67
# File 'app/models/sidekiq_batch.rb', line 65

def completed_jobs
  sidekiq_batch_jobs.where(status: "complete")
end

#failed_jobsObject



61
62
63
# File 'app/models/sidekiq_batch.rb', line 61

def failed_jobs
  sidekiq_batch_jobs.where(status: "failed")
end

#fire_callback(event) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'app/models/sidekiq_batch.rb', line 89

def fire_callback(event)
  return nil if callback_fired_at.present?

  job_class_name = callbacks[event.to_s]

  return nil if job_class_name.blank?

  self.class.transaction do
    update!(callback_fired_at: Time.current)
    job_class_name.constantize.perform_async(id)
  end

  job_class_name
end

#jobs(&block) ⇒ Object

Raises:

  • (ArgumentError)


38
39
40
41
42
43
44
# File 'app/models/sidekiq_batch.rb', line 38

def jobs(&block)
  raise ArgumentError, "block required" unless block_given?

  BatchEnrollmentContext.new(self).run(&block)

  self
end

#on(event, job_class) ⇒ Object

Raises:

  • (ArgumentError)


27
28
29
30
31
32
33
34
35
36
# File 'app/models/sidekiq_batch.rb', line 27

def on(event, job_class)
  event_str = event.to_s
  raise ArgumentError, "unknown event #{event.inspect}" unless EVENTS.include?(event_str)

  self.callbacks = callbacks.merge(event_str => job_class.to_s)

  save!

  self
end

#pending_jobsObject



57
58
59
# File 'app/models/sidekiq_batch.rb', line 57

def pending_jobs
  sidekiq_batch_jobs.where(status: "pending")
end

#progressObject



46
47
48
49
50
51
52
53
54
55
# File 'app/models/sidekiq_batch.rb', line 46

def progress
  counts = sidekiq_batch_jobs.group(:status).count

  {
    total:    total_jobs,
    complete: counts.fetch("complete", 0),
    failed:   counts.fetch("failed", 0),
    pending:  counts.fetch("pending", 0)
  }
end