Module: DurableFlow::TestHelper

Extended by:
ActiveSupport::Concern
Defined in:
lib/durable_flow/test_helper.rb

Instance Method Summary collapse

Instance Method Details

#assert_durable_flow_change(changes, type, **payload) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/durable_flow/test_helper.rb', line 152

def assert_durable_flow_change(changes, type, **payload)
  change = changes.find do |candidate|
    candidate.type == type && payload.all? { |key, value| durable_flow_change_value(candidate, key) == value }
  end

  assert change, "Expected DurableFlow change #{type.inspect} with #{payload.inspect}, got #{changes.map(&:payload).inspect}"
  change
end

#assert_step_attempts(workflow_or_run, name, expected) ⇒ Object



110
111
112
113
114
115
# File 'lib/durable_flow/test_helper.rb', line 110

def assert_step_attempts(workflow_or_run, name, expected)
  step = durable_flow_step(workflow_or_run, name)

  assert_equal expected, step.attempts
  step
end

#assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil) ⇒ Object



122
123
124
125
# File 'lib/durable_flow/test_helper.rb', line 122

def assert_step_log(workflow_or_run, step_name, level: nil, message: nil, data: nil)
  step = durable_flow_step(workflow_or_run, step_name)
  (step.workflow_logs.ordered, level: level, message: message, data: data)
end

#assert_step_result(workflow_or_run, name, expected) ⇒ Object



103
104
105
106
107
108
# File 'lib/durable_flow/test_helper.rb', line 103

def assert_step_result(workflow_or_run, name, expected)
  step = assert_step_succeeded(workflow_or_run, name)

  assert_equal expected, step.result_value
  step
end

#assert_step_succeeded(workflow_or_run, name) ⇒ Object



96
97
98
99
100
101
# File 'lib/durable_flow/test_helper.rb', line 96

def assert_step_succeeded(workflow_or_run, name)
  step = durable_flow_step(workflow_or_run, name)

  assert_equal "succeeded", step.status
  step
end

#assert_workflow_completed(workflow_or_run) ⇒ Object



63
64
65
# File 'lib/durable_flow/test_helper.rb', line 63

def assert_workflow_completed(workflow_or_run)
  assert_workflow_status(workflow_or_run, "completed")
end

#assert_workflow_failed(workflow_or_run, error: nil) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/durable_flow/test_helper.rb', line 67

def assert_workflow_failed(workflow_or_run, error: nil)
  run = assert_workflow_status(workflow_or_run, "failed")

  if error
    expected_class = error.respond_to?(:name) ? error.name : error.to_s
    assert_equal expected_class, run.last_error&.fetch("class", nil)
  end

  run
end

#assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil) ⇒ Object



117
118
119
120
# File 'lib/durable_flow/test_helper.rb', line 117

def assert_workflow_log(workflow_or_run, level: nil, message: nil, data: nil)
  run = durable_flow_resolve_run(workflow_or_run)
  (run.workflow_logs.ordered, level: level, message: message, data: data)
end

#assert_workflow_sleeping(workflow_or_run, step: nil) ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/durable_flow/test_helper.rb', line 78

def assert_workflow_sleeping(workflow_or_run, step: nil)
  run = assert_workflow_status(workflow_or_run, "sleeping")
  sleep_step = step ? durable_flow_step(run, step) : run.workflow_steps.find_by!(status: "sleeping")

  assert_equal "sleeping", sleep_step.status
  assert sleep_step.["wake_at"].present?, "Expected sleep step #{sleep_step.name.inspect} to store wake_at metadata"
  sleep_step
end

#assert_workflow_waiting_for(workflow_or_run, event_name, match: nil) ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/durable_flow/test_helper.rb', line 87

def assert_workflow_waiting_for(workflow_or_run, event_name, match: nil)
  run = assert_workflow_status(workflow_or_run, "waiting")
  wait = run.workflow_waits.find_by!(event_name: event_name.to_s)

  assert_equal "pending", wait.status
  assert_equal match, wait.match_value if match
  wait
end

#capture_durable_flow_changesObject



