Module: ChronoForge::Executor
- Includes: Methods
- Defined in:
  - lib/chrono_forge/executor.rb
  - lib/chrono_forge/executor/context.rb
  - lib/chrono_forge/executor/methods.rb
  - lib/chrono_forge/executor/methods/wait.rb
  - lib/chrono_forge/executor/retry_policy.rb
  - lib/chrono_forge/executor/lock_strategy.rb
  - lib/chrono_forge/executor/methods/branch.rb
  - lib/chrono_forge/executor/execution_tracker.rb
  - lib/chrono_forge/executor/methods/wait_until.rb
  - lib/chrono_forge/executor/methods/continue_if.rb
  - lib/chrono_forge/executor/composite_retry_policy.rb
  - lib/chrono_forge/executor/methods/durably_repeat.rb
  - lib/chrono_forge/executor/methods/merge_branches.rb
  - lib/chrono_forge/executor/methods/durably_execute.rb
  - lib/chrono_forge/executor/methods/workflow_states.rb
Defined Under Namespace
Modules: Methods
Classes: CompositeRetryPolicy, ConcurrentExecutionError, Context, Error, ExecutionFailedError, ExecutionFlowControl, ExecutionTracker, HaltExecutionFlow, InvalidStepName, LockStrategy, LongRunningConcurrentExecutionError, NotExecutableError, NotInBranchError, RetryPolicy, UnknownBranchError, UnmergedBranchError, WaitConditionNotMet, WorkflowNotRetryableError
Constant Summary
- STEP_NAME_DELIMITER = "$"
  Separates the segments of a step name (e.g. `durably_repeat$name$ts`). User-supplied step names and method names must not contain it.
- RESERVED_KWARGS = %i[attempt retry_counts retry_workflow].freeze
  Keyword args that ChronoForge threads through job args internally. Users must not pass these to perform_now/perform_later; the framework injects them via `.set(...)` continuations, whose ConfiguredJob proxy bypasses the class-level guard in `.prepended` below.
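A minimal sketch of why the delimiter has to be reserved: compound step names are built by joining segments with `"$"` and split on it again, so a `"$"` inside a user-supplied segment would shift every later segment. The helpers `build_step_name`, `split_step_name`, `validate_segment!` and the `InvalidStepNameSketch` error are hypothetical illustrations, not ChronoForge's API; the real validation (the namespace lists an `InvalidStepName` error) may differ in detail.

```ruby
# Hypothetical helpers; ChronoForge's own step-name handling may differ.
STEP_NAME_DELIMITER = "$"

class InvalidStepNameSketch < ArgumentError; end

# A "$" inside a segment would make the joined name split into the wrong
# number of parts, so it is rejected up front.
def validate_segment!(segment)
  text = segment.to_s
  if text.include?(STEP_NAME_DELIMITER)
    raise InvalidStepNameSketch, "#{text.inspect} must not contain #{STEP_NAME_DELIMITER.inspect}"
  end
  text
end

# Builds a compound name such as "durably_repeat$send_reminder$1700000000".
def build_step_name(*segments)
  segments.map { |segment| validate_segment!(segment) }.join(STEP_NAME_DELIMITER)
end

# Recovers the segments of a compound name.
def split_step_name(step_name)
  step_name.split(STEP_NAME_DELIMITER)
end

name = build_step_name("durably_repeat", "send_reminder", 1_700_000_000)
puts name                          # durably_repeat$send_reminder$1700000000
puts split_step_name(name).length  # 3
```

The segment values (`send_reminder`, the integer timestamp) are made up for the example.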
Class Method Summary
- .prepended(base) ⇒ Object
  Add class methods.
Instance Method Summary
Methods included from Methods::MergeBranches
Methods included from Methods::Branch
Methods included from Methods::DurablyRepeat
Methods included from Methods::DurablyExecute
Methods included from Methods::ContinueIf
Methods included from Methods::WaitUntil
Methods included from Methods::Wait
Class Method Details
.prepended(base) ⇒ Object
Add class methods
# File 'lib/chrono_forge/executor.rb', line 42

def self.prepended(base)
  # Class-wide default retry policy, inherited by subclasses. Set via the
  # `retry_policy` DSL below; nil means "use the per-site built-in default".
  base.class_attribute :default_retry_policy, instance_accessor: false, default: nil

  class << base
    # Public enqueue contract: exactly one positional (`key`) plus keywords.
    # Reserved internal kwargs (RESERVED_KWARGS) are rejected here; the
    # framework injects them only via `.set(...)` continuations, whose
    # ActiveJob ConfiguredJob proxy bypasses these class-level overrides.
    def perform_now(key, *extra, **kwargs)
      __validate_enqueue!(key, extra, kwargs)
      super(key, **kwargs)
    end

    def perform_later(key, *extra, **kwargs)
      __validate_enqueue!(key, extra, kwargs)
      super(key, **kwargs)
    end

    # Re-run a failed/stalled workflow. Routes through `.set(...)` so the
    # reserved `retry_workflow: true` flag reaches the instance perform
    # without tripping the public guard above.
    def retry_now(key, **kwargs)
      __validate_enqueue!(key, [], kwargs)
      set.perform_now(key, retry_workflow: true, **kwargs)
    end

    def retry_later(key, **kwargs)
      __validate_enqueue!(key, [], kwargs)
      set.perform_later(key, retry_workflow: true, **kwargs)
    end

    # Class-level DSL to set this workflow's default retry policy. Applies to
    # workflow-level retries and to steps without a per-call override.
    # Positional RetryPolicy objects build a composite (per-error budgets);
    # keyword options build a single RetryPolicy. The two forms are mutually
    # exclusive.
    def retry_policy(*policies, **opts)
      if policies.any? && opts.any?
        raise ArgumentError, "retry_policy takes either positional policies or keyword options, not both"
      end

      self.default_retry_policy = policies.any? ? RetryPolicy.compose(*policies) : RetryPolicy.new(**opts)
    end

    private

    def __validate_enqueue!(key, extra, kwargs)
      unless key.is_a?(String)
        raise ArgumentError, "Workflow key must be a string as the first argument"
      end

      unless extra.empty?
        raise ArgumentError, "ChronoForge workflows accept only `key` positionally; pass " \
          "everything else as keywords (got #{extra.size} extra positional arg(s))"
      end

      reserved = kwargs.keys & RESERVED_KWARGS
      if reserved.any?
        raise ArgumentError,
          "#{reserved.join(", ")} #{reserved.one? ? "is a reserved" : "are reserved"} " \
          "ChronoForge #{reserved.one? ? "keyword" : "keywords"} and cannot be passed to perform_now/perform_later"
      end
    end
  end
end
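The guard and its `.set(...)` bypass can be reproduced without Rails. This is a sketch under assumptions: `SketchJob` and `SketchConfiguredJob` are hypothetical stand-ins, and the proxy only mimics the relevant behaviour of ActiveJob's `ConfiguredJob`, which (as we understand recent Rails versions) instantiates the job class directly instead of calling the class-level `perform_now`. That is why `retry_now` can deliver `retry_workflow: true` while a direct `perform_now` call with the same keyword is rejected.

```ruby
# Hypothetical stand-ins for an ActiveJob class and its ConfiguredJob proxy.
RESERVED_KWARGS = %i[attempt retry_counts retry_workflow].freeze

# Like ActiveJob's ConfiguredJob, this proxy builds the job instance itself,
# so it never passes through the guarded class-level perform_now.
class SketchConfiguredJob
  def initialize(job_class)
    @job_class = job_class
  end

  def perform_now(*args, **kwargs)
    @job_class.new.perform(*args, **kwargs)
  end
end

class SketchJob
  def self.set
    SketchConfiguredJob.new(self)
  end

  # Public entry point: one positional key plus keywords; reserved keys rejected.
  def self.perform_now(key, *extra, **kwargs)
    validate_enqueue!(key, extra, kwargs)
    new.perform(key, **kwargs)
  end

  # Internal re-run path: the reserved flag travels through `set` instead.
  def self.retry_now(key, **kwargs)
    validate_enqueue!(key, [], kwargs)
    set.perform_now(key, retry_workflow: true, **kwargs)
  end

  def self.validate_enqueue!(key, extra, kwargs)
    raise ArgumentError, "workflow key must be a String" unless key.is_a?(String)
    raise ArgumentError, "only `key` may be positional" unless extra.empty?

    reserved = kwargs.keys & RESERVED_KWARGS
    raise ArgumentError, "#{reserved.join(", ")} reserved for internal use" if reserved.any?
  end
  private_class_method :validate_enqueue!

  # Instance side: echoes what it received so the two paths can be compared.
  def perform(key, retry_workflow: false, **kwargs)
    { key: key, retry_workflow: retry_workflow, kwargs: kwargs }
  end
end

puts SketchJob.retry_now("order-1")[:retry_workflow] # true

begin
  SketchJob.perform_now("order-1", retry_workflow: true)
rescue ArgumentError => e
  puts e.message # retry_workflow reserved for internal use
end
```

Keeping the guard on the class methods rather than on `perform` itself is what lets the framework's own continuations carry `attempt`, `retry_counts` and `retry_workflow` while still rejecting them from callers.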
Instance Method Details
#perform(key, attempt: 0, retry_counts: {}, retry_workflow: false, options: {}, **kwargs) ⇒ Object
# File 'lib/chrono_forge/executor.rb', line 110

def perform(key, attempt: 0, retry_counts: {}, retry_workflow: false, options: {}, **kwargs)
  # Safety net: prevent re-running a workflow whose attempts are exhausted
  # (e.g. a stale job left in the queue). The normal exhaustion path fails the
  # workflow from the rescue below before this is ever reached.
  policy = workflow_retry_policy
  if policy.max_attempts && attempt >= policy.max_attempts
    Rails.logger.error { "ChronoForge:#{self.class} max attempts reached for job workflow(#{key})" }
    return
  end

  # Find or create workflow instance
  setup_workflow!(key, options, kwargs)

  # Handle retry parameter - unlock and continue execution
  retry_workflow! if retry_workflow

  # Track if we acquired the lock
  lock_acquired = false

  begin
    # Acquire lock with advanced concurrency protection
    @workflow = self.class::LockStrategy.acquire_lock(job_id, workflow, max_duration: max_duration)
    lock_acquired = true

    # Setup context
    setup_context!

    # Execute core job logic
    super(**workflow.kwargs.symbolize_keys)

    # Mark as complete
    complete_workflow!
  rescue ExecutionFailedError
    # The step that raised this already logged the underlying cause (with its
    # step/attempt context); ExecutionFailedError is control flow, not a new
    # error, so re-logging it here would just duplicate the row.
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) step execution failed" }
    workflow.stalled!
    nil
  rescue HaltExecutionFlow
    # Halt execution
    Rails.logger.debug { "ChronoForge:#{self.class}(#{key}) execution halted" }
    nil
  rescue ConcurrentExecutionError
    # Graceful handling of concurrent execution
    Rails.logger.warn { "ChronoForge:#{self.class}(#{key}) concurrent execution detected" }
    nil
  rescue NotExecutableError
    raise
  rescue => e
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) workflow execution failed" }
    error_log = self.class::ExecutionTracker.track_error(workflow, e, attempt: attempt)

    # Retry if applicable. `attempt` is a 0-based index, so the count of
    # attempts made so far (including this one) is attempt + 1. For a
    # composite policy the per-error budget lives in `retry_counts` (keyed by
    # the matched policy's budget_key) and rides along the job args, mirroring
    # how `attempt` is threaded — there is no execution log at this level.
    attempts_made = attempt + 1
    backoff = policy.retry_backoff(e, attempts: attempts_made) do |policy_key|
      retry_counts[policy_key] = retry_counts[policy_key].to_i + 1
      retry_counts[policy_key]
    end

    if backoff
      enqueue_continuation(wait: backoff, attempt: attempts_made, retry_counts: retry_counts)
    else
      fail_workflow! error_log
    end
  ensure
    if lock_acquired # Only release lock if we acquired it
      # Release the lock and publish the continuation even if context.save!
      # raises — otherwise a transient save failure would leave the lock held
      # (until it goes stale) AND drop the continuation, stranding the workflow
      # with nothing scheduled to resume it. On a save failure the continuation
      # resumes from the last persisted context, which is exactly crash
      # semantics (durable steps replay).
      begin
        context.save!
      ensure
        self.class::LockStrategy.release_lock(job_id, workflow)
        # Publish the continuation only now — after the lock is released — so a
        # zero-delay, same-key continuation can't lose the acquire race against
        # this still-locked job. If release_lock raised (this job overran and
        # lost the lock), we never reach here and another job owns the continuation.
        flush_continuation!
      end
    end
  end
end
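The rescue branch above threads two counters through the job arguments: the 0-based `attempt` and, for composite policies, a `retry_counts` hash keyed by each matched policy's budget key. The sketch below models that flow with hypothetical `SketchRetryPolicy` and `SketchCompositePolicy` classes and an exponential backoff formula chosen purely for illustration; ChronoForge's real `RetryPolicy` and `CompositeRetryPolicy` options and semantics are not shown on this page and may differ. `handle_failure` mirrors the `attempts_made` / `retry_backoff` / `enqueue_continuation` logic but returns the continuation's arguments instead of enqueuing a job.

```ruby
# Hypothetical, simplified policies; not ChronoForge's RetryPolicy API.
class SketchRetryPolicy
  attr_reader :budget_key

  def initialize(on: [StandardError], max_attempts: 3, base_delay: 1, budget_key: nil)
    @on = on
    @max_attempts = max_attempts
    @base_delay = base_delay
    @budget_key = budget_key || on.map(&:name).join("|")
  end

  def matches?(error)
    @on.any? { |klass| error.is_a?(klass) }
  end

  # Exponential backoff in seconds, or nil once the count reaches max_attempts.
  def backoff_for(count)
    return nil if count >= @max_attempts

    @base_delay * (2**(count - 1))
  end

  # Single policy: the shared attempt count is the only budget.
  def retry_backoff(error, attempts:)
    matches?(error) ? backoff_for(attempts) : nil
  end
end

class SketchCompositePolicy
  def initialize(*policies)
    @policies = policies
  end

  # First matching policy wins; the caller's block bumps and returns that
  # policy's own counter, so each error kind spends a separate budget.
  def retry_backoff(error, attempts:)
    policy = @policies.find { |p| p.matches?(error) }
    return nil unless policy

    policy.backoff_for(yield(policy.budget_key))
  end
end

# Mirrors the rescue branch of #perform, returning the continuation's
# arguments (or :failed) instead of enqueuing a job.
def handle_failure(policy, error, attempt:, retry_counts:)
  attempts_made = attempt + 1
  backoff = policy.retry_backoff(error, attempts: attempts_made) do |key|
    retry_counts[key] = retry_counts[key].to_i + 1
    retry_counts[key]
  end
  return :failed unless backoff

  { wait: backoff, attempt: attempts_made, retry_counts: retry_counts }
end

TimeoutErr = Class.new(StandardError)
RateLimited = Class.new(StandardError)

policy = SketchCompositePolicy.new(
  SketchRetryPolicy.new(on: [TimeoutErr], max_attempts: 2, budget_key: "timeout"),
  SketchRetryPolicy.new(on: [RateLimited], max_attempts: 4, base_delay: 5, budget_key: "rate_limit")
)

# Each iteration stands in for one job run that raised the given error.
state = { attempt: 0, retry_counts: {} }
[RateLimited, RateLimited, TimeoutErr, TimeoutErr].each do |error_class|
  result = handle_failure(policy, error_class.new, **state)
  if result == :failed
    puts "failed"
    break
  end
  puts "retry in #{result[:wait]}s"
  state = { attempt: result[:attempt], retry_counts: result[:retry_counts] }
end
# Prints, one per line: retry in 5s, retry in 10s, retry in 1s, failed
```

Because the counts travel with the job arguments rather than living in an execution log, each error class draws down its own budget: in this run the two rate-limit retries do not consume the timeout budget, so the first timeout is still retried even though it occurs on the third attempt.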