Module: DurableFlow::TestHelper
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/durable_flow/test_helper.rb
Instance Method Summary
- #assert_durable_flow_change(changes, type, **payload) ⇒ Object
- #assert_step_attempts(workflow_or_run, name, expected) ⇒ Object
- #assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil) ⇒ Object
- #assert_step_result(workflow_or_run, name, expected) ⇒ Object
- #assert_step_succeeded(workflow_or_run, name) ⇒ Object
- #assert_workflow_completed(workflow_or_run) ⇒ Object
- #assert_workflow_failed(workflow_or_run, error: nil) ⇒ Object
- #assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil) ⇒ Object
- #assert_workflow_sleeping(workflow_or_run, step: nil) ⇒ Object
- #assert_workflow_waiting_for(workflow_or_run, event_name, match: nil) ⇒ Object
- #capture_durable_flow_changes ⇒ Object
- #clear_durable_flow!(clear_jobs: true, reset_live: true) ⇒ Object
- #durable_flow_run(run_id) ⇒ Object
- #durable_flow_run_for(workflow_class) ⇒ Object
- #durable_flow_step(workflow_or_run, name) ⇒ Object
- #durable_flow_timeline_for(workflow_or_run) ⇒ Object
- #next_workflow_wake_at(workflow_or_run = nil) ⇒ Object
- #notify_workflow_event(name, **payload) ⇒ Object
- #perform_durable_flow_jobs(**options, &block) ⇒ Object
- #resume_workflows_for(name, **payload) ⇒ Object
- #travel_to_next_workflow_wake(workflow_or_run = nil) ⇒ Object
Instance Method Details
#assert_durable_flow_change(changes, type, **payload) ⇒ Object
152 153 154 155 156 157 158 159 |
# File 'lib/durable_flow/test_helper.rb', line 152

# Asserts that +changes+ contains an entry of the given +type+ whose values
# (read through durable_flow_change_value) equal every pair in +payload+.
#
# @param changes [Enumerable] captured change objects responding to #type and #payload
# @param type [Object] expected change type, compared with ==
# @param payload [Hash] key/value pairs that must all match on the change
# @return [Object] the first matching change
def assert_durable_flow_change(changes, type, **payload)
  matching = changes.find do |candidate|
    next false unless candidate.type == type

    payload.all? { |key, value| durable_flow_change_value(candidate, key) == value }
  end

  failure_message = "Expected DurableFlow change #{type.inspect} with #{payload.inspect}, got #{changes.map(&:payload).inspect}"
  assert matching, failure_message
  matching
end
#assert_step_attempts(workflow_or_run, name, expected) ⇒ Object
110 111 112 113 114 115 |
# File 'lib/durable_flow/test_helper.rb', line 110

