Module: AcidicJob::UpgradeService

Defined in:
lib/acidic_job/upgrade_service.rb

Class Method Summary collapse

Class Method Details

.execute ⇒ Object

[View source]

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/acidic_job/upgrade_service.rb', line 52

# Migrates legacy `AcidicJob::Key` records into `AcidicJob::Run` records.
#
# Each `Key` is translated into an attribute hash (reusing the key's `id`), the hashes
# are bulk-inserted with `Run.insert_all`, and every `Key` whose `id` is present in the
# `Run` table is then deleted. A `Key` that raises while being translated is not
# inserted; it is collected, together with the exception, in the returned report.
#
# @return [Hash] a report with `:run_records` (`Run.count`), `:key_records`
#   (`Key.count`), and `:errored_keys` (an array of `[exception, key]` pairs)
def self.execute
  # attribute hashes to be passed to `insert_all`
  run_attributes = []
  # `[exception, key]` pairs for any `Key` that could not be mapped to a `Run`
  errored_keys = []

  # iterate over all `AcidicJob::Key` records in batches,
  # preparing a `Run` attribute hash for each one
  ::AcidicJob::Key.find_each do |key|
    # map all of the simple attributes directly
    attributes = {
      id: key.id,
      staged: false,
      idempotency_key: key.idempotency_key,
      job_class: key.job_name,
      last_run_at: key.last_run_at,
      locked_at: key.locked_at,
      recovery_point: key.recovery_point,
      error_object: key.error_object,
      attr_accessors: key.attr_accessors,
      workflow: key.workflow,
      created_at: key.created_at,
      updated_at: key.updated_at
    }

    # prepare the more complicated `job_args` -> `serialized_job` translation
    job_class = key.job_name.constantize
    serialized_job =
      if defined?(::Sidekiq) && job_class.include?(::Sidekiq::Worker)
        # make sure the class can serialize itself before asking it to
        unless job_class.include?(::AcidicJob::Extensions::Sidekiq)
          job_class.include(::AcidicJob::Extensions::Sidekiq)
        end
        job_class.new.serialize_job(*key.job_args)
      elsif defined?(::ActiveJob) && job_class < ::ActiveJob::Base
        unless job_class.include?(::AcidicJob::Extensions::ActiveJob)
          job_class.include(::AcidicJob::Extensions::ActiveJob)
        end
        # fall back to the raw stored arguments when they cannot be deserialized
        job_args = begin
          ::ActiveJob::Arguments.deserialize(key.job_args)
        rescue ::ActiveJob::DeserializationError
          key.job_args
        end
        job_class.new(*job_args).serialize_job
      else
        # without a known job backend there is nothing to serialize; report the key
        # as errored rather than queueing a `Run` with a `nil` `serialized_job`
        raise ArgumentError,
              "#{key.job_name} is neither a Sidekiq worker nor an ActiveJob job"
      end

    attributes[:serialized_job] = serialized_job
    run_attributes << attributes
  rescue StandardError => e
    errored_keys << [e, key]
  end

  # insert all of the `Run` records; skipped when there is nothing to insert because
  # `insert_all` raises on an empty list in some Rails versions
  ::AcidicJob::Run.insert_all(run_attributes) unless run_attributes.empty?

  # delete all successfully migrated `Key` records
  ::AcidicJob::Key.where(id: ::AcidicJob::Run.select(:id)).delete_all

  # return a report of the upgrade migration
  {
    run_records: ::AcidicJob::Run.count,
    key_records: ::AcidicJob::Key.count,
    errored_keys: errored_keys
  }
end