Class: DurableFlow::WorkflowRun

Inherits:
ApplicationRecord show all
Includes:
Live::Broadcastable
Defined in:
lib/durable_flow/models/workflow_run.rb

Constant Summary collapse

TERMINAL_STATUSES =
%w[completed failed].freeze

Instance Method Summary collapse

Instance Method Details

#acquire_execution_lock!(owner:, ttl:) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/durable_flow/models/workflow_run.rb', line 17

# Tries to claim the execution lock on this run for +owner+.
#
# The claim is one conditional +update_all+ restricted to this record's id,
# the model's +active+ scope (defined elsewhere), and rows whose lock owner is
# NULL or whose lock expiry is at or before the current time. On success the
# in-memory record is reloaded so it reflects the newly written lock columns.
#
# @param owner [Object] value written to +execution_locked_by+
# @param ttl [Object] added to the current time to compute the lock expiry
#   (presumably seconds or an ActiveSupport::Duration — confirm with callers)
# @return [Boolean] true when exactly one row was updated, false otherwise
def acquire_execution_lock!(owner:, ttl:)
  moment = Time.current
  expiry = moment + ttl

  claimable = self.class.where(id: id).active
    .where("execution_locked_by IS NULL OR execution_lock_expires_at <= ?", moment)
  rows_changed = claimable.update_all(
    execution_locked_by: owner,
    execution_locked_at: moment,
    execution_lock_expires_at: expiry,
    updated_at: moment
  )

  return false unless rows_changed == 1

  reload
  true
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/durable_flow/models/workflow_run.rb', line 65

# Whether this run's +status+ is exactly the string "completed".
#
# @return [Boolean]
def completed?
  status.eql?("completed")
end

#execution_locked? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/durable_flow/models/workflow_run.rb', line 61

# Whether the in-memory record currently shows an unexpired execution lock.
#
# A lock counts as held only when both the owner and the expiry are present
# and the expiry is strictly later than the current time. This reads the
# loaded attributes only; it does not query the database.
#
# @return [Boolean]
def execution_locked?
  return false unless execution_locked_by.present?

  expiry = execution_lock_expires_at
  return false unless expiry.present?

  expiry > Time.current
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/durable_flow/models/workflow_run.rb', line 69

# Whether this run's +status+ is exactly the string "failed".
#
# @return [Boolean]
def failed?
  status.eql?("failed")
end

#live_snapshot ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/durable_flow/models/workflow_run.rb', line 81

# Builds a plain Hash describing the run's current state.
#
# The groups below are merged in order, so the resulting key order is:
# identity, dispatch, lifecycle timestamps, lock state, record timestamps.
# +execution_locked+ is the computed result of #execution_locked?; every other
# value is read straight from the attribute of the same name.
#
# @return [Hash{Symbol => Object}]
def live_snapshot
  identity = {
    id: id,
    run_id: run_id,
    job_id: job_id,
    workflow_class: workflow_class
  }
  dispatch = {
    status: status,
    queue_name: queue_name,
    priority: priority
  }
  lifecycle = {
    started_at: started_at,
    interrupted_at: interrupted_at,
    completed_at: completed_at,
    failed_at: failed_at
  }
  lock_state = {
    execution_locked: execution_locked?,
    execution_lock_expires_at: execution_lock_expires_at
  }
  record_timestamps = {
    created_at: created_at,
    updated_at: updated_at
  }

  identity.merge(dispatch, lifecycle, lock_state, record_timestamps)
end

#refresh_execution_lock!(owner:, ttl:) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/durable_flow/models/workflow_run.rb', line 47

# Extends the execution lock on this run, provided +owner+ is recorded as
# its holder.
#
# The update matches only on this record's id and +execution_locked_by+; it
# does not check whether the existing lock has already expired. On success
# the in-memory record is reloaded.
#
# @param owner [Object] value that must equal +execution_locked_by+
# @param ttl [Object] added to the current time to compute the new expiry
#   (presumably seconds or an ActiveSupport::Duration — confirm with callers)
# @return [Boolean] true when exactly one row was updated, false otherwise
def refresh_execution_lock!(owner:, ttl:)
  moment = Time.current

  held_by_owner = self.class.where(id: id, execution_locked_by: owner)
  rows_changed = held_by_owner.update_all(
    execution_lock_expires_at: moment + ttl,
    updated_at: moment
  )

  if rows_changed == 1
    reload
    true
  else
    false
  end
end

#release_execution_lock!(owner:) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/durable_flow/models/workflow_run.rb', line 36

# Clears the execution lock columns on this run, provided +owner+ is recorded
# as its holder.
#
# Unlike #acquire_execution_lock! and #refresh_execution_lock!, this method
# does not reload the in-memory record and does not coerce the result to a
# Boolean: it returns whatever +update_all+ returns.
#
# @param owner [Object] value that must equal +execution_locked_by+
# @return [Object] the result of +update_all+ (the affected-row count in
#   ActiveRecord)
def release_execution_lock!(owner:)
  held_by_owner = self.class.where(id: id, execution_locked_by: owner)
  cleared_lock = {
    execution_locked_by: nil,
    execution_locked_at: nil,
    execution_lock_expires_at: nil
  }

  held_by_owner.update_all(cleared_lock.merge(updated_at: Time.current))
end

#terminal? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/durable_flow/models/workflow_run.rb', line 73

# Whether the run has reached an end state, i.e. it is either completed or
# failed. #failed? is only consulted when #completed? is false.
#
# @return [Boolean]
def terminal?
  return true if completed?

  failed?
end

#timeline ⇒ Object



77
78
79
# File 'lib/durable_flow/models/workflow_run.rb', line 77

# Wraps this run in a new WorkflowTimeline.
#
# A fresh object is constructed on every call; nothing is memoized here.
#
# @return [WorkflowTimeline] a timeline constructed with this record
def timeline
  WorkflowTimeline.new(self)
end