Module: AcidicJob::Workflow

Defined in:
lib/acidic_job/workflow.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#ctxObject (readonly)

Returns the value of attribute ctx.



12
13
14
# File 'lib/acidic_job/workflow.rb', line 12

def ctx
  @ctx
end

#executionObject (readonly)

Returns the value of attribute execution.



12
13
14
# File 'lib/acidic_job/workflow.rb', line 12

def execution
  @execution
end

Instance Method Details

#execute_workflow(unique_by:, &block) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/acidic_job/workflow.rb', line 14

def execute_workflow(unique_by:, &block)
  serialized_job = serialize

  workflow_definition = AcidicJob.instrument(:define_workflow, **serialized_job) do
    raise RedefiningWorkflowError if defined? @_builder

    @_builder = Builder.new

    raise UndefinedWorkflowBlockError unless block_given?
    raise InvalidWorkflowBlockError if block.arity != 1

    block.call @_builder

    raise MissingStepsError if @_builder.steps.empty?

    # convert the array of steps into a hash of recovery_points and next steps
    @_builder.define_workflow
  end

  AcidicJob.instrument(:initialize_workflow, "definition" => workflow_definition) do
    transaction_args = case ::ActiveRecord::Base.connection.adapter_name.downcase.to_sym
                       # SQLite doesn't support `serializable` transactions
                       when :sqlite
                         {}
                       else
                         { isolation: :serializable }
                       end
    idempotency_key = Digest::SHA256.hexdigest(JSON.dump([self.class.name, unique_by]))

    @execution = ::ActiveRecord::Base.transaction(**transaction_args) do
      record = Execution.find_by(idempotency_key: idempotency_key)

      if record.present?
        # Programs enqueuing multiple jobs with different parameters but the
        # same idempotency key is a bug.
        if record.raw_arguments != serialized_job["arguments"]
          raise ArgumentMismatchError.new(serialized_job["arguments"], record.raw_arguments)
        end

        if record.definition != workflow_definition
          raise DefinitionMismatchError.new(workflow_definition, record.definition)
        end

        # Only acquire a lock if the key is unlocked or its lock has expired
        # because the original job was long enough ago.
        # raise "LockedIdempotencyKey" if record.locked_at > Time.current - 2.seconds

        record.update!(
          last_run_at: Time.current
        )
      else
        record = Execution.create!(
          idempotency_key: idempotency_key,
          serialized_job: serialized_job,
          definition: workflow_definition,
          recover_to: workflow_definition.keys.first
        )
      end

      record
    end
  end
  @ctx ||= Context.new(@execution)

  AcidicJob.instrument(:process_workflow, execution: @execution.attributes) do
    # if the workflow record is already marked as finished, immediately return its result
    return true if @execution.finished?

    loop do
      break if @execution.finished?

      current_step = @execution.recover_to

      if not @execution.definition.key?(current_step) # rubocop:disable Style/Not
        raise UndefinedStepError.new(current_step)
      end

      step_definition = @execution.definition[current_step]
      AcidicJob.instrument(:process_step, **step_definition) do
        recover_to = catch(:halt) { take_step(step_definition) }
        case recover_to
        when HALT_STEP
          @execution.record!(step: step_definition.fetch("does"), action: :halted, timestamp: Time.now)
          return true
        else
          @execution.update!(recover_to: recover_to)
        end
      end
    end
  end
end

#halt_step!Object



110
111
112
# File 'lib/acidic_job/workflow.rb', line 110

def halt_step!
  throw :halt, HALT_STEP
end

#repeat_step!Object



106
107
108
# File 'lib/acidic_job/workflow.rb', line 106

def repeat_step!
  throw :repeat, REPEAT_STEP
end

#step_retrying?Boolean

Returns:

  • (Boolean)


114
115
116
117
118
119
120
121
122
# File 'lib/acidic_job/workflow.rb', line 114

def step_retrying?
  step_name = caller_locations.first.label

  if not @execution.definition.key?(step_name) # rubocop:disable Style/IfUnlessModifier, Style/Not
    raise UndefinedStepError.new(step_name)
  end

  @execution.entries.where(step: step_name, action: "started").count > 1
end