52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/acidic_job/upgrade_service.rb', line 52
# Migrates every legacy `AcidicJob::Key` record into an `AcidicJob::Run` record.
#
# For each key, the job class named by `key.job_name` is resolved, the matching
# AcidicJob extension is mixed in when absent, and the job is re-serialized via
# `serialize_job`. The converted rows are bulk-inserted with `insert_all`, after
# which every key whose id exists in the runs table is deleted.
#
# A failure while converting a single key (an unresolvable job class, a class
# that is neither a Sidekiq worker nor an ActiveJob job, a serialization error)
# does not abort the upgrade: that key is left in place and reported.
#
# @return [Hash] `:run_records` and `:key_records` (row counts after the
#   upgrade) and `:errored_keys` (an Array of `[exception, key]` pairs)
def self.execute
  run_attributes = []
  errored_keys = []

  ::AcidicJob::Key.find_each do |key|
    job_class = key.job_name.constantize

    serialized_job =
      if defined?(::Sidekiq) && job_class.include?(::Sidekiq::Worker)
        extension = ::AcidicJob::Extensions::Sidekiq
        job_class.include(extension) unless job_class.include?(extension)
        job_class.new.serialize_job(*key.job_args)
      elsif defined?(::ActiveJob) && job_class < ::ActiveJob::Base
        extension = ::AcidicJob::Extensions::ActiveJob
        job_class.include(extension) unless job_class.include?(extension)
        # Stored arguments may not be in ActiveJob's serialized form; fall back
        # to the raw value when they cannot be deserialized.
        job_args = begin
          ::ActiveJob::Arguments.deserialize(key.job_args)
        rescue ::ActiveJob::DeserializationError
          key.job_args
        end
        job_class.new(*job_args).serialize_job
      else
        # Without a supported adapter there is nothing to serialize; report the
        # key instead of queueing a run row that has no job payload.
        raise ArgumentError, "#{key.job_name} is neither a Sidekiq worker nor an ActiveJob job"
      end

    run_attributes << {
      id: key.id,
      staged: false,
      idempotency_key: key.idempotency_key,
      job_class: key.job_name,
      last_run_at: key.last_run_at,
      locked_at: key.locked_at,
      recovery_point: key.recovery_point,
      error_object: key.error_object,
      attr_accessors: key.attr_accessors,
      workflow: key.workflow,
      created_at: key.created_at,
      updated_at: key.updated_at,
      serialized_job: serialized_job
    }
  rescue StandardError => e
    errored_keys << [e, key]
  end

  # Older Active Record versions raise ArgumentError when `insert_all` is given
  # an empty list, so skip the insert when nothing was converted.
  ::AcidicJob::Run.insert_all(run_attributes) unless run_attributes.empty?
  # Only keys that now have a run with the same id are removed; errored keys stay.
  ::AcidicJob::Key.where(id: ::AcidicJob::Run.select(:id)).delete_all

  {
    run_records: ::AcidicJob::Run.count,
    key_records: ::AcidicJob::Key.count,
    errored_keys: errored_keys
  }
end
|