Module: Smith::Workflow::Durability::ClassMethods
Defined in: lib/smith/workflow/durability.rb
Instance Method Summary
- #heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) ⇒ Object
- #persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
  Peek without instantiating: returns true if persisted state exists for the resolved key, false otherwise.
- #restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
  Stricter peek: returns true only when persisted state contains billable work that needs to be preserved (at least one usage_entries entry).
- #restore(key, adapter: Smith.persistence_adapter) ⇒ Object
- #restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) ⇒ Object
- #run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) ⇒ Object
- #stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) ⇒ Boolean
Instance Method Details
#heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter) ⇒ Object
    # File 'lib/smith/workflow/durability.rb', line 114

    def heartbeat_age(persistence_key:, adapter: Smith.persistence_adapter)
      raise WorkflowError, "persistence_adapter is not configured" if adapter.nil?

      if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
        hb = adapter.last_heartbeat(persistence_key)
        return [Time.now.utc - hb.to_time.utc, 0.0].max.to_f if hb
      end

      payload = adapter.fetch(persistence_key)
      return nil if payload.nil?

      age_from_payload_updated_at(payload, Time.now.utc)
    end
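The heartbeat branch clamps the computed age at zero, so a heartbeat timestamp slightly ahead of the local clock does not produce a negative age. The sketch below isolates that arithmetic; `clamped_age` and its `now:` parameter are hypothetical names introduced for illustration and are not part of Smith.

```ruby
require "time" # loads the date extension, which provides Time#to_time

# Hypothetical helper that mirrors the clamp used in heartbeat_age.
# `now:` is injectable here only so the example is deterministic.
def clamped_age(heartbeat, now: Time.now.utc)
  # Time - Time yields seconds as a Float; negative values are clamped to 0.0.
  [now - heartbeat.to_time.utc, 0.0].max.to_f
end

now = Time.utc(2024, 1, 1, 12, 0, 0)

# Heartbeat 30 seconds in the past.
past_age = clamped_age(Time.utc(2024, 1, 1, 11, 59, 30), now: now)

# Heartbeat 5 seconds in the future (for example, clock skew between hosts).
skewed_age = clamped_age(Time.utc(2024, 1, 1, 12, 0, 5), now: now)

puts past_age
puts skewed_age
```

When no heartbeat is available, the documented method falls back to the payload instead, and returns nil if there is no payload at all.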
#persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Peek without instantiating: returns true if persisted state
exists for the resolved key, false otherwise. Reuses the
existing private helpers (resolved_persistence_key,
fetch_persisted_payload) so it doesn't expand the adapter
contract — Smith::PersistenceAdapter requires only
store/fetch/delete, and this peek piggybacks on fetch.
Custom adapters work without changes.
Hadithi uses this to skip the credits guard at execution time when persisted state already exists for a workflow key. Either a prior attempt's billable work is durable in Redis, or an in-flight workflow is being resumed; in both cases no new credit authorization is needed.
    # File 'lib/smith/workflow/durability.rb', line 40

    def persisted_state_exists?(key: nil, context: {}, adapter: Smith.persistence_adapter)
      resolved_key = resolved_persistence_key(key:, context:)
      !fetch_persisted_payload(resolved_key, adapter:).nil?
    end
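To illustrate why a presence peek needs nothing beyond fetch, here is a minimal sketch of an in-memory adapter with only store, fetch, and delete. `MemoryAdapter` and `state_exists?` are hypothetical names, and the `store(key, payload)` signature is an assumption, since only `fetch(key)` is visible in the source shown here.

```ruby
# Hypothetical in-memory adapter exposing only the three methods the
# description names. The store(key, payload) signature is an assumption.
class MemoryAdapter
  def initialize
    @data = {}
  end

  def store(key, payload)
    @data[key] = payload
  end

  def fetch(key)
    @data[key]
  end

  def delete(key)
    @data.delete(key)
  end
end

# Presence peek built on fetch alone, in the spirit of persisted_state_exists?.
def state_exists?(adapter, key)
  !adapter.fetch(key).nil?
end

adapter = MemoryAdapter.new
puts state_exists?(adapter, "wf:42") # nothing stored yet

adapter.store("wf:42", '{"step":"start"}')
puts state_exists?(adapter, "wf:42") # payload present

adapter.delete("wf:42")
puts state_exists?(adapter, "wf:42") # payload removed
```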
#restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter) ⇒ Boolean
Stricter peek: returns true only when persisted state contains
billable work that needs to be preserved (at least one
usage_entries entry).
persisted_state_exists? answers "is there any state?" — but
that includes the bare initial-state record Smith writes at
the top of run_persisted! BEFORE the first advance!. A
worker that dies between that initial persist! and the
first model call leaves a Redis key with no billable work.
If the credits guard's bypass keys on persisted_state_exists?
alone, a zero-balance user's retry on that abandoned init
state silently runs the first model call (the guard is
skipped because state exists, but the state has nothing to
bill — it's just the workflow's starting state).
restorable_billing_state? returns true only when there's
actual usage_entries to bill on idempotent replay. Terminal
state with zero entries is also false because there's
nothing to preserve — run_persisted! is a no-op on
terminal anyway, so guard outcome doesn't matter for
correctness in that case.
This performs a full deserialize (from_state on the parsed
payload) rather than just a fetch, so it's heavier than
persisted_state_exists?. Use this when you specifically want
the billing-aware semantics; use persisted_state_exists? when
you only need a key-presence check.
    # File 'lib/smith/workflow/durability.rb', line 72

    def restorable_billing_state?(key: nil, context: {}, adapter: Smith.persistence_adapter)
      resolved_key = resolved_persistence_key(key:, context:)
      payload = fetch_persisted_payload(resolved_key, adapter:)
      return false if payload.nil?

      workflow = from_state(JSON.parse(payload))
      entries = workflow.instance_variable_get(:@usage_entries) || []
      entries.any?
    end
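The difference between the two peeks can be sketched with plain JSON payloads. This is an illustration only: it assumes the serialized payload exposes a top-level "usage_entries" array, which the source does not confirm (the real method reads @usage_entries from the deserialized workflow). The helper names `key_present?` and `billable_state?` are hypothetical.

```ruby
require "json"

# Presence-only check: any payload counts, including a bare initial state.
def key_present?(payload)
  !payload.nil?
end

# Billing-aware check. ASSUMPTION: the payload carries a top-level
# "usage_entries" array; the real serialized shape may differ.
def billable_state?(payload)
  return false if payload.nil?

  entries = JSON.parse(payload)["usage_entries"] || []
  entries.any?
end

# State written before the first model call: key exists, nothing to bill.
init_only = JSON.generate({ "state" => "start", "usage_entries" => [] })

# State after at least one billable step.
with_usage = JSON.generate({ "state" => "drafting", "usage_entries" => [{ "tokens" => 120 }] })

puts key_present?(init_only)     # presence check passes on abandoned init state
puts billable_state?(init_only)  # billing-aware check does not
puts billable_state?(with_usage)
```

A guard bypass keyed on the second check would therefore still run the credits guard for the abandoned initial state described above.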
#restore(key, adapter: Smith.persistence_adapter) ⇒ Object
    # File 'lib/smith/workflow/durability.rb', line 13

    def restore(key, adapter: Smith.persistence_adapter)
      resolved_key = explicit_persistence_key!(key)
      payload = fetch_persisted_payload(resolved_key, adapter:)
      return nil unless payload

      from_state(JSON.parse(payload)).tap do |workflow|
        workflow.instance_variable_set(:@persistence_key, resolved_key)
      end
    end
#restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs) ⇒ Object
    # File 'lib/smith/workflow/durability.rb', line 23

    def restore_or_initialize(key: nil, context: {}, adapter: Smith.persistence_adapter, **kwargs)
      restore(resolved_persistence_key(key:, context:), adapter:) || new(context:, **kwargs)
    end
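The restore-or-new idiom above can be shown with a toy class. Everything in this sketch is hypothetical (`ToyWorkflow`, its `step` attribute, the Hash used as a store); it only demonstrates that a persisted payload wins over the supplied context, while a missing payload yields a fresh instance.

```ruby
require "json"

# Toy stand-in for a workflow class; all names here are hypothetical.
class ToyWorkflow
  attr_reader :step, :context

  def initialize(context: {}, step: 0)
    @context = context
    @step = step
  end

  # Rebuild an instance from a parsed JSON hash (string keys).
  def self.from_state(state)
    new(context: state.fetch("context", {}), step: state.fetch("step", 0))
  end

  # Returns nil when nothing is stored, like the documented restore.
  def self.restore(key, store:)
    payload = store[key]
    return nil unless payload

    from_state(JSON.parse(payload))
  end

  # Same shape as the documented method: restored state, else a new instance.
  def self.restore_or_initialize(key:, store:, context: {})
    restore(key, store: store) || new(context: context)
  end
end

store = { "wf:1" => JSON.generate({ "step" => 3, "context" => { "user" => "a" } }) }

resumed = ToyWorkflow.restore_or_initialize(key: "wf:1", store: store, context: { "user" => "ignored" })
fresh = ToyWorkflow.restore_or_initialize(key: "wf:2", store: store, context: { "user" => "b" })

puts resumed.step
puts fresh.step
```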
#run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs) ⇒ Object
    # File 'lib/smith/workflow/durability.rb', line 128

    def run_persisted!(key: nil, context: {}, adapter: Smith.persistence_adapter, clear: :done, on_step: nil, **kwargs)
      clear_policy = normalize_clear_policy(clear)
      resolved_key = resolved_persistence_key(key:, context:)
      workflow = restore(resolved_key, adapter:) || new(context:, **kwargs)

      result = workflow.run_persisted!(resolved_key, adapter:, on_step:)
      workflow.clear_persisted!(resolved_key, adapter:) if clear_persisted_after_run?(clear_policy, workflow)
      result
    end
#stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter) ⇒ Boolean
    # File 'lib/smith/workflow/durability.rb', line 82

    def stuck_for?(persistence_key:, threshold:, since: nil, adapter: Smith.persistence_adapter)
      raise WorkflowError, "persistence_adapter is not configured" if adapter.nil?
      raise ArgumentError, "persistence_key must not be blank" if persistence_key.nil? || (persistence_key.respond_to?(:strip) && persistence_key.strip.empty?)
      raise ArgumentError, "threshold must respond to :to_i" unless threshold.respond_to?(:to_i)
      if since && !since.respond_to?(:to_time)
        raise ArgumentError, "since must respond to :to_time or be nil"
      end

      threshold_seconds = threshold.to_i
      now = Time.now.utc

      if Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
        hb = adapter.last_heartbeat(persistence_key)
        if hb
          age = (now - hb.to_time.utc).to_f
          return false if age < threshold_seconds
        end
      end

      payload = adapter.fetch(persistence_key)
      return stuck_for_no_payload?(since, now, threshold_seconds) if payload.nil?

      if !Smith::PersistenceAdapters.supports?(adapter, :last_heartbeat)
        Smith::PersistenceAdapters.warn_missing_heartbeat(adapter)
        fallback_age = age_from_payload_updated_at(payload, now)
        return false if fallback_age.nil? || fallback_age < threshold_seconds
      end

      !terminal_in_payload?(payload)
    end
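The decision order in stuck_for? can be condensed into a small pure function. This is a simplified sketch covering only the case where the adapter supports last_heartbeat and a payload exists; the no-payload and updated-at fallback branches are left out because their helpers are not shown here. The name `stuck?` and its keyword arguments are hypothetical.

```ruby
# Simplified, hypothetical model of the heartbeat-supported path:
#   heartbeat_age - seconds since the last heartbeat, or nil if none recorded
#   terminal      - whether the persisted payload is in a terminal state
#   threshold     - anything responding to to_i (seconds)
def stuck?(heartbeat_age:, terminal:, threshold:)
  threshold_seconds = threshold.to_i

  # A fresh heartbeat means a worker is still alive: not stuck.
  return false if heartbeat_age && heartbeat_age < threshold_seconds

  # Stale or missing heartbeat: stuck unless the workflow already finished.
  !terminal
end

puts stuck?(heartbeat_age: 10.0, terminal: false, threshold: 60)  # fresh heartbeat
puts stuck?(heartbeat_age: 120.0, terminal: false, threshold: 60) # stale, unfinished
puts stuck?(heartbeat_age: 120.0, terminal: true, threshold: 60)  # stale, finished
puts stuck?(heartbeat_age: nil, terminal: false, threshold: 60)   # no heartbeat recorded
```

Note that a terminal payload is never reported as stuck, regardless of heartbeat age, which matches the final `!terminal_in_payload?(payload)` line in the source.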