Module: Smith::Workflow::Durability::ClassMethods
- Defined in:
- lib/smith/workflow/durability.rb
Instance Method Summary collapse
- #heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) ⇒ Object
-
#persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Peek without instantiating: returns true if persisted state exists for the resolved key, false otherwise.
-
#restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Stricter peek: returns true only when persisted state contains billable work that needs to be preserved (at least one ‘usage_entries` entry).
- #restore(key, adapter: Smith.persistence_adapter) ⇒ Object
- #restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) ⇒ Object
- #run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) ⇒ Object
- #stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) ⇒ Boolean
Instance Method Details
#heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/smith/workflow/durability.rb', line 114 def heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) raise WorkflowError, "persistence_adapter is not configured" if adapter.nil? if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat) hb = adapter.last_heartbeat(persistence_key) return [Time.now.utc - hb.to_time.utc, 0.0].max.to_f if hb end payload = adapter.fetch(persistence_key) return nil if payload.nil? age_from_payload_updated_at(payload, Time.now.utc) end |
#persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Peek without instantiating: returns true if persisted state exists for the resolved key, false otherwise. Reuses the existing private helpers (‘resolved_persistence_key`, `fetch_persisted_payload`) so it doesn’t expand the adapter contract — ‘Smith::PersistenceAdapter` requires only `store/fetch/delete`, and this peek piggybacks on `fetch`. Custom adapters work without changes.
Hadithi uses this to skip the credits guard at execution time when persisted state already exists for a workflow key (a prior attempt’s billable work is durable in Redis, OR an in-flight workflow is being resumed — either way, no NEW credit authorization is needed).
40 41 42 43 |
# File 'lib/smith/workflow/durability.rb', line 40 def persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) resolved_key = resolved_persistence_key(key:, context:) !fetch_persisted_payload(resolved_key, adapter:).nil? end |
#restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Stricter peek: returns true only when persisted state contains billable work that needs to be preserved (at least one ‘usage_entries` entry).
‘persisted_state_exists?` answers “is there any state?” — but that includes the bare initial-state record Smith writes at the top of `run_persisted!` BEFORE the first `advance!`. A worker that dies between that initial `persist!` and the first model call leaves a Redis key with no billable work. If the credits guard’s bypass keys on ‘persisted_state_exists?` alone, a zero-balance user’s retry on that abandoned init state silently runs the first model call (the guard is skipped because state exists, but the state has nothing to bill — it’s just the workflow’s starting state).
‘restorable_billing_state?` returns true only when there’s actual ‘usage_entries` to bill on idempotent replay. Terminal state with zero entries is also `false` because there’s nothing to preserve — ‘run_persisted!` is a no-op on terminal anyway, so guard outcome doesn’t matter for correctness in that case.
This calls ‘restore` (full deserialize) rather than just `fetch`, so it’s heavier than ‘persisted_state_exists?`. Use this when you specifically want the billing-aware semantics; use `persisted_state_exists?` when you only need a key- presence check.
72 73 74 75 76 77 78 79 80 |
# File 'lib/smith/workflow/durability.rb', line 72 def restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) resolved_key = resolved_persistence_key(key:, context:) payload = fetch_persisted_payload(resolved_key, adapter:) return false if payload.nil? workflow = from_state(JSON.parse(payload)) entries = workflow.instance_variable_get(:@usage_entries) || [] entries.any? end |
#restore(key, adapter: Smith.persistence_adapter) ⇒ Object
13 14 15 16 17 18 19 20 21 |
# File 'lib/smith/workflow/durability.rb', line 13 def restore(key, adapter: Smith.persistence_adapter) resolved_key = explicit_persistence_key!(key) payload = fetch_persisted_payload(resolved_key, adapter:) return nil unless payload from_state(JSON.parse(payload)).tap do |workflow| workflow.instance_variable_set(:@persistence_key, resolved_key) end end |
#restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) ⇒ Object
23 24 25 |
# File 'lib/smith/workflow/durability.rb', line 23 def restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) restore(resolved_persistence_key(key:, context:), adapter:) || new(context:, **kwargs) end |
#run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/smith/workflow/durability.rb', line 128 def run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) clear_policy = normalize_clear_policy(clear) resolved_key = resolved_persistence_key(key:, context:) workflow = restore(resolved_key, adapter:) || new(context:, **kwargs) result = workflow.run_persisted!(resolved_key, adapter:, on_step:) workflow.clear_persisted!(resolved_key, adapter:) if clear_persisted_after_run?(clear_policy, workflow) result end |
#stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) ⇒ Boolean
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/smith/workflow/durability.rb', line 82 def stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) raise WorkflowError, "persistence_adapter is not configured" if adapter.nil? raise ArgumentError, "persistence_key must not be blank" if persistence_key.nil? || (persistence_key.respond_to?(:strip) && persistence_key.strip.empty?) raise ArgumentError, "threshold must respond to :to_i" unless threshold.respond_to?(:to_i) if since && !since.respond_to?(:to_time) raise ArgumentError, "since must respond to :to_time or be nil" end threshold_seconds = threshold.to_i now = Time.now.utc if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat) hb = adapter.last_heartbeat(persistence_key) if hb age = (now - hb.to_time.utc).to_f return false if age < threshold_seconds end end payload = adapter.fetch(persistence_key) return stuck_for_no_payload?(since, now, threshold_seconds) if payload.nil? if !Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat) Smith::PersistenceAdapters.warn_missing_heartbeat(adapter) fallback_age = age_from_payload_updated_at(payload, now) return false if fallback_age.nil? || fallback_age < threshold_seconds end !terminal_in_payload?(payload) end |