# Asserts that the named step reports +expected+ attempts.
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param name [#to_s] step name
# @param expected [Object] value compared against step.attempts
# @return [Object] the step that was checked
def assert_step_attempts(workflow_or_run, name, expected)
  durable_flow_step(workflow_or_run, name).tap do |checked_step|
    assert_equal expected, checked_step.attempts
  end
end
#assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil) ⇒ Object
122 123 124 125 |
# File 'lib/durable_flow/test_helper.rb', line 122

# Asserts on the ordered logs of a single step by delegating to
# assert_log_in with the given filters.
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param step_name [#to_s] name of the step whose logs are inspected
# @param level [Object, nil] forwarded to assert_log_in
# @param message [Object, nil] forwarded to assert_log_in
# @param data [Object, nil] forwarded to assert_log_in
# @return [Object] whatever assert_log_in returns
def assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil)
  step = durable_flow_step(workflow_or_run, step_name)
  # All three filters are forwarded explicitly so the call does not rely on
  # keyword-value shorthand.
  assert_log_in(step.workflow_logs.ordered, level: level, message: message, data: data)
end
#assert_step_result(workflow_or_run, name, expected) ⇒ Object
103 104 105 106 107 108 |
# File 'lib/durable_flow/test_helper.rb', line 103

# Asserts that the named step succeeded (via assert_step_succeeded) and that
# its result_value equals +expected+.
#
# @param workflow_or_run [Object] passed through to assert_step_succeeded
# @param name [#to_s] step name
# @param expected [Object] value compared against step.result_value
# @return [Object] the step that was checked
def assert_step_result(workflow_or_run, name, expected)
  assert_step_succeeded(workflow_or_run, name).tap do |succeeded_step|
    assert_equal expected, succeeded_step.result_value
  end
end
#assert_step_succeeded(workflow_or_run, name) ⇒ Object
96 97 98 99 100 101 |
# File 'lib/durable_flow/test_helper.rb', line 96

# Asserts that the named step has status "succeeded".
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param name [#to_s] step name
# @return [Object] the step that was checked
def assert_step_succeeded(workflow_or_run, name)
  found_step = durable_flow_step(workflow_or_run, name)
  assert_equal("succeeded", found_step.status)
  found_step
end
#assert_workflow_completed(workflow_or_run) ⇒ Object
63 64 65 |
# File 'lib/durable_flow/test_helper.rb', line 63

# Asserts that the workflow run is in the "completed" status by delegating
# to assert_workflow_status.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @return [Object] whatever assert_workflow_status returns
def assert_workflow_completed(workflow_or_run)
  expected_status = "completed"
  assert_workflow_status(workflow_or_run, expected_status)
end
#assert_workflow_failed(workflow_or_run, error: nil) ⇒ Object
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/durable_flow/test_helper.rb', line 67

# Asserts that the workflow run is in the "failed" status and, when +error+
# is given, that the "class" entry of run.last_error matches it.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param error [#name, #to_s, nil] expected error; its #name is used when
#   available, otherwise its #to_s
# @return [Object] the value returned by assert_workflow_status
def assert_workflow_failed(workflow_or_run, error: nil)
  run = assert_workflow_status(workflow_or_run, "failed")
  return run unless error

  expected_class = error.respond_to?(:name) ? error.name : error.to_s
  # last_error may be nil; safe navigation then yields nil for the comparison.
  recorded_class = run.last_error&.fetch("class", nil)
  assert_equal expected_class, recorded_class
  run
end
#assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil) ⇒ Object
117 118 119 120 |
# File 'lib/durable_flow/test_helper.rb', line 117

# Asserts on the ordered logs of a workflow run by delegating to
# assert_log_in with the given filters.
#
# @param workflow_or_run [Object] resolved through durable_flow_resolve_run
# @param level [Object, nil] forwarded to assert_log_in
# @param message [Object, nil] forwarded to assert_log_in
# @param data [Object, nil] forwarded to assert_log_in
# @return [Object] whatever assert_log_in returns
def assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil)
  run = durable_flow_resolve_run(workflow_or_run)
  # All three filters are forwarded explicitly so the call does not rely on
  # keyword-value shorthand.
  assert_log_in(run.workflow_logs.ordered, level: level, message: message, data: data)
end
#assert_workflow_sleeping(workflow_or_run, step: nil) ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/durable_flow/test_helper.rb', line 78

# Asserts that the workflow run is in the "sleeping" status and that its
# sleeping step carries a "wake_at" value.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param step [#to_s, nil] name of the step to check; when omitted the run's
#   step with status "sleeping" is looked up with find_by!
# @return [Object] the sleeping step
def assert_workflow_sleeping(workflow_or_run, step: nil)
  run = assert_workflow_status(workflow_or_run, "sleeping")
  sleep_step = step ? durable_flow_step(run, step) : run.workflow_steps.find_by!(status: "sleeping")
  assert_equal "sleeping", sleep_step.status
  # NOTE(review): the accessor name was missing from this listing; `metadata`
  # is inferred from the failure message below — confirm against
  # lib/durable_flow/test_helper.rb.
  assert sleep_step.metadata["wake_at"].present?, "Expected sleep step #{sleep_step.name.inspect} to store wake_at metadata"
  sleep_step
end
#assert_workflow_waiting_for(workflow_or_run, event_name, match: nil) ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/durable_flow/test_helper.rb', line 87

# Asserts that the workflow run is in the "waiting" status and has a wait
# record for +event_name+ whose status is "pending".
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param event_name [#to_s] event the run should be waiting for
# @param match [Object, nil] when truthy, compared against wait.match_value
# @return [Object] the wait record
def assert_workflow_waiting_for(workflow_or_run, event_name, match: nil)
  run = assert_workflow_status(workflow_or_run, "waiting")
  pending_wait = run.workflow_waits.find_by!(event_name: event_name.to_s)

  assert_equal("pending", pending_wait.status)
  # The match comparison is skipped entirely when no match value was requested.
  assert_equal(match, pending_wait.match_value) if match

  pending_wait
end
#capture_durable_flow_changes ⇒ Object
142 143 144 145 146 147 148 149 150 |
# File 'lib/durable_flow/test_helper.rb', line 142

# Collects every change delivered by DurableFlow.on_change while the given
# block runs, then unsubscribes.
#
# @yieldparam changes [Array] the live array that changes are appended to
# @return [Array] the collected changes
def capture_durable_flow_changes
  recorded = []
  listener = DurableFlow.on_change { |change| recorded.push(change) }
  yield recorded
  recorded
ensure
  # Unsubscribe even when the block raises; listener is nil if subscribing failed.
  DurableFlow.unsubscribe_from_changes(listener) if listener
end
#clear_durable_flow!(clear_jobs: true, reset_live: true) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/durable_flow/test_helper.rb', line 18

# Resets DurableFlow state between tests: optionally resets live
# broadcasters and clears job queues, then deletes all workflow records.
#
# @param clear_jobs [Boolean] when true, calls clear_enqueued_jobs and
#   clear_performed_jobs if the test case responds to them (private included)
# @param reset_live [Boolean] when true, calls DurableFlow.reset_live_broadcasters!
# @return [Object] the result of WorkflowRun.delete_all
def clear_durable_flow!(clear_jobs: true, reset_live: true)
  DurableFlow.reset_live_broadcasters! if reset_live

  if clear_jobs
    %i[clear_enqueued_jobs clear_performed_jobs].each do |job_helper|
      send(job_helper) if respond_to?(job_helper, true)
    end
  end

  # Deletion order is kept exactly as in the original listing.
  WorkflowWait.delete_all
  WorkflowEvent.delete_all
  WorkflowLog.delete_all
  WorkflowStep.delete_all
  WorkflowRun.delete_all
end
#durable_flow_run(run_id) ⇒ Object
30 31 32 |
# File 'lib/durable_flow/test_helper.rb', line 30

# Looks up a WorkflowRun by its run_id using find_by!.
#
# @param run_id [#to_s] identifier, stringified before the lookup
# @return [WorkflowRun] the matching run
def durable_flow_run(run_id)
  lookup_key = run_id.to_s
  WorkflowRun.find_by!(run_id: lookup_key)
end
#durable_flow_run_for(workflow_class) ⇒ Object
34 35 36 37 |
# File 'lib/durable_flow/test_helper.rb', line 34

# Returns the last WorkflowRun for +workflow_class+ when ordered by
# created_at and id, or fails the test when there is none.
#
# @param workflow_class [Object] converted with workflow_class_name
# @return [WorkflowRun] the last run in that ordering
def durable_flow_run_for(workflow_class)
  class_name = workflow_class_name(workflow_class)
  latest_run = WorkflowRun.where(workflow_class: class_name).order(:created_at, :id).last
  return latest_run if latest_run

  flunk("Expected a DurableFlow run for #{class_name}")
end
#durable_flow_step(workflow_or_run, name) ⇒ Object
43 44 45 46 |
# File 'lib/durable_flow/test_helper.rb', line 43

# Finds a step by name on the resolved run using find_by!.
#
# @param workflow_or_run [Object] resolved through durable_flow_resolve_run
# @param name [#to_s] step name, stringified before the lookup
# @return [Object] the matching step
def durable_flow_step(workflow_or_run, name)
  steps = durable_flow_resolve_run(workflow_or_run).workflow_steps
  steps.find_by!(name: name.to_s)
end
#durable_flow_timeline_for(workflow_or_run) ⇒ Object
39 40 41 |
# File 'lib/durable_flow/test_helper.rb', line 39

# Returns the timeline of the resolved run.
#
# @param workflow_or_run [Object] resolved through durable_flow_resolve_run
# @return [Object] the value of run.timeline
def durable_flow_timeline_for(workflow_or_run)
  resolved_run = durable_flow_resolve_run(workflow_or_run)
  resolved_run.timeline
end
#next_workflow_wake_at(workflow_or_run = nil) ⇒ Object
135 136 137 138 139 140 |
# File 'lib/durable_flow/test_helper.rb', line 135

# Returns the earliest wake time among sleeping steps, optionally limited to
# one run.
#
# @param workflow_or_run [Object, nil] when given, resolved through
#   durable_flow_resolve_run and used to scope the steps
# @return [Object, nil] the minimum value produced by parse_workflow_wake_at,
#   or nil when no sleeping step yields one
def next_workflow_wake_at(workflow_or_run = nil)
  scope = WorkflowStep.where(status: "sleeping")
  scope = scope.where(workflow_run: durable_flow_resolve_run(workflow_or_run)) if workflow_or_run
  # NOTE(review): the accessor name was missing from this listing; `metadata`
  # is inferred from the "wake_at metadata" wording used elsewhere in this
  # helper — confirm against lib/durable_flow/test_helper.rb.
  # filter_map drops steps whose wake_at parses to nil/false before taking the minimum.
  scope.filter_map { |step| parse_workflow_wake_at(step.metadata["wake_at"]) }.min
end
#notify_workflow_event(name, **payload) ⇒ Object
54 55 56 |
# File 'lib/durable_flow/test_helper.rb', line 54

# Sends an event through DurableFlow.notify, passing the payload hash only
# when at least one keyword was given.
#
# @param name [Object] event name forwarded to DurableFlow.notify
# @param payload [Hash] optional event payload
# @return [Object] whatever DurableFlow.notify returns
def notify_workflow_event(name, **payload)
  return DurableFlow.notify(name) if payload.empty?

  DurableFlow.notify(name, payload)
end
#perform_durable_flow_jobs(**options, &block) ⇒ Object
48 49 50 51 52 |
# File 'lib/durable_flow/test_helper.rb', line 48

# Runs enqueued jobs by forwarding all options and the block to
# perform_enqueued_jobs.
#
# @param options [Hash] keyword options forwarded unchanged
# @param block [Proc, nil] optional block forwarded unchanged
# @return [Object] whatever perform_enqueued_jobs returns
# @raise [RuntimeError] when the test case does not respond to
#   perform_enqueued_jobs
def perform_durable_flow_jobs(**options, &block)
  raise "Include ActiveJob::TestHelper to perform DurableFlow jobs" unless respond_to?(:perform_enqueued_jobs)

  # The splat is named to match the documented signature rather than relying
  # on anonymous keyword forwarding.
  perform_enqueued_jobs(**options, &block)
end
#resume_workflows_for(name, **payload) ⇒ Object
58 59 60 61 |
# File 'lib/durable_flow/test_helper.rb', line 58

# Notifies DurableFlow of an event and then performs jobs with +at+ set to
# the current time.
#
# @param name [Object] event name forwarded to notify_workflow_event
# @param payload [Hash] optional event payload
# @return [Object] whatever perform_durable_flow_jobs returns
def resume_workflows_for(name, **payload)
  notify_workflow_event(name, **payload)

  cutoff = Time.current
  perform_durable_flow_jobs(at: cutoff)
end
#travel_to_next_workflow_wake(workflow_or_run = nil) ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/durable_flow/test_helper.rb', line 127

# Moves the test clock (via travel_to) to the next wake time reported by
# next_workflow_wake_at, failing the test when there is none.
#
# @param workflow_or_run [Object, nil] forwarded to next_workflow_wake_at
# @return [Object] the wake time that was travelled to
def travel_to_next_workflow_wake(workflow_or_run = nil)
  next_workflow_wake_at(workflow_or_run).tap do |wake_at|
    assert wake_at, "Expected a sleeping DurableFlow step with wake_at metadata"
    travel_to wake_at
  end
end