142
143
144
145
146
147
148
149
150
# File 'lib/durable_flow/test_helper.rb', line 142

def capture_durable_flow_changes
  changes = []
  subscriber = DurableFlow.on_change { |change| changes << change }

  yield changes
  changes
ensure
  DurableFlow.unsubscribe_from_changes(subscriber) if subscriber
end

#clear_durable_flow!(clear_jobs: true, reset_live: true) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/durable_flow/test_helper.rb', line 18

def clear_durable_flow!(clear_jobs: true, reset_live: true)
  DurableFlow.reset_live_broadcasters! if reset_live
  send(:clear_enqueued_jobs) if clear_jobs && respond_to?(:clear_enqueued_jobs, true)
  send(:clear_performed_jobs) if clear_jobs && respond_to?(:clear_performed_jobs, true)

  WorkflowWait.delete_all
  WorkflowEvent.delete_all
  WorkflowLog.delete_all
  WorkflowStep.delete_all
  WorkflowRun.delete_all
end

#durable_flow_run(run_id) ⇒ Object



30
31
32
# File 'lib/durable_flow/test_helper.rb', line 30

def durable_flow_run(run_id)
  WorkflowRun.find_by!(run_id: run_id.to_s)
end

#durable_flow_run_for(workflow_class) ⇒ Object



34
35
36
37
# File 'lib/durable_flow/test_helper.rb', line 34

def durable_flow_run_for(workflow_class)
  WorkflowRun.where(workflow_class: workflow_class_name(workflow_class)).order(:created_at, :id).last ||
    flunk("Expected a DurableFlow run for #{workflow_class_name(workflow_class)}")
end

#durable_flow_step(workflow_or_run, name) ⇒ Object



43
44
45
46
# File 'lib/durable_flow/test_helper.rb', line 43

def durable_flow_step(workflow_or_run, name)
  run = durable_flow_resolve_run(workflow_or_run)
  run.workflow_steps.find_by!(name: name.to_s)
end

#durable_flow_timeline_for(workflow_or_run) ⇒ Object



39
40
41
# File 'lib/durable_flow/test_helper.rb', line 39

def durable_flow_timeline_for(workflow_or_run)
  durable_flow_resolve_run(workflow_or_run).timeline
end

#next_workflow_wake_at(workflow_or_run = nil) ⇒ Object



135
136
137
138
139
140
# File 'lib/durable_flow/test_helper.rb', line 135

def next_workflow_wake_at(workflow_or_run = nil)
  scope = WorkflowStep.where(status: "sleeping")
  scope = scope.where(workflow_run: durable_flow_resolve_run(workflow_or_run)) if workflow_or_run

  scope.filter_map { |step| parse_workflow_wake_at(step.["wake_at"]) }.min
end

#notify_workflow_event(name, **payload) ⇒ Object



54
55
56
# File 'lib/durable_flow/test_helper.rb', line 54

def notify_workflow_event(name, **payload)
  payload.empty? ? DurableFlow.notify(name) : DurableFlow.notify(name, payload)
end

#perform_durable_flow_jobs(**options, &block) ⇒ Object



48
49
50
51
52
# File 'lib/durable_flow/test_helper.rb', line 48

def perform_durable_flow_jobs(**options, &block)
  raise "Include ActiveJob::TestHelper to perform DurableFlow jobs" unless respond_to?(:perform_enqueued_jobs)

  perform_enqueued_jobs(**options, &block)
end

#resume_workflows_for(name, **payload) ⇒ Object



58
59
60
61
# File 'lib/durable_flow/test_helper.rb', line 58

def resume_workflows_for(name, **payload)
  notify_workflow_event(name, **payload)
  perform_durable_flow_jobs(at: Time.current)
end

#travel_to_next_workflow_wake(workflow_or_run = nil) ⇒ Object



127
128
129
130
131
132
133
# File 'lib/durable_flow/test_helper.rb', line 127

def travel_to_next_workflow_wake(workflow_or_run = nil)
  wake_at = next_workflow_wake_at(workflow_or_run)

  assert wake_at, "Expected a sleeping DurableFlow step with wake_at metadata"
  travel_to wake_at
  wake_at
end