Class: RigidWorkflow::Run
- Inherits:
- ActiveRecord::Base
- Inheritance chain: Object → ActiveRecord::Base → RigidWorkflow::Run
- Defined in:
- app/models/rigid_workflow/run.rb
Overview
Represents an execution of a workflow.
Constant Summary
- OPEN_STATES =
%w[pending running compensating].freeze
Class Method Summary
- .bulk_apply(ids, action) ⇒ Object
- .filter_runs(statuses = nil, workflow_class: nil) ⇒ Object
Instance Method Summary
- #cancel! ⇒ Object
- #cleanup_signals! ⇒ Object
- #compensate! ⇒ Object
- #duration ⇒ Object
- #emit_signal(name, payload = {}) ⇒ Object
- #fail! ⇒ Object
- #finish! ⇒ Object
- #finished? ⇒ Boolean
- #memory ⇒ Object
- #memory=(value) ⇒ Object
- #read_only? ⇒ Boolean
- #retry! ⇒ Object
- #start! ⇒ Object
Class Method Details
.bulk_apply(ids, action) ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'app/models/rigid_workflow/run.rb', line 164
# Applies a bang action (e.g. +:cancel+ calls +cancel!+) to every run matching
# +ids+, collecting a per-run outcome instead of aborting on the first error.
#
# @param ids [Array, Object] value handed unchanged to +where(id: ...)+
# @param action [String, Symbol] action name without the trailing "!"
# @return [Hash] +{ success: [run_id, ...], failed: [{ id:, error: }, ...] }+
#   where +error+ is the message of the StandardError raised for that run
def self.bulk_apply(ids, action)
  results = { success: [], failed: [] }

  where(id: ids).each do |run|
    # public_send keeps the dynamic dispatch restricted to public methods.
    run.public_send("#{action}!")
    results[:success] << run.id
  rescue StandardError => error
    results[:failed] << { id: run.id, error: error.message }
  end

  results
end
.filter_runs(statuses = nil, workflow_class: nil) ⇒ Object
193 194 195 196 197 198 199 200 201 |
# File 'app/models/rigid_workflow/run.rb', line 193
# Builds a relation of runs created within the last month, optionally narrowed
# by status and workflow class, ordered newest first.
#
# @param statuses [Object, nil] status filter; skipped when blank
# @param workflow_class [Object, nil] workflow class filter; skipped when blank
# @return [Object] the relation ordered by +created_at+ descending
def self.filter_runs(statuses = nil, workflow_class: nil)
  filters = { created_at: (1.month.ago..) }
  filters[:status] = statuses if statuses.present?
  filters[:workflow_class] = workflow_class if workflow_class.present?

  where(filters).order(created_at: :desc)
end
Instance Method Details
#cancel! ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'app/models/rigid_workflow/run.rb', line 132
# Cancels an unfinished run: marks the run and each of its open steps (and each
# step's last attempt, if any) as failed, emits one "step.canceled" event per
# step, then cancels outstanding signals. Everything runs in one transaction.
#
# @raise [RuntimeError] "Workflow is already finished" when +finished?+ is true
# @return [Object] result of the transaction block
def cancel!
  raise "Workflow is already finished" if finished?

  transaction do
    self.status = :failed
    save!(context: :state_change)

    # Per the original author's note: ActiveJob::Base.limits_concurrency ensures
    # only one ActivityJob runs per step_id at a time, so a race on attempt
    # creation is not possible for a single step.
    cancelable = steps.where(status: OPEN_STATES).includes(:attempts)
    # Ids are captured before the steps leave the open states.
    cancelable_ids = cancelable.pluck(:id)

    cancelable.each do |open_step|
      latest_attempt = open_step.attempts.last
      latest_attempt&.update!(status: :failed, error_details: { "message" => "canceled" })
      open_step.update!(status: :failed)
    end

    cancelable_ids.each do |canceled_id|
      RigidWorkflow.instrument("step.canceled", run_id: id, step_id: canceled_id, reason: "canceled")
    end

    cleanup_signals!
  end
end
#cleanup_signals! ⇒ Object
180 181 182 183 184 185 186 |
# File 'app/models/rigid_workflow/run.rb', line 180
# Stamps +canceled_at+ on every signal of this run that has been neither
# received nor canceled and whose +expires_at+ is NULL or still in the future.
#
# @return [Object] whatever +update_all+ returns
def cleanup_signals!
  cutoff = Time.current

  outstanding = signals.where(received_at: nil, canceled_at: nil)
  unexpired = outstanding.where("expires_at IS NULL OR expires_at > ?", cutoff)

  # The same timestamp serves as expiry cutoff and as the cancellation time.
  unexpired.update_all(canceled_at: cutoff)
end
#compensate! ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'app/models/rigid_workflow/run.rb', line 88
# Compensates the run's completed steps in descending id order (last completed
# = first to compensate), then marks the run as failed.
#
# Each completed step's activity class is instantiated with the step and its
# +compensate+ is called; the step is then marked :compensated. Any error
# propagates to the caller and the final status update is skipped.
#
# @return [Object] result of the final +update!+
def compensate!
  # Persist :compensating BEFORE the transaction so it survives a rollback.
  update!(status: :compensating) unless compensating?

  transaction do
    # `find_each` batches by ascending primary key and does not honour a scoped
    # `order`, so the ordered relation is iterated directly to get the
    # intended newest-first sequence.
    steps.where(status: :completed).order(id: :desc).each do |step|
      step.activity_class.constantize.new(step).compensate
      step.update!(status: :compensated)
    end
  end

  # Reached only when every compensation succeeded; on failure the exception
  # has already propagated and the run stays :compensating (NOT failed).
  update!(status: :failed)
