Class: DurableFlow::Workflow

Inherits:
ActiveJob::Base
  • Object
show all
Includes:
ActiveJob::Continuable
Defined in:
lib/durable_flow/workflow.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#workflow_run ⇒ Object (readonly)

Returns the value of attribute workflow_run.



11
12
13
# File 'lib/durable_flow/workflow.rb', line 11

# Reader for the +@workflow_run+ instance variable.
#
# @return [Object, nil] the stored value, or nil when it has not been assigned
def workflow_run = @workflow_run

Instance Method Details

#checkpoint! ⇒ Object



13
14
15
16
# File 'lib/durable_flow/workflow.rb', line 13

# Refreshes the execution lock and then, when the queue adapter exposes a
# +stopping?+ predicate that currently answers true, interrupts the job with
# reason +:stopping+.
#
# @return [Object, nil] the result of +interrupt!+ when interrupting, nil otherwise
def checkpoint!
  refresh_execution_lock!

  # Not every adapter implements stopping?, so probe before asking.
  adapter_stopping = queue_adapter.respond_to?(:stopping?) && queue_adapter.stopping?
  return unless adapter_stopping

  interrupt!(reason: :stopping)
end

#continuable_step ⇒ Object



9
# File 'lib/durable_flow/workflow.rb', line 9

# Keeps whatever +step+ resolves to at this point reachable as +continuable_step+.
# NOTE(review): presumably this is the +step+ from ActiveJob::Continuable, since the
# alias sits ahead of this class's own +def step+ in the file — confirm.
alias continuable_step step

#log ⇒ Object



76
77
78
# File 'lib/durable_flow/workflow.rb', line 76

# Returns the logger bound to this workflow, building it on first use and
# reusing the same instance afterwards.
#
# @return [WorkflowLogger] the memoized logger created with this object as its argument
def log
  return @workflow_logger if @workflow_logger

  @workflow_logger = WorkflowLogger.new(self)
end

#sleep_step(name, duration = nil, until_time: nil) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/durable_flow/workflow.rb', line 24

# Durable step that holds the workflow until a wake-up time has passed.
#
# The wake-up time is taken from the step record's +"wake_at"+ metadata entry
# when one is already stored; otherwise it is derived from +duration+ /
# +until_time+ via +time_from+. While that time is still in the future the
# wake-up time is written back to the metadata, the step record is marked
# +"sleeping"+, and +pause_or_interrupt!+ is asked to resume at that time.
#
# @param name [Object] step name handed to +durable_step+
# @param duration [Object, nil] forwarded to +time_from+ as the relative delay
# @param until_time [Object, nil] forwarded to +time_from+ as +explicit_time:+
# @return [Object] whatever +durable_step+ returns; the step block itself evaluates to nil
# @raise [ArgumentError] when no wake-up time can be determined
def sleep_step(name, duration = nil, until_time: nil)
  durable_step(name) do
    step_record = current_workflow_step
    metadata = step_record.metadata
    # A previously stored wake_at wins, so re-running the step does not restart the timer.
    wake_at = parse_time(metadata["wake_at"]) || time_from(duration, explicit_time: until_time)

    raise ArgumentError, "Provide a duration or until: time for sleep step #{name.inspect}" unless wake_at

    if Time.current < wake_at
      metadata["wake_at"] = wake_at.utc.iso8601(9)
      step_record.update!(status: "sleeping", metadata: metadata)
      pause_or_interrupt!(reason: :sleeping, status: "sleeping", resume_options: { wait_until: wake_at })
    end

    nil
  end
end

#step(name = nil, start: nil, isolated: false, &block) ⇒ Object



18
19
20
21
22
# File 'lib/durable_flow/workflow.rb', line 18

# Entry point for declaring a step.
#
# Called without a name (nil) it hands back a +StepProxy+ wrapping this
# workflow; with a name it delegates to +durable_step+.
#
# @param name [Object, nil] step name; nil selects the proxy form
# @param start [Object, nil] forwarded to +durable_step+
# @param isolated [Boolean] forwarded to +durable_step+
# @yield the step body, forwarded to +durable_step+
# @return [Object] a +StepProxy+ when +name+ is nil, otherwise the result of +durable_step+
def step(name = nil, start: nil, isolated: false, &block)
  if name.nil?
    StepProxy.new(self)
  else
    durable_step(name, start: start, isolated: isolated, &block)
  end
end

#wait_for_event_step(name, event_name:, timeout:, match:) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/durable_flow/workflow.rb', line 42

# Durable step that waits for an event to arrive.
#
# Looks up (or initializes) the wait via +find_or_initialize_wait+, then:
# * if +matched_event_for+ finds an event, marks the wait +"matched"+ and
#   evaluates to the event's +payload_value+;
# * if the wait has a +timeout_at+ that has been reached, marks the wait
#   +"timed_out"+ and the step record +"failed"+, then raises;
# * otherwise marks the step record +"waiting"+ and calls
#   +pause_or_interrupt!+, resuming at +timeout_at+ when one is set.
#
# @param name [Object] step name handed to +durable_step+
# @param event_name [Object] event to wait for; stored via +to_s+
# @param timeout [Object] forwarded to +find_or_initialize_wait+
# @param match [Object] forwarded to +find_or_initialize_wait+
# @return [Object] whatever +durable_step+ returns for the step block
# @raise [WaitTimeoutError] when the wait's +timeout_at+ has been reached without a match
def wait_for_event_step(name, event_name:, timeout:, match:)
  durable_step(name) do
    step_record = current_workflow_step
    wait = find_or_initialize_wait(step_record, event_name: event_name, timeout: timeout, match: match)

    if (event = matched_event_for(wait))
      wait.update!(status: "matched", workflow_event: event)
      event.payload_value
    elsif wait.timeout_at && Time.current >= wait.timeout_at
      wait.update!(status: "timed_out")
      step_record.update!(status: "failed", metadata: step_record.metadata.merge("timeout_at" => wait.timeout_at.utc.iso8601(9)))
      raise WaitTimeoutError.new(event_name: event_name.to_s, step_name: name.to_s)
    else
      step_record.update!(
        status: "waiting",
        # compact drops the timeout_at key when the wait has no deadline.
        metadata: step_record.metadata.merge(
          "event_name" => event_name.to_s,
          "timeout_at" => wait.timeout_at&.utc&.iso8601(9),
        ).compact,
      )

      if wait.timeout_at
        pause_or_interrupt!(reason: :waiting, status: "waiting", resume_options: { wait_until: wait.timeout_at })
      else
        pause_or_interrupt!(reason: :waiting, status: "waiting")
      end
    end
  end
end

#wait_for_workflow(name, workflow_or_run_id, timeout: nil) ⇒ Object



72
73
74
# File 'lib/durable_flow/workflow.rb', line 72

# Convenience wrapper: builds a +StepProxy+ for this workflow and forwards the
# call to its +wait_for_workflow+.
#
# @param name [Object] forwarded unchanged to the proxy
# @param workflow_or_run_id [Object] forwarded unchanged to the proxy
# @param timeout [Object, nil] forwarded as the +timeout:+ keyword
# @return [Object] whatever the proxy's +wait_for_workflow+ returns
def wait_for_workflow(name, workflow_or_run_id, timeout: nil)
  proxy = StepProxy.new(self)
  proxy.wait_for_workflow(name, workflow_or_run_id, timeout: timeout)
end