Module: Smith::Workflow::Durability::ClassMethods

Defined in:
lib/smith/workflow/durability.rb

Instance Method Details

#heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) ⇒ Object

Raises:

  • (WorkflowError)

# File 'lib/smith/workflow/durability.rb', line 114

def heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter)
  raise WorkflowError, "persistence_adapter is not configured" if adapter.nil?

  if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
    hb = adapter.last_heartbeat(persistence_key)
    return [Time.now.utc - hb.to_time.utc, 0.0].max.to_f if hb
  end

  payload = adapter.fetch(persistence_key)
  return nil if payload.nil?

  age_from_payload_updated_at(payload, Time.now.utc)
end
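
The heartbeat branch of the listing clamps the computed age at zero, so a heartbeat stamped slightly in the future (for example through clock skew between hosts) never yields a negative age. A minimal sketch of that arithmetic, assuming a hypothetical `FakeHeartbeatAdapter` and helper `clamped_heartbeat_age` that are not part of Smith:

```ruby
require "date" # provides Time#to_time

# Hypothetical stand-in for an adapter that exposes `last_heartbeat`.
class FakeHeartbeatAdapter
  def initialize(heartbeats)
    @heartbeats = heartbeats
  end

  def last_heartbeat(key)
    @heartbeats[key]
  end
end

# Same clamped-age arithmetic as the listing: never negative, always a Float.
# Returns nil when no heartbeat is recorded (the real method then falls back
# to the payload; that fallback is omitted here).
def clamped_heartbeat_age(adapter, key, now: Time.now.utc)
  hb = adapter.last_heartbeat(key)
  return nil unless hb

  [now - hb.to_time.utc, 0.0].max.to_f
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
adapter = FakeHeartbeatAdapter.new({
  "wf:past" => Time.utc(2024, 1, 1, 11, 59, 30),
  "wf:future" => Time.utc(2024, 1, 1, 12, 0, 5)
})

past_age = clamped_heartbeat_age(adapter, "wf:past", now: now)       # 30 seconds old
future_age = clamped_heartbeat_age(adapter, "wf:future", now: now)   # clamped to zero
missing_age = clamped_heartbeat_age(adapter, "wf:missing", now: now) # no heartbeat
```

Passing `now:` explicitly is only a convenience for the sketch; it keeps the arithmetic deterministic.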

#persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean

Peek without instantiating: returns true if persisted state exists for the resolved key, false otherwise. Reuses the existing private helpers (`resolved_persistence_key`, `fetch_persisted_payload`), so it does not expand the adapter contract: `Smith::PersistenceAdapter` requires only `store/fetch/delete`, and this peek piggybacks on `fetch`. Custom adapters work without changes.

Hadithi uses this to skip the credits guard at execution time when persisted state already exists for a workflow key. Either a prior attempt’s billable work is durable in Redis or an in-flight workflow is being resumed; in both cases no new credit authorization is needed.

Returns:

  • (Boolean)


# File 'lib/smith/workflow/durability.rb', line 40

def persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter)
  resolved_key = resolved_persistence_key(key:, context:)
  !fetch_persisted_payload(resolved_key, adapter:).nil?
end
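
Because the peek relies on `fetch` alone, any adapter that implements the three-method contract supports it. A minimal sketch, assuming a hypothetical in-memory `MemoryAdapter` and a free-standing `state_present?` helper; the `store(key, payload)` argument order is an assumption, not Smith's documented signature:

```ruby
# Hypothetical in-memory adapter exposing only store/fetch/delete.
class MemoryAdapter
  def initialize
    @data = {}
  end

  def store(key, payload)
    @data[key] = payload
  end

  def fetch(key)
    @data[key]
  end

  def delete(key)
    @data.delete(key)
  end
end

# Presence peek built on `fetch` alone, so no extra adapter method is needed.
def state_present?(adapter, key)
  !adapter.fetch(key).nil?
end

adapter = MemoryAdapter.new
before_store = state_present?(adapter, "wf:42")   # nothing stored yet
adapter.store("wf:42", '{"state":"idle"}')
after_store = state_present?(adapter, "wf:42")    # payload present
adapter.delete("wf:42")
after_delete = state_present?(adapter, "wf:42")   # payload cleared
```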

#restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean

Stricter peek: returns true only when persisted state contains billable work that needs to be preserved (at least one `usage_entries` entry).

`persisted_state_exists?` answers “is there any state?”, but that includes the bare initial-state record Smith writes at the top of `run_persisted!` before the first `advance!`. A worker that dies between that initial `persist!` and the first model call leaves a Redis key with no billable work. If the credits guard’s bypass keys on `persisted_state_exists?` alone, a zero-balance user’s retry on that abandoned initial state silently runs the first model call: the guard is skipped because state exists, but the state has nothing to bill (it is just the workflow’s starting state).

`restorable_billing_state?` returns true only when there are actual `usage_entries` to bill on idempotent replay. Terminal state with zero entries is also `false` because there is nothing to preserve; `run_persisted!` is a no-op on terminal state anyway, so the guard outcome does not matter for correctness in that case.

This fully deserializes the payload (`from_state`, the same step `restore` performs) rather than only calling `fetch`, so it is heavier than `persisted_state_exists?`. Use this when you specifically want the billing-aware semantics; use `persisted_state_exists?` when you only need a key-presence check.

Returns:

  • (Boolean)


# File 'lib/smith/workflow/durability.rb', line 72

def restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter)
  resolved_key = resolved_persistence_key(key:, context:)
  payload = fetch_persisted_payload(resolved_key, adapter:)
  return false if payload.nil?

  workflow = from_state(JSON.parse(payload))
  entries = workflow.instance_variable_get(:@usage_entries) || []
  entries.any?
end
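
The difference between the two peeks shows up on an abandoned initial-state record. The sketch below is illustrative only: it assumes a hypothetical JSON payload with a top-level `usage_entries` array, whereas the real method rebuilds the workflow with `from_state` and reads its `@usage_entries`. The helper names are also hypothetical.

```ruby
require "json"

# Presence check: true for any stored payload, including a bare initial state.
def state_present?(payload)
  !payload.nil?
end

# Billing-aware check: true only when at least one usage entry was recorded.
def billable_state?(payload)
  return false if payload.nil?

  entries = JSON.parse(payload)["usage_entries"] || []
  entries.any?
end

# Hypothetical payload shapes, for illustration only.
init_payload = JSON.generate({ "state" => "idle", "usage_entries" => [] })
billed_payload = JSON.generate({
  "state" => "drafting",
  "usage_entries" => [{ "tokens" => 120 }]
})

init_present = state_present?(init_payload)      # state exists...
init_billable = billable_state?(init_payload)    # ...but nothing to bill
billed_billable = billable_state?(billed_payload)
absent_billable = billable_state?(nil)
```

A credits guard that bypasses only when the billing-aware check passes would still run for the abandoned initial state, which is the case the presence check alone gets wrong.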

#restore(key, adapter: Smith.persistence_adapter) ⇒ Object



# File 'lib/smith/workflow/durability.rb', line 13

def restore(key, adapter: Smith.persistence_adapter)
  resolved_key = explicit_persistence_key!(key)
  payload = fetch_persisted_payload(resolved_key, adapter:)
  return nil unless payload

  from_state(JSON.parse(payload)).tap do |workflow|
    workflow.instance_variable_set(:@persistence_key, resolved_key)
  end
end

#restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) ⇒ Object



# File 'lib/smith/workflow/durability.rb', line 23

def restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs)
  restore(resolved_persistence_key(key:, context:), adapter:) || new(context:, **kwargs)
end

#run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) ⇒ Object



# File 'lib/smith/workflow/durability.rb', line 128

def run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs)
  clear_policy = normalize_clear_policy(clear)
  resolved_key = resolved_persistence_key(key:, context:)
  workflow = restore(resolved_key, adapter:) || new(context:, **kwargs)
  result = workflow.run_persisted!(resolved_key, adapter:, on_step:)

  workflow.clear_persisted!(resolved_key, adapter:) if clear_persisted_after_run?(clear_policy, workflow)
  result
end

#stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (WorkflowError)
  • (ArgumentError)

# File 'lib/smith/workflow/durability.rb', line 82

def stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter)
  raise WorkflowError, "persistence_adapter is not configured" if adapter.nil?
  raise ArgumentError, "persistence_key must not be blank" if persistence_key.nil? || (persistence_key.respond_to?(:strip) && persistence_key.strip.empty?)
  raise ArgumentError, "threshold must respond to :to_i" unless threshold.respond_to?(:to_i)

  if since && !since.respond_to?(:to_time)
    raise ArgumentError, "since must respond to :to_time or be nil"
  end

  threshold_seconds = threshold.to_i
  now = Time.now.utc

  if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
    hb = adapter.last_heartbeat(persistence_key)
    if hb
      age = (now - hb.to_time.utc).to_f
      return false if age < threshold_seconds
    end
  end

  payload = adapter.fetch(persistence_key)
  return stuck_for_no_payload?(since, now, threshold_seconds) if payload.nil?

  if !Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
    Smith::PersistenceAdapters.warn_missing_heartbeat(adapter)
    fallback_age = age_from_payload_updated_at(payload, now)
    return false if fallback_age.nil? || fallback_age < threshold_seconds
  end

  !terminal_in_payload?(payload)
end
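
The order of checks in the listing can be hard to follow inline. The sketch below restates it as a pure function over values that have already been computed. It assumes a payload exists, so the `stuck_for_no_payload?` branch is left out, and the function and parameter names are hypothetical:

```ruby
# Hypothetical restatement of the decision order for the "payload exists" case.
# Ages are in seconds; nil means "unknown".
def stuck?(threshold:, supports_heartbeat:, heartbeat_age:, payload_age:, terminal:)
  # 1. A heartbeat younger than the threshold means the worker is alive.
  return false if supports_heartbeat && heartbeat_age && heartbeat_age < threshold

  # 2. Without heartbeat support, fall back to the payload's age;
  #    an unknown or recent age is treated as not stuck.
  unless supports_heartbeat
    return false if payload_age.nil? || payload_age < threshold
  end

  # 3. Otherwise the workflow is stuck unless it has already reached a terminal state.
  !terminal
end

fresh = stuck?(threshold: 60, supports_heartbeat: true, heartbeat_age: 5.0,
               payload_age: nil, terminal: false)
stale = stuck?(threshold: 60, supports_heartbeat: true, heartbeat_age: 300.0,
               payload_age: nil, terminal: false)
finished = stuck?(threshold: 60, supports_heartbeat: true, heartbeat_age: 300.0,
                  payload_age: nil, terminal: true)
fallback_recent = stuck?(threshold: 60, supports_heartbeat: false, heartbeat_age: nil,
                         payload_age: 10.0, terminal: false)
fallback_stale = stuck?(threshold: 60, supports_heartbeat: false, heartbeat_age: nil,
                        payload_age: 120.0, terminal: false)
fallback_unknown = stuck?(threshold: 60, supports_heartbeat: false, heartbeat_age: nil,
                          payload_age: nil, terminal: false)
```

Note that an adapter which supports heartbeats but has none recorded for the key skips both age checks, so the result depends only on whether the payload is terminal.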