Class: RigidWorkflow::Run

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
app/models/rigid_workflow/run.rb

Overview

Represents an execution of a workflow.

Constant Summary collapse

OPEN_STATES =
%w[pending running compensating].freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.bulk_apply(ids, action) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'app/models/rigid_workflow/run.rb', line 164

def self.bulk_apply(ids, action)
  runs = where(id: ids)
  results = { success: [], failed: [] }

  runs.each do |run|
    begin
      run.send("#{action}!")
      results[:success] << run.id
    rescue StandardError => error
      results[:failed] << { id: run.id, error: error.message }
    end
  end

  results
end

.filter_runs(statuses = nil, workflow_class: nil) ⇒ Object



193
194
195
196
197
198
199
200
201
# File 'app/models/rigid_workflow/run.rb', line 193

def self.filter_runs(statuses = nil, workflow_class: nil)
  relation = self.where(created_at: 1.month.ago..)
  relation = relation.where(status: statuses) if statuses.present?
  relation =
    relation.where(
      workflow_class: workflow_class
    ) if workflow_class.present?
  relation.order(created_at: :desc)
end

Instance Method Details

#cancel!Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'app/models/rigid_workflow/run.rb', line 132

def cancel!
  raise "Workflow is already finished" if finished?

  transaction do
    self.status = :failed
    save!(context: :state_change)
    # ActiveJob::Base.limits_concurrency ensures only one ActivityJob
    # runs per step_id at a time, so race condition on attempt
    # creation is not possible for a single step.
    open_steps = steps.where(status: OPEN_STATES).includes(:attempts)
    open_step_ids = open_steps.pluck(:id)
    open_steps.each do |step|
      step.attempts.last&.update!(
        status: :failed,
        error_details: {
          "message" => "canceled"
        }
      )
      step.update!(status: :failed)
    end
    open_step_ids.each do |step_id|
      RigidWorkflow.instrument(
        "step.canceled",
        run_id: id,
        step_id: step_id,
        reason: "canceled"
      )
    end
    cleanup_signals!
  end
end

#cleanup_signals!Object



180
181
182
183
184
185
186
# File 'app/models/rigid_workflow/run.rb', line 180

def cleanup_signals!
  now = Time.current
  signals
    .where(received_at: nil, canceled_at: nil)
    .where("expires_at IS NULL OR expires_at > ?", now)
    .update_all(canceled_at: now)
end

#compensate!Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'app/models/rigid_workflow/run.rb', line 88

def compensate!
  # Set status to compensating BEFORE transaction so it persists on failure
  update!(status: :compensating) unless compensating?

  transaction do
    # Find completed steps in reverse order (last completed = first to compensate)
    steps
      .where(status: :completed)
      .order(id: :desc)
      .find_each do |step|
        activity_class = step.activity_class.constantize
        activity = activity_class.new(step)
        activity.compensate
        step.update!(status: :compensated)
      end
  end

  # If we get here, compensation succeeded
  update!(status: :failed)
rescue => error
  # If compensation fails mid-way, run stays :compensating (NOT failed)
  raise error
end

#durationObject



51
52
53
54
55
# File 'app/models/rigid_workflow/run.rb', line 51

def duration
  return nil if started_at.nil? || finished_at.nil?

  finished_at - started_at
end

#emit_signal(name, payload = {}) ⇒ Object



188
189
190
191
# File 'app/models/rigid_workflow/run.rb', line 188

def emit_signal(name, payload = {})
  signal = signals.where(name: name).first!
  RigidWorkflow::Signal.process!(signal, payload)
end

#fail!Object



80
81
82
83
84
85
86
# File 'app/models/rigid_workflow/run.rb', line 80

def fail!
  RigidWorkflow.instrument("workflow.fail", run_id: id) do
    self.status = :failed
    save!(context: :state_change)
    cleanup_signals!
  end
end

#finish!Object



71
72
73
74
75
76
77
78
# File 'app/models/rigid_workflow/run.rb', line 71

def finish!
  RigidWorkflow.instrument("workflow.complete", run_id: id) do
    self.status = :completed
    self.finished_at = Time.current
    save!(context: :state_change)
    cleanup_signals!
  end
end

#finished?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'app/models/rigid_workflow/run.rb', line 47

def finished?
  completed? || failed?
end

#memoryObject



20
21
22
# File 'app/models/rigid_workflow/run.rb', line 20

def memory
  super || {}
end

#memory=(value) ⇒ Object



24
25
26
27
# File 'app/models/rigid_workflow/run.rb', line 24

def memory=(value)
  raise "Immutable once finished" if finished?
  super(value)
end

#read_only?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'app/models/rigid_workflow/run.rb', line 29

def read_only?
  completed?
end

#retry!Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'app/models/rigid_workflow/run.rb', line 112

def retry!
  raise "Workflow cannot be retried when #{status}" unless failed? || running?

  transaction do
    self.status = :running
    save!(context: :state_change)

    failed_steps = steps.where(status: :failed)
    failed_steps.each do |step|
      step.update!(status: :pending)
    end
    failed_step_ids = failed_steps.pluck(:id)
    failed_step_ids.each do |step_id|
      RigidWorkflow.instrument("step.retry", run_id: id, step_id: step_id)
    end

    Orchestrator.schedule(self)
  end
end

#start!Object



65
66
67
68
69
# File 'app/models/rigid_workflow/run.rb', line 65

def start!
  self.status = :running
  self.started_at = Time.current
  save!(context: :state_change)
end