Class: SidekiqBatch
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- SidekiqBatch
show all
- Defined in:
- app/models/sidekiq_batch.rb,
app/models/sidekiq_batch/middleware.rb,
app/models/sidekiq_batch/client_middleware.rb,
app/models/sidekiq_batch/batch_enrollment_context.rb
Overview
Table name: sidekiq_batches
id :bigint not null, primary key
callback_fired_at :datetime
callbacks :jsonb not null
completed_at :datetime
context :jsonb not null
description :string
status :integer default("pending"), not null
total_jobs :integer default(0), not null
created_at :datetime not null
updated_at :datetime not null
Defined Under Namespace
Classes: BatchEnrollmentContext, ClientMiddleware, Middleware
Constant Summary
collapse
- EVENTS =
%w[complete failure].freeze
Instance Method Summary
collapse
Instance Method Details
#attempt_completion! ⇒ Object
Atomic completion check. If all enrolled jobs are terminal, transition the batch to `complete` or `failed`, fire the registered callback, and stamp `callback_fired_at`. Returns the resulting status (String) or nil if the batch is not yet ready to transition.
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'app/models/sidekiq_batch.rb', line 73
# Runs the completion statement for this batch and, when it reports a
# transition, fires the matching callback.
#
# The statement text comes from `completion_sql` with this record's `id`
# bound through `sanitize_sql_array`. When the query yields no rows nothing
# else happens and nil is returned.
#
# @return [Object, nil] the value of `status` after `reload`, or nil when the
#   query returned no rows
def attempt_completion!
  model = self.class
  statement = model.sanitize_sql_array([completion_sql, id])
  outcome = model.connection.exec_query(statement)
  return if outcome.rows.empty?

  # Pick up whatever the statement changed before inspecting the status.
  reload
  if failed_status?
    fire_callback("failure")
  else
    fire_callback("complete")
  end
  status
end
|
#completed_jobs ⇒ Object
65
66
67
|
# File 'app/models/sidekiq_batch.rb', line 65
# Narrows this batch's `sidekiq_batch_jobs` to those with status "complete".
#
# @return [Object] whatever `where` returns on `sidekiq_batch_jobs`
#   (presumably an ActiveRecord relation — confirm against the association)
def completed_jobs
  enrolled_jobs = sidekiq_batch_jobs
  enrolled_jobs.where(status: "complete")
end
|
#failed_jobs ⇒ Object
61
62
63
|
# File 'app/models/sidekiq_batch.rb', line 61
# Narrows this batch's `sidekiq_batch_jobs` to those with status "failed".
#
# @return [Object] whatever `where` returns on `sidekiq_batch_jobs`
#   (presumably an ActiveRecord relation — confirm against the association)
def failed_jobs
  enrolled_jobs = sidekiq_batch_jobs
  enrolled_jobs.where(status: "failed")
end
|
#fire_callback(event) ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'app/models/sidekiq_batch.rb', line 89
# Enqueues the callback job registered for +event+, at most once per batch.
#
# Does nothing when `callback_fired_at` is already set or when no job class
# name is stored under the event's string key in `callbacks`.
#
# @param event [#to_s] event name used as the key into `callbacks`
# @return [String, nil] the callback job class name that was enqueued, or nil
#   when nothing was fired
def fire_callback(event)
  return if callback_fired_at.present?

  callback_job = callbacks[event.to_s]
  return if callback_job.blank?

  # Stamp first, then enqueue, both inside one transaction block so an
  # exception raised while enqueueing propagates out of the block.
  self.class.transaction do
    update!(callback_fired_at: Time.current)
    callback_job.constantize.perform_async(id)
  end

  callback_job
end
|
#jobs(&block) ⇒ Object
38
39
40
41
42
43
44
|
# File 'app/models/sidekiq_batch.rb', line 38
# Runs the given block through a `BatchEnrollmentContext` built for this batch.
#
# @yield the block handed on to `BatchEnrollmentContext#run`
# @raise [ArgumentError] when called without a block
# @return [self]
def jobs(&block)
  raise ArgumentError, "block required" if block.nil?

  enrollment = BatchEnrollmentContext.new(self)
  enrollment.run(&block)
  self
end
|
#on(event, job_class) ⇒ Object
27
28
29
30
31
32
33
34
35
36
|
# File 'app/models/sidekiq_batch.rb', line 27
# Registers +job_class+ as the callback for +event+ and saves the record.
#
# The event is compared as a string against `EVENTS`; the job class is stored
# as its `to_s` form under that string key in `callbacks`.
#
# @param event [#to_s] one of the names listed in `EVENTS`
# @param job_class [#to_s] callback job class (or its name)
# @raise [ArgumentError] when +event+ is not in `EVENTS`
# @return [self]
def on(event, job_class)
  key = event.to_s
  unless EVENTS.include?(key)
    raise ArgumentError, "unknown event #{event.inspect}"
  end

  # Assign a merged copy rather than mutating the existing hash in place.
  self.callbacks = callbacks.merge(key => job_class.to_s)
  save!
  self
end
|
#pending_jobs ⇒ Object
57
58
59
|
# File 'app/models/sidekiq_batch.rb', line 57
# Narrows this batch's `sidekiq_batch_jobs` to those with status "pending".
#
# @return [Object] whatever `where` returns on `sidekiq_batch_jobs`
#   (presumably an ActiveRecord relation — confirm against the association)
def pending_jobs
  enrolled_jobs = sidekiq_batch_jobs
  enrolled_jobs.where(status: "pending")
end
|
#progress ⇒ Object
46
47
48
49
50
51
52
53
54
55
|
# File 'app/models/sidekiq_batch.rb', line 46
# Summarises the batch as job counts per status.
#
# Counts come from grouping `sidekiq_batch_jobs` by status; a status with no
# entry in the grouped result is reported as 0. `total` is read from
# `total_jobs`, not derived from the grouped counts.
#
# @return [Hash{Symbol => Object}] keys :total, :complete, :failed, :pending
def progress
  tally = sidekiq_batch_jobs.group(:status).count
  summary = { total: total_jobs }
  %w[complete failed pending].each do |state|
    summary[state.to_sym] = tally.fetch(state, 0)
  end
  summary
end
|