Class: DurableFlow::Workflow
- Inherits:
-
ActiveJob::Base
- Object
- ActiveJob::Base
- DurableFlow::Workflow
- Includes:
- ActiveJob::Continuable
- Defined in:
- lib/durable_flow/workflow.rb
Instance Attribute Summary collapse
-
#workflow_run ⇒ Object
readonly
Returns the value of attribute workflow_run.
Instance Method Summary collapse
- #checkpoint! ⇒ Object
- #continuable_step ⇒ Object
- #log ⇒ Object
- #sleep_step(name, duration = nil, until_time: nil) ⇒ Object
- #step(name = nil, start: nil, isolated: false, &block) ⇒ Object
- #wait_for_event_step(name, event_name:, timeout:, match:) ⇒ Object
- #wait_for_workflow(name, workflow_or_run_id, timeout: nil) ⇒ Object
Instance Attribute Details
#workflow_run ⇒ Object (readonly)
Returns the value of attribute workflow_run.
11 12 13 |
# File 'lib/durable_flow/workflow.rb', line 11 def workflow_run @workflow_run end |
Instance Method Details
#checkpoint! ⇒ Object
13 14 15 16 |
# File 'lib/durable_flow/workflow.rb', line 13 def checkpoint! refresh_execution_lock! interrupt!(reason: :stopping) if queue_adapter.respond_to?(:stopping?) && queue_adapter.stopping? end |
#continuable_step ⇒ Object
9 |
# File 'lib/durable_flow/workflow.rb', line 9 alias_method :continuable_step, :step |
#log ⇒ Object
76 77 78 |
# File 'lib/durable_flow/workflow.rb', line 76 def log @workflow_logger ||= WorkflowLogger.new(self) end |
#sleep_step(name, duration = nil, until_time: nil) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/durable_flow/workflow.rb', line 24 def sleep_step(name, duration = nil, until_time: nil) durable_step(name) do step_record = current_workflow_step = step_record. wake_at = parse_time(["wake_at"]) || time_from(duration, explicit_time: until_time) raise ArgumentError, "Provide a duration or until: time for sleep step #{name.inspect}" unless wake_at if Time.current < wake_at ["wake_at"] = wake_at.utc.iso8601(9) step_record.update!(status: "sleeping", metadata: ) pause_or_interrupt!(reason: :sleeping, status: "sleeping", resume_options: { wait_until: wake_at }) end nil end end |
#step(name = nil, start: nil, isolated: false, &block) ⇒ Object
18 19 20 21 22 |
# File 'lib/durable_flow/workflow.rb', line 18 def step(name = nil, start: nil, isolated: false, &block) return StepProxy.new(self) if name.nil? durable_step(name, start: start, isolated: isolated, &block) end |
#wait_for_event_step(name, event_name:, timeout:, match:) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/durable_flow/workflow.rb', line 42 def wait_for_event_step(name, event_name:, timeout:, match:) durable_step(name) do step_record = current_workflow_step wait = find_or_initialize_wait(step_record, event_name: event_name, timeout: timeout, match: match) if (event = matched_event_for(wait)) wait.update!(status: "matched", workflow_event: event) event.payload_value elsif wait.timeout_at && Time.current >= wait.timeout_at wait.update!(status: "timed_out") step_record.update!(status: "failed", metadata: step_record..merge("timeout_at" => wait.timeout_at.utc.iso8601(9))) raise WaitTimeoutError.new(event_name: event_name.to_s, step_name: name.to_s) else step_record.update!( status: "waiting", metadata: step_record..merge( "event_name" => event_name.to_s, "timeout_at" => wait.timeout_at&.utc&.iso8601(9), ).compact, ) if wait.timeout_at pause_or_interrupt!(reason: :waiting, status: "waiting", resume_options: { wait_until: wait.timeout_at }) else pause_or_interrupt!(reason: :waiting, status: "waiting") end end end end |
#wait_for_workflow(name, workflow_or_run_id, timeout: nil) ⇒ Object
72 73 74 |
# File 'lib/durable_flow/workflow.rb', line 72 def wait_for_workflow(name, workflow_or_run_id, timeout: nil) StepProxy.new(self).wait_for_workflow(name, workflow_or_run_id, timeout: timeout) end |