end
#duration ⇒ Object
51 52 53 54 55 |
# File 'app/models/rigid_workflow/run.rb', line 51
# Elapsed time between start and finish.
#
# @return [Object, nil] +finished_at - started_at+, or nil when either
#   timestamp is missing
def duration
  return unless started_at && finished_at

  finished_at - started_at
end
#emit_signal(name, payload = {}) ⇒ Object
188 189 190 191 |
# File 'app/models/rigid_workflow/run.rb', line 188
# Looks up this run's first signal with the given name and hands it, together
# with the payload, to RigidWorkflow::Signal.process!.
#
# @param name [Object] signal name to match
# @param payload [Hash] data forwarded to the signal processor
# @return [Object] whatever RigidWorkflow::Signal.process! returns
# @note +first!+ raises when no signal with that name exists.
def emit_signal(name, payload = {})
  RigidWorkflow::Signal.process!(signals.where(name: name).first!, payload)
end
#fail! ⇒ Object
80 81 82 83 84 85 86 |
# File 'app/models/rigid_workflow/run.rb', line 80
# Marks the run as failed and cancels its outstanding signals, wrapped in a
# "workflow.fail" instrumentation event.
#
# @return [Object] whatever RigidWorkflow.instrument returns
def fail!
  RigidWorkflow.instrument("workflow.fail", run_id: id) do
    assign_attributes(status: :failed)
    save!(context: :state_change)
    cleanup_signals!
  end
end
#finish! ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'app/models/rigid_workflow/run.rb', line 71
# Marks the run as completed, stamps +finished_at+ with the current time and
# cancels its outstanding signals, wrapped in a "workflow.complete" event.
#
# @return [Object] whatever RigidWorkflow.instrument returns
def finish!
  RigidWorkflow.instrument("workflow.complete", run_id: id) do
    assign_attributes(status: :completed, finished_at: Time.current)
    save!(context: :state_change)
    cleanup_signals!
  end
end
#finished? ⇒ Boolean
47 48 49 |
# File 'app/models/rigid_workflow/run.rb', line 47
# Whether the run has reached a terminal status.
#
# @return [Boolean] true when the run is completed or failed
def finished?
  return true if completed?

  failed?
end
#memory ⇒ Object
20 21 22 |
# File 'app/models/rigid_workflow/run.rb', line 20
# Reads the inherited +memory+ value, substituting an empty hash when it is
# nil or false.
#
# @return [Object] the stored memory, or a new +{}+ when nothing is stored
def memory
  stored = super
  stored || {}
end
#memory=(value) ⇒ Object
24 25 26 27 |
# File 'app/models/rigid_workflow/run.rb', line 24
# Assigns +memory+ through the inherited writer, refusing once the run is
# finished.
#
# @param value [Object] new memory value
# @raise [RuntimeError] "Immutable once finished" when +finished?+ is true
def memory=(value)
  raise "Immutable once finished" if finished?

  # Bare `super` forwards `value` unchanged to the inherited writer.
  super
end
#read_only? ⇒ Boolean
29 30 31 |
# File 'app/models/rigid_workflow/run.rb', line 29
# Whether the run should be treated as read-only.
#
# @return [Boolean] the value of +completed?+ (a failed run is not read-only)
def read_only?
  completed?
end
#retry! ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'app/models/rigid_workflow/run.rb', line 112
# Puts a failed (or still running) run back into :running, resets its failed
# steps to :pending, emits one "step.retry" event per reset step and hands the
# run to Orchestrator.schedule, all inside a single transaction.
#
# @raise [RuntimeError] when the run is neither failed nor running
# @return [Object] result of the transaction block
def retry!
  raise "Workflow cannot be retried when #{status}" unless failed? || running?

  transaction do
    self.status = :running
    save!(context: :state_change)

    # Load the failed steps once, before any of them is updated. The ids used
    # for instrumentation then come from exactly the records that were reset,
    # rather than from a lookup made after their status has already changed.
    failed_steps = steps.where(status: :failed).to_a
    failed_steps.each { |step| step.update!(status: :pending) }

    failed_steps.each do |step|
      RigidWorkflow.instrument("step.retry", run_id: id, step_id: step.id)
    end

    # NOTE(review): scheduling happens inside the transaction; if
    # Orchestrator.schedule enqueues background work, confirm it cannot start
    # before this transaction commits.
    Orchestrator.schedule(self)
  end
end
#start! ⇒ Object
65 66 67 68 69 |
# File 'app/models/rigid_workflow/run.rb', line 65
# Marks the run as running, stamps +started_at+ with the current time and
# saves under the +:state_change+ validation context.
#
# @return [Object] whatever +save!+ returns
def start!
  assign_attributes(status: :running, started_at: Time.current)
  save!(context: :state_change)
end