14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/acidic_job/workflow.rb', line 14
# Defines a workflow from the given block, finds or creates the Execution
# record that tracks it, and then runs its steps one at a time.
#
# The method works in three instrumented phases:
# 1. :define_workflow     - builds the workflow definition via a Builder.
# 2. :initialize_workflow - looks up or creates the Execution row inside a
#                           database transaction.
# 3. :process_workflow    - repeatedly takes the step named by
#                           `recover_to` until the execution is finished or
#                           a step halts.
#
# @param unique_by [Object] value that is JSON-dumped together with the job
#   class name and hashed (SHA256) to form the idempotency key.
# @yield [builder] the Builder used to declare the workflow; the block must
#   accept exactly one parameter.
# @return [true] when the execution is already finished or a step halts;
#   otherwise whatever the :process_workflow instrumentation call returns
#   once the loop ends (not visible from this method).
# @raise [RedefiningWorkflowError] if a builder was already set on this instance.
# @raise [UndefinedWorkflowBlockError] if no block is given.
# @raise [InvalidWorkflowBlockError] if the block's arity is not 1.
# @raise [MissingStepsError] if the block declared no steps.
# @raise [ArgumentMismatchError] if an existing execution was stored with
#   different job arguments.
# @raise [DefinitionMismatchError] if an existing execution was stored with
#   a different workflow definition.
# @raise [UndefinedStepError] if `recover_to` names a step that is not in
#   the stored definition.
def execute_workflow(unique_by:, &block)
  serialized_job = serialize

  workflow_definition = AcidicJob.instrument(:define_workflow, **serialized_job) do
    raise RedefiningWorkflowError if defined? @_builder

    @_builder = Builder.new

    raise UndefinedWorkflowBlockError unless block_given?
    raise InvalidWorkflowBlockError if block.arity != 1

    block.call @_builder

    raise MissingStepsError if @_builder.steps.empty?

    @_builder.define_workflow
  end

  AcidicJob.instrument(:initialize_workflow, "definition" => workflow_definition) do
    # SQLite gets no explicit isolation option; every other adapter runs the
    # lookup/insert under :serializable isolation.
    # NOTE(review): presumably the SQLite adapter rejects an isolation level - confirm.
    transaction_args = case ::ActiveRecord::Base.connection.adapter_name.downcase.to_sym
                       when :sqlite
                         {}
                       else
                         { isolation: :serializable }
                       end
    idempotency_key = Digest::SHA256.hexdigest(JSON.dump([self.class.name, unique_by]))

    @execution = ::ActiveRecord::Base.transaction(**transaction_args) do
      record = Execution.find_by(idempotency_key: idempotency_key)

      if record.present?
        # An existing execution must match this job's arguments and definition.
        if record.raw_arguments != serialized_job["arguments"]
          raise ArgumentMismatchError.new(serialized_job["arguments"], record.raw_arguments)
        end

        if record.definition != workflow_definition
          raise DefinitionMismatchError.new(workflow_definition, record.definition)
        end

        record.update!(
          last_run_at: Time.current
        )
      else
        # First run for this key: start from the first step of the definition.
        record = Execution.create!(
          idempotency_key: idempotency_key,
          serialized_job: serialized_job,
          definition: workflow_definition,
          recover_to: workflow_definition.keys.first
        )
      end

      record
    end
  end

  @ctx ||= Context.new(@execution)

  AcidicJob.instrument(:process_workflow, execution: @execution.attributes) do
    return true if @execution.finished?

    loop do
      break if @execution.finished?

      current_step = @execution.recover_to

      raise UndefinedStepError.new(current_step) unless @execution.definition.key?(current_step)

      step_definition = @execution.definition[current_step]
      AcidicJob.instrument(:process_step, **step_definition) do
        # `catch(:halt)` yields either take_step's return value or the value
        # passed to `throw :halt` from somewhere inside the step.
        recover_to = catch(:halt) { take_step(step_definition) }

        case recover_to
        when HALT_STEP
          @execution.record!(step: step_definition.fetch("does"), action: :halted, timestamp: Time.now)
          return true
        else
          @execution.update!(recover_to: recover_to)
        end
      end
    end
  end
end
|