Module: DurableFlow::TestHelper
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/durable_flow/test_helper.rb
Instance Method Summary
- #assert_durable_flow_change(changes, type, **payload) ⇒ Object
- #assert_step_attempts(workflow_or_run, name, expected) ⇒ Object
- #assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil) ⇒ Object
- #assert_step_result(workflow_or_run, name, expected) ⇒ Object
- #assert_step_succeeded(workflow_or_run, name) ⇒ Object
- #assert_workflow_completed(workflow_or_run) ⇒ Object
- #assert_workflow_failed(workflow_or_run, error: nil) ⇒ Object
- #assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil) ⇒ Object
- #assert_workflow_sleeping(workflow_or_run, step: nil) ⇒ Object
- #assert_workflow_waiting_for(workflow_or_run, event_name, match: nil) ⇒ Object
- #assert_workflow_waiting_for_workflow(workflow_or_run, run_id, step: nil) ⇒ Object
- #capture_durable_flow_changes ⇒ Object
- #clear_durable_flow!(clear_jobs: true, reset_live: true) ⇒ Object
- #durable_flow_run(run_id) ⇒ Object
- #durable_flow_run_for(workflow_class) ⇒ Object
- #durable_flow_step(workflow_or_run, name) ⇒ Object
- #durable_flow_timeline_for(workflow_or_run) ⇒ Object
- #next_workflow_wake_at(workflow_or_run = nil) ⇒ Object
- #notify_workflow_event(name, **payload) ⇒ Object
- #perform_durable_flow_jobs(**options, &block) ⇒ Object
- #perform_durable_flow_until_idle(at: Time.current, limit: 100, **options) ⇒ Object
- #resume_workflows_for(name, **payload) ⇒ Object
- #travel_to_next_workflow_wake(workflow_or_run = nil) ⇒ Object
Instance Method Details
#assert_durable_flow_change(changes, type, **payload) ⇒ Object
183 184 185 186 187 188 189 190 |
# File 'lib/durable_flow/test_helper.rb', line 183
#
# Asserts that +changes+ contains an entry whose +type+ equals the given type
# and whose values, as read through durable_flow_change_value, equal every
# key/value pair in +payload+.
#
# @param changes [Enumerable] captured change objects (must respond to #type and #payload)
# @param type [Object] expected change type, compared with ==
# @param payload [Hash] expected values; an empty payload matches on type alone
# @return [Object] the first matching change
def assert_durable_flow_change(changes, type, **payload)
  matched = changes.find do |entry|
    next false unless entry.type == type

    payload.all? { |attribute, wanted| durable_flow_change_value(entry, attribute) == wanted }
  end

  assert matched, "Expected DurableFlow change #{type.inspect} with #{payload.inspect}, got #{changes.map(&:payload).inspect}"
  matched
