Class: DurableFlow::WorkflowRun
- Inherits:
- ApplicationRecord
- Ancestors (root first): Object → ActiveRecord::Base → ApplicationRecord → DurableFlow::WorkflowRun
- Includes:
- Live::Broadcastable
- Defined in:
- lib/durable_flow/models/workflow_run.rb
Constant Summary
- TERMINAL_STATUSES = %w[completed failed].freeze
Instance Method Summary
- #acquire_execution_lock!(owner:, ttl:) ⇒ Object
- #completed? ⇒ Boolean
- #execution_locked? ⇒ Boolean
- #failed? ⇒ Boolean
- #live_snapshot ⇒ Object
- #refresh_execution_lock!(owner:, ttl:) ⇒ Object
- #release_execution_lock!(owner:) ⇒ Object
- #terminal? ⇒ Boolean
- #timeline ⇒ Object
Instance Method Details
#acquire_execution_lock!(owner:, ttl:) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/durable_flow/models/workflow_run.rb', line 17
#
# Tries to claim the execution lock for this run on behalf of +owner+.
#
# The claim is made with one conditional +update_all+: the row is written only
# when it is in the +active+ scope (defined elsewhere) and either has no lock
# holder or its lock expiry is at or before the current time.
#
# @param owner [Object] value stored in +execution_locked_by+
# @param ttl [Object] added to the current time to compute the lock expiry
#   (presumably a duration or a number of seconds -- confirm with callers)
# @return [Boolean] true when exactly one row was updated, in which case the
#   record is reloaded; false otherwise
def acquire_execution_lock!(owner:, ttl:)
  claimed_at = Time.current
  expires_at = claimed_at + ttl

  claimable = self.class
    .where(id: id)
    .active
    .where("execution_locked_by IS NULL OR execution_lock_expires_at <= ?", claimed_at)

  rows_changed = claimable.update_all(
    execution_locked_by: owner,
    execution_locked_at: claimed_at,
    execution_lock_expires_at: expires_at,
    updated_at: claimed_at,
  )

  return false unless rows_changed == 1

  # Pull the freshly written lock columns into this instance.
  reload
  true
end
#completed? ⇒ Boolean
65 66 67 |
# File 'lib/durable_flow/models/workflow_run.rb', line 65
#
# Tells whether this run finished successfully.
#
# @return [Boolean] true when +status+ equals "completed"
def completed?
  status == "completed"
end
#execution_locked? ⇒ Boolean
61 62 63 |
# File 'lib/durable_flow/models/workflow_run.rb', line 61
#
# Tells whether the attributes loaded on this instance describe a lock that
# is still in force: a holder is recorded, an expiry is recorded, and that
# expiry lies strictly after the current time.
#
# @return [Boolean]
def execution_locked?
  return false unless execution_locked_by.present?

  expiry = execution_lock_expires_at
  return false unless expiry.present?

  expiry > Time.current
end
#failed? ⇒ Boolean
69 70 71 |
# File 'lib/durable_flow/models/workflow_run.rb', line 69
#
# Tells whether this run ended in failure.
#
# @return [Boolean] true when +status+ equals "failed"
def failed?
  status == "failed"
end
#live_snapshot ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/durable_flow/models/workflow_run.rb', line 81
#
# Builds a plain Hash describing the run's current state.
#
# Keys are symbols, emitted in this order: identity, scheduling, lifecycle
# timestamps, lock state, record timestamps. +execution_locked+ is derived
# from #execution_locked?; every other value is read from the attribute of
# the same name.
#
# @return [Hash{Symbol => Object}]
def live_snapshot
  identity = {
    id: id,
    run_id: run_id,
    job_id: job_id,
    workflow_class: workflow_class,
  }
  scheduling = {
    status: status,
    queue_name: queue_name,
    priority: priority,
  }
  lifecycle = {
    started_at: started_at,
    interrupted_at: interrupted_at,
    completed_at: completed_at,
    failed_at: failed_at,
  }
  locking = {
    execution_locked: execution_locked?,
    execution_lock_expires_at: execution_lock_expires_at,
  }
  record_timestamps = {
    created_at: created_at,
    updated_at: updated_at,
  }

  # Merging left to right keeps the key order of the individual groups.
  [identity, scheduling, lifecycle, locking, record_timestamps].reduce(:merge)
end
#refresh_execution_lock!(owner:, ttl:) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/durable_flow/models/workflow_run.rb', line 47
#
# Extends the execution lock, provided +owner+ is the recorded lock holder.
#
# Only the expiry and +updated_at+ are rewritten; the holder and the time the
# lock was first taken are left as they are.
#
# @param owner [Object] value that must match +execution_locked_by+
# @param ttl [Object] added to the current time to compute the new expiry
#   (presumably a duration or a number of seconds -- confirm with callers)
# @return [Boolean] true when exactly one row was updated, in which case the
#   record is reloaded; false otherwise
def refresh_execution_lock!(owner:, ttl:)
  refreshed_at = Time.current
  held_by_owner = self.class.where(id: id, execution_locked_by: owner)

  rows_changed = held_by_owner.update_all(
    execution_lock_expires_at: refreshed_at + ttl,
    updated_at: refreshed_at,
  )

  return false unless rows_changed == 1

  # Pull the extended expiry into this instance.
  reload
  true
end
#release_execution_lock!(owner:) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/durable_flow/models/workflow_run.rb', line 36
#
# Clears the execution lock, provided +owner+ is the recorded lock holder.
#
# The holder, the lock time and the expiry are all set to NULL and
# +updated_at+ is bumped. When a row was actually changed the record is
# reloaded, matching #acquire_execution_lock! and #refresh_execution_lock!,
# so that #execution_locked? and #live_snapshot on this instance stop
# reporting the lock that was just released.
#
# @param owner [Object] value that must match +execution_locked_by+
# @return [Integer] number of rows updated, as returned by +update_all+
#   (0 when +owner+ does not hold the lock)
def release_execution_lock!(owner:)
  rows_changed = self.class
    .where(id: id, execution_locked_by: owner)
    .update_all(
      execution_locked_by: nil,
      execution_locked_at: nil,
      execution_lock_expires_at: nil,
      updated_at: Time.current,
    )

  # update_all writes straight to the database and leaves the attributes
  # already loaded on this instance untouched, so refresh them explicitly.
  reload if rows_changed == 1

  rows_changed
end
#terminal? ⇒ Boolean
73 74 75 |
# File 'lib/durable_flow/models/workflow_run.rb', line 73
#
# Tells whether the run has reached a status listed in TERMINAL_STATUSES
# ("completed" or "failed").
#
# @return [Boolean]
def terminal?
  TERMINAL_STATUSES.include?(status)
end
#timeline ⇒ Object
77 78 79 |
# File 'lib/durable_flow/models/workflow_run.rb', line 77
#
# Wraps this run in a WorkflowTimeline.
#
# A new WorkflowTimeline is constructed on every call; nothing is cached.
#
# @return [WorkflowTimeline]
def timeline
  WorkflowTimeline.new(self)
end