end
#assert_step_attempts(workflow_or_run, name, expected) ⇒ Object
141 142 143 144 145 146 |
# File 'lib/durable_flow/test_helper.rb', line 141
#
# Asserts that the named step has recorded +expected+ attempts.
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param name [#to_s] step name
# @param expected [Object] value compared against step.attempts
# @return [Object] the step that was looked up
def assert_step_attempts(workflow_or_run, name, expected)
  durable_flow_step(workflow_or_run, name).tap do |step|
    assert_equal expected, step.attempts
  end
end
#assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil) ⇒ Object
153 154 155 156 |
# File 'lib/durable_flow/test_helper.rb', line 153
#
# Asserts that the named step has a log entry matching the given filters.
# The step is resolved with durable_flow_step and its ordered logs are handed
# to assert_log_in together with all three filters.
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param step_name [#to_s] step whose logs are inspected
# @param level [Object, nil] forwarded to assert_log_in
# @param message [Object, nil] forwarded to assert_log_in
# @param data [Object, nil] forwarded to assert_log_in
# @return [Object] whatever assert_log_in returns
def assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil)
  step = durable_flow_step(workflow_or_run, step_name)
  assert_log_in(step.workflow_logs.ordered, level: level, message: message, data: data)
end
#assert_step_result(workflow_or_run, name, expected) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/durable_flow/test_helper.rb', line 134
#
# Asserts that the named step succeeded (via assert_step_succeeded) and that
# its result_value equals +expected+.
#
# @param workflow_or_run [Object] passed through to assert_step_succeeded
# @param name [#to_s] step name
# @param expected [Object] value compared against step.result_value
# @return [Object] the step returned by assert_step_succeeded
def assert_step_result(workflow_or_run, name, expected)
  assert_step_succeeded(workflow_or_run, name).tap do |step|
    assert_equal expected, step.result_value
  end
end
#assert_step_succeeded(workflow_or_run, name) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/durable_flow/test_helper.rb', line 127
#
# Asserts that the named step's status is "succeeded".
#
# @param workflow_or_run [Object] passed through to durable_flow_step
# @param name [#to_s] step name
# @return [Object] the step that was looked up
def assert_step_succeeded(workflow_or_run, name)
  durable_flow_step(workflow_or_run, name).tap do |step|
    assert_equal "succeeded", step.status
  end
end
#assert_workflow_completed(workflow_or_run) ⇒ Object
83 84 85 |
# File 'lib/durable_flow/test_helper.rb', line 83
#
# Asserts that the workflow run has status "completed" by delegating to
# assert_workflow_status.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @return [Object] whatever assert_workflow_status returns
def assert_workflow_completed(workflow_or_run)
  terminal_status = "completed"
  assert_workflow_status(workflow_or_run, terminal_status)
end
#assert_workflow_failed(workflow_or_run, error: nil) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/durable_flow/test_helper.rb', line 87
#
# Asserts that the workflow run has status "failed". When +error+ is given,
# also asserts that the "class" entry of run.last_error equals the error's
# name (error.name when it responds to #name, otherwise error.to_s).
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param error [#name, #to_s, nil] expected error class or class name
# @return [Object] the run returned by assert_workflow_status
def assert_workflow_failed(workflow_or_run, error: nil)
  run = assert_workflow_status(workflow_or_run, "failed")
  return run unless error

  error_class_name =
    if error.respond_to?(:name)
      error.name
    else
      error.to_s
    end

  # last_error may be nil; the safe navigation then compares against nil.
  assert_equal error_class_name, run.last_error&.fetch("class", nil)
  run
end
#assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil) ⇒ Object
148 149 150 151 |
# File 'lib/durable_flow/test_helper.rb', line 148
#
# Asserts that the workflow run has a log entry matching the given filters.
# The run is resolved with durable_flow_resolve_run and its ordered logs are
# handed to assert_log_in together with all three filters.
#
# @param workflow_or_run [Object] passed through to durable_flow_resolve_run
# @param level [Object, nil] forwarded to assert_log_in
# @param message [Object, nil] forwarded to assert_log_in
# @param data [Object, nil] forwarded to assert_log_in
# @return [Object] whatever assert_log_in returns
def assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil)
  run = durable_flow_resolve_run(workflow_or_run)
  assert_log_in(run.workflow_logs.ordered, level: level, message: message, data: data)
end
#assert_workflow_sleeping(workflow_or_run, step: nil) ⇒ Object
98 99 100 101 102 103 104 105 |
# File 'lib/durable_flow/test_helper.rb', line 98
#
# Asserts that the workflow run has status "sleeping" and that a sleeping step
# carries a "wake_at" value.
#
# When +step+ is given the step is looked up by name with durable_flow_step;
# otherwise the run's first step with status "sleeping" is fetched via
# find_by!.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param step [#to_s, nil] optional name of the step expected to be sleeping
# @return [Object] the sleeping step
def assert_workflow_sleeping(workflow_or_run, step: nil)
  run = assert_workflow_status(workflow_or_run, "sleeping")
  sleep_step = step ? durable_flow_step(run, step) : run.workflow_steps.find_by!(status: "sleeping")

  assert_equal "sleeping", sleep_step.status
  # NOTE(review): the accessor name was lost in the rendered page; `metadata`
  # is reconstructed from the failure message below — confirm against the model.
  assert sleep_step.metadata["wake_at"].present?, "Expected sleep step #{sleep_step.name.inspect} to store wake_at metadata"
  sleep_step
end
#assert_workflow_waiting_for(workflow_or_run, event_name, match: nil) ⇒ Object
107 108 109 110 111 112 113 114 |
# File 'lib/durable_flow/test_helper.rb', line 107
#
# Asserts that the workflow run has status "waiting" and owns a wait record
# for +event_name+ whose status is "pending". When +match+ is given, the
# wait's match_value must equal it.
#
# @param workflow_or_run [Object] passed through to assert_workflow_status
# @param event_name [#to_s] event the run is expected to wait for
# @param match [Object, nil] expected match_value; skipped when nil/false
# @return [Object] the wait record
def assert_workflow_waiting_for(workflow_or_run, event_name, match: nil)
  waiting_run = assert_workflow_status(workflow_or_run, "waiting")
  pending_wait = waiting_run.workflow_waits.find_by!(event_name: event_name.to_s)

  assert_equal "pending", pending_wait.status
  if match
    assert_equal match, pending_wait.match_value
  end

  pending_wait
end
#assert_workflow_waiting_for_workflow(workflow_or_run, run_id, step: nil) ⇒ Object
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/durable_flow/test_helper.rb', line 116
#
# Asserts that the workflow run is waiting on DurableFlow::WORKFLOW_COMPLETED_EVENT
# with a match of { run_id: run_id.to_s }. When +step+ is given, the wait's
# workflow_step name must equal step.to_s.
#
# @param workflow_or_run [Object] passed through to assert_workflow_waiting_for
# @param run_id [#to_s] id of the run being waited on
# @param step [#to_s, nil] optional expected name of the waiting step
# @return [Object] the wait record
def assert_workflow_waiting_for_workflow(workflow_or_run, run_id, step: nil)
  assert_workflow_waiting_for(
    workflow_or_run,
    DurableFlow::WORKFLOW_COMPLETED_EVENT,
    match: { run_id: run_id.to_s }
  ).tap do |child_wait|
    assert_equal step.to_s, child_wait.workflow_step.name if step
  end
end
#capture_durable_flow_changes ⇒ Object
173 174 175 176 177 178 179 180 181 |
# File 'lib/durable_flow/test_helper.rb', line 173
#
# Subscribes to DurableFlow changes for the duration of the given block and
# collects every change into an array. The subscription is removed afterwards,
# even if the block raises.
#
# @yieldparam recorded [Array] the (live) array changes are appended to
# @return [Array] all changes captured while the block ran
def capture_durable_flow_changes
  recorded = []
  subscription = DurableFlow.on_change { |change| recorded.push(change) }

  begin
    yield recorded
  ensure
    DurableFlow.unsubscribe_from_changes(subscription) if subscription
  end

  recorded
end
#clear_durable_flow!(clear_jobs: true, reset_live: true) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/durable_flow/test_helper.rb', line 18
#
# Resets DurableFlow state between tests: optionally resets live broadcasters,
# optionally clears enqueued/performed jobs (only when the host test class
# provides those helpers, public or private), then deletes all workflow rows.
#
# @param clear_jobs [Boolean] clear ActiveJob test queues when helpers exist
# @param reset_live [Boolean] call DurableFlow.reset_live_broadcasters!
# @return [Object] the result of WorkflowRun.delete_all
def clear_durable_flow!(clear_jobs: true, reset_live: true)
  DurableFlow.reset_live_broadcasters! if reset_live

  if clear_jobs
    %i[clear_enqueued_jobs clear_performed_jobs].each do |job_helper|
      # respond_to? with include_all=true also accepts private helpers.
      send(job_helper) if respond_to?(job_helper, true)
    end
  end

  # Deletion order is kept exactly as in the original; presumably records that
  # reference a run go first — confirm against the schema.
  WorkflowWait.delete_all
  WorkflowEvent.delete_all
  WorkflowLog.delete_all
  WorkflowStep.delete_all
  WorkflowRun.delete_all
end
#durable_flow_run(run_id) ⇒ Object
30 31 32 |
# File 'lib/durable_flow/test_helper.rb', line 30
#
# Looks up a WorkflowRun by its run_id (stringified) using find_by!.
#
# @param run_id [#to_s] identifier of the run
# @return [Object] the record returned by WorkflowRun.find_by!
def durable_flow_run(run_id)
  WorkflowRun.find_by!(
    run_id: run_id.to_s
  )
end
#durable_flow_run_for(workflow_class) ⇒ Object
34 35 36 37 |
# File 'lib/durable_flow/test_helper.rb', line 34
#
# Returns the last WorkflowRun for the given workflow class, ordered by
# created_at then id. Calls flunk when no such run exists.
#
# @param workflow_class [Object] converted to a name by workflow_class_name
# @return [Object] the most recent matching run
def durable_flow_run_for(workflow_class)
  latest_run = WorkflowRun
    .where(workflow_class: workflow_class_name(workflow_class))
    .order(:created_at, :id)
    .last
  return latest_run if latest_run

  flunk("Expected a DurableFlow run for #{workflow_class_name(workflow_class)}")
end
#durable_flow_step(workflow_or_run, name) ⇒ Object
43 44 45 46 |
# File 'lib/durable_flow/test_helper.rb', line 43
#
# Finds the step with the given name on the resolved run using find_by!.
#
# @param workflow_or_run [Object] passed through to durable_flow_resolve_run
# @param name [#to_s] step name
# @return [Object] the matching step
def durable_flow_step(workflow_or_run, name)
  durable_flow_resolve_run(workflow_or_run)
    .workflow_steps
    .find_by!(name: name.to_s)
end
#durable_flow_timeline_for(workflow_or_run) ⇒ Object
39 40 41 |
# File 'lib/durable_flow/test_helper.rb', line 39
#
# Returns the timeline of the resolved run.
#
# @param workflow_or_run [Object] passed through to durable_flow_resolve_run
# @return [Object] whatever the run's #timeline returns
def durable_flow_timeline_for(workflow_or_run)
  resolved_run = durable_flow_resolve_run(workflow_or_run)
  resolved_run.timeline
end
#next_workflow_wake_at(workflow_or_run = nil) ⇒ Object
166 167 168 169 170 171 |
# File 'lib/durable_flow/test_helper.rb', line 166
#
# Returns the earliest wake time among sleeping steps, optionally restricted
# to a single run. Steps whose "wake_at" value parses to nil/false are skipped.
#
# @param workflow_or_run [Object, nil] when given, only that run's steps are considered
# @return [Object, nil] the minimum parsed wake time, or nil when there is none
def next_workflow_wake_at(workflow_or_run = nil)
  scope = WorkflowStep.where(status: "sleeping")
  scope = scope.where(workflow_run: durable_flow_resolve_run(workflow_or_run)) if workflow_or_run

  # NOTE(review): the accessor name was lost in the rendered page; `metadata`
  # is reconstructed from the "wake_at metadata" wording used by the sibling
  # assertions — confirm against the model.
  scope.filter_map { |step| parse_workflow_wake_at(step.metadata["wake_at"]) }.min
end
#notify_workflow_event(name, **payload) ⇒ Object
74 75 76 |
# File 'lib/durable_flow/test_helper.rb', line 74
#
# Sends an event through DurableFlow.notify. The payload hash is passed as a
# second positional argument only when it is non-empty.
#
# @param name [Object] event name
# @param payload [Hash] optional event payload
# @return [Object] whatever DurableFlow.notify returns
def notify_workflow_event(name, **payload)
  return DurableFlow.notify(name) if payload.empty?

  DurableFlow.notify(name, payload)
end
#perform_durable_flow_jobs(**options, &block) ⇒ Object
48 49 50 51 52 |
# File 'lib/durable_flow/test_helper.rb', line 48
#
# Forwards to perform_enqueued_jobs, passing every keyword option and the
# optional block through unchanged.
#
# @param options [Hash] keyword options handed to perform_enqueued_jobs
# @param block [Proc, nil] optional block handed to perform_enqueued_jobs
# @return [Object] whatever perform_enqueued_jobs returns
# @raise [RuntimeError] when the host does not respond to perform_enqueued_jobs
def perform_durable_flow_jobs(**options, &block)
  raise "Include ActiveJob::TestHelper to perform DurableFlow jobs" unless respond_to?(:perform_enqueued_jobs)

  perform_enqueued_jobs(**options, &block)
end
#perform_durable_flow_until_idle(at: Time.current, limit: 100, **options) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/durable_flow/test_helper.rb', line 54
#
# Repeatedly drains DurableFlow jobs until durable_flow_performable_job_enqueued?
# reports nothing left for +at+, giving up after +limit+ drain attempts.
#
# @param at [Object] cutoff forwarded to the enqueued check and to each drain
# @param limit [Integer] maximum number of drain attempts
# @param options [Hash] extra keyword options forwarded to perform_durable_flow_jobs
# @return [Integer] growth of performed_jobs across all drains (0 when the
#   host does not respond to performed_jobs)
# @raise [RuntimeError] when perform_enqueued_jobs is unavailable, or when
#   jobs are still performable after +limit+ attempts
def perform_durable_flow_until_idle(at: Time.current, limit: 100, **options)
  raise "Include ActiveJob::TestHelper to perform DurableFlow jobs" unless respond_to?(:perform_enqueued_jobs)

  performed = 0

  limit.times do
    break unless durable_flow_performable_job_enqueued?(at: at)

    # Measure by the size of performed_jobs before and after, since the drain
    # call's own return value is not used here.
    before = respond_to?(:performed_jobs) ? performed_jobs.size : 0
    perform_durable_flow_jobs(**options.merge(at: at))
    performed += performed_jobs.size - before if respond_to?(:performed_jobs)
  end

  if durable_flow_performable_job_enqueued?(at: at)
    raise "DurableFlow jobs did not become idle after #{limit} drain attempts"
  end

  performed
end
#resume_workflows_for(name, **payload) ⇒ Object
78 79 80 81 |
# File 'lib/durable_flow/test_helper.rb', line 78
#
# Sends the event via notify_workflow_event and then drains DurableFlow jobs
# as of the current time.
#
# @param name [Object] event name
# @param payload [Hash] optional event payload
# @return [Object] whatever perform_durable_flow_jobs returns
def resume_workflows_for(name, **payload)
  notify_workflow_event(name, **payload)

  drain_at = Time.current
  perform_durable_flow_jobs(at: drain_at)
end
#travel_to_next_workflow_wake(workflow_or_run = nil) ⇒ Object
158 159 160 161 162 163 164 |
# File 'lib/durable_flow/test_helper.rb', line 158
#
# Looks up the next wake time with next_workflow_wake_at, asserts that one
# exists, and moves the test clock there with travel_to.
#
# @param workflow_or_run [Object, nil] passed through to next_workflow_wake_at
# @return [Object] the wake time travelled to
def travel_to_next_workflow_wake(workflow_or_run = nil)
  next_workflow_wake_at(workflow_or_run).tap do |wake_at|
    assert wake_at, "Expected a sleeping DurableFlow step with wake_at metadata"
    travel_to wake_at
  end
end