Module: ChronoForge::Executor

Includes:
Methods
Defined in:
lib/chrono_forge/executor.rb,
lib/chrono_forge/executor/context.rb,
lib/chrono_forge/executor/methods.rb,
lib/chrono_forge/executor/methods/wait.rb,
lib/chrono_forge/executor/retry_policy.rb,
lib/chrono_forge/executor/lock_strategy.rb,
lib/chrono_forge/executor/methods/branch.rb,
lib/chrono_forge/executor/execution_tracker.rb,
lib/chrono_forge/executor/methods/wait_until.rb,
lib/chrono_forge/executor/methods/continue_if.rb,
lib/chrono_forge/executor/composite_retry_policy.rb,
lib/chrono_forge/executor/methods/durably_repeat.rb,
lib/chrono_forge/executor/methods/merge_branches.rb,
lib/chrono_forge/executor/methods/durably_execute.rb,
lib/chrono_forge/executor/methods/workflow_states.rb

Defined Under Namespace

Modules: Methods Classes: CompositeRetryPolicy, ConcurrentExecutionError, Context, Error, ExecutionFailedError, ExecutionFlowControl, ExecutionTracker, HaltExecutionFlow, InvalidStepName, LockStrategy, LongRunningConcurrentExecutionError, NotExecutableError, NotInBranchError, RetryPolicy, UnknownBranchError, UnmergedBranchError, WaitConditionNotMet, WorkflowNotRetryableError

Constant Summary collapse

STEP_NAME_DELIMITER =

“$” separates the segments of a step name (e.g. “durably_repeat$name$ts”). User-supplied names/methods must not contain it.

"$"
RESERVED_KWARGS =

Keyword args ChronoForge threads through job args internally. Users must not pass these to perform_now/perform_later; the framework injects them via `.set(...)` continuations, whose ConfiguredJob proxy bypasses the class-level guard in `prepended` below.

%i[attempt retry_counts retry_workflow].freeze

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Methods::MergeBranches

#merge_branches

Methods included from Methods::Branch

#branch, #spawn, #spawn_each

Methods included from Methods::DurablyRepeat

#durably_repeat

Methods included from Methods::DurablyExecute

#durably_execute

Methods included from Methods::ContinueIf

#continue_if

Methods included from Methods::WaitUntil

#wait_until

Methods included from Methods::Wait

#wait

Class Method Details

.prepended(base) ⇒ Object

Add class methods



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/chrono_forge/executor.rb', line 42

# Hook invoked when this module is prepended onto a workflow class. Installs
# the class-level API on `base`: guarded enqueue entry points, retry entry
# points, and the `retry_policy` DSL.
#
# @param base [Class] the class this module is being prepended to
def self.prepended(base)
  # Holds the workflow's class-wide default retry policy (subclasses inherit
  # it). Stays nil until the `retry_policy` DSL is called, which means "fall
  # back to the per-site built-in default".
  base.class_attribute(:default_retry_policy, instance_accessor: false, default: nil)

  class << base
    # Enqueue contract: a single positional argument (`key`) and any number
    # of keywords. Anything listed in RESERVED_KWARGS is refused here; the
    # framework supplies those itself through `.set(...)` continuations,
    # whose ActiveJob ConfiguredJob proxy does not go through these
    # class-level overrides.
    def perform_now(key, *extra, **kwargs)
      __validate_enqueue!(key, extra, kwargs)
      super(key, **kwargs)
    end

    # Same contract as `perform_now`, for the enqueueing variant.
    def perform_later(key, *extra, **kwargs)
      __validate_enqueue!(key, extra, kwargs)
      super(key, **kwargs)
    end

    # Re-run a failed/stalled workflow inline. Goes through `.set(...)` so
    # the reserved `retry_workflow: true` flag can reach the instance-level
    # perform without being rejected by the public guard.
    def retry_now(key, **kwargs)
      __validate_enqueue!(key, [], kwargs)
      flagged = {retry_workflow: true, **kwargs}
      set.perform_now(key, **flagged)
    end

    # Enqueueing counterpart of `retry_now`.
    def retry_later(key, **kwargs)
      __validate_enqueue!(key, [], kwargs)
      flagged = {retry_workflow: true, **kwargs}
      set.perform_later(key, **flagged)
    end

    # DSL for declaring the workflow's default retry policy, used for
    # workflow-level retries and for steps that do not override it per call.
    # Pass RetryPolicy objects positionally to get a composite (per-error
    # budgets), or pass keyword options to get a single RetryPolicy. Mixing
    # the two forms raises ArgumentError.
    def retry_policy(*policies, **opts)
      composite_requested = policies.any?
      if composite_requested && opts.any?
        raise ArgumentError, "retry_policy takes either positional policies or keyword options, not both"
      end

      self.default_retry_policy =
        if composite_requested
          RetryPolicy.compose(*policies)
        else
          RetryPolicy.new(**opts)
        end
    end

    private

    # Shared guard for the public entry points. Checks, in order: the key is
    # a String, no surplus positional arguments were given, and no reserved
    # keyword was supplied. Raises ArgumentError on the first violation.
    def __validate_enqueue!(key, extra, kwargs)
      raise ArgumentError, "Workflow key must be a string as the first argument" unless key.is_a?(String)

      surplus = extra.size
      if surplus > 0
        raise ArgumentError,
          "ChronoForge workflows accept only `key` positionally; pass " \
          "everything else as keywords (got #{surplus} extra positional arg(s))"
      end

      clashes = kwargs.keys & RESERVED_KWARGS
      return unless clashes.any?

      singular = clashes.one?
      verb_phrase = singular ? "is a reserved" : "are reserved"
      noun = singular ? "keyword" : "keywords"
      raise ArgumentError,
        "#{clashes.join(", ")} #{verb_phrase} " \
        "ChronoForge #{noun} and cannot be passed to perform_now/perform_later"
    end
  end
end

Instance Method Details

#perform(key, attempt: 0, retry_counts: {}, retry_workflow: false, options: {}, **kwargs) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/chrono_forge/executor.rb', line 110

# Runs one attempt of the workflow identified by `key`.
#
# Order of operations, as implemented below:
#   1. Return early if the workflow-level retry policy's attempt budget is
#      already used up.
#   2. Set up the workflow (`setup_workflow!`) and, when requested, reset it
#      for a manual retry (`retry_workflow!`). Both run OUTSIDE the
#      begin/rescue/ensure, so anything they raise propagates untouched.
#   3. Acquire the lock, set up the context, invoke the wrapped `perform`
#      via `super` with the workflow's stored kwargs, then mark the workflow
#      complete.
#   4. Translate the control-flow exceptions into log lines / state changes;
#      for any other StandardError, track it and either schedule a retry
#      continuation or fail the workflow.
#   5. If the lock was acquired: save the context, release the lock, then
#      flush the continuation — in that order.
#
# @param key [String] workflow key; forwarded to `setup_workflow!` and used
#   in log messages
# @param attempt [Integer] 0-based index of this workflow-level attempt
# @param retry_counts [Hash] per-policy retry counters, keyed by the key that
#   `policy.retry_backoff` yields; mutated in place and forwarded to the
#   retry continuation
# @param retry_workflow [Boolean] when truthy, `retry_workflow!` is called
#   before the lock is acquired
# @param options [Hash] forwarded to `setup_workflow!`
# @param kwargs [Hash] forwarded to `setup_workflow!`; note the wrapped
#   `perform` receives `workflow.kwargs`, not this hash directly
# @return [Object, nil] nil on the early return and after a rescued
#   ExecutionFailedError, HaltExecutionFlow or ConcurrentExecutionError;
#   otherwise the value of `complete_workflow!`, `enqueue_continuation` or
#   `fail_workflow!` (the ensure section does not affect the return value)
# @raise [NotExecutableError] re-raised unchanged
def perform(key, attempt: 0, retry_counts: {}, retry_workflow: false, options: {}, **kwargs)
  # Safety net: prevent re-running a workflow whose attempts are exhausted
  # (e.g. a stale job left in the queue). The normal exhaustion path fails the
  # workflow from the rescue below before this is ever reached.
  policy = workflow_retry_policy
  # A nil `max_attempts` skips the guard entirely (no upper bound enforced here).
  if policy.max_attempts && attempt >= policy.max_attempts
    Rails.logger.error { "ChronoForge:#{self.class} max attempts reached for job workflow(#{key})" }
    return
  end

  # Find or create workflow instance
  setup_workflow!(key, options, kwargs)

  # Handle retry parameter - unlock and continue execution
  retry_workflow! if retry_workflow

  # Track if we acquired the lock
  lock_acquired = false

  begin
    # Acquire lock with advanced concurrency protection. The return value
    # replaces @workflow, so later code works with whatever acquire_lock
    # handed back.
    @workflow = self.class::LockStrategy.acquire_lock(job_id, workflow, max_duration: max_duration)
    lock_acquired = true

    # Setup context
    setup_context!

    # Execute core job logic. The keyword arguments come from the workflow's
    # stored kwargs (keys symbolized), not from this call's `kwargs`.
    super(**workflow.kwargs.symbolize_keys)

    # Mark as complete
    complete_workflow!
  rescue ExecutionFailedError
    # The step that raised this already logged the underlying cause (with its
    # step/attempt context); ExecutionFailedError is control flow, not a new
    # error, so re-logging it here would just duplicate the row.
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) step execution failed" }
    workflow.stalled!
    nil
  rescue HaltExecutionFlow
    # Halt execution
    Rails.logger.debug { "ChronoForge:#{self.class}(#{key}) execution halted" }
    nil
  rescue ConcurrentExecutionError
    # Graceful handling of concurrent execution
    Rails.logger.warn { "ChronoForge:#{self.class}(#{key}) concurrent execution detected" }
    nil
  rescue NotExecutableError
    # Re-raised unchanged so it bypasses the generic rescue below: no error
    # tracking and no retry scheduling happen for it.
    raise
  rescue => e
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) workflow execution failed" }
    error_log = self.class::ExecutionTracker.track_error(workflow, e, attempt: attempt)

    # Retry if applicable. `attempt` is a 0-based index, so the count of
    # attempts made so far (including this one) is attempt + 1. For a
    # composite policy the per-error budget lives in `retry_counts` (keyed by
    # the matched policy's budget_key) and rides along the job args, mirroring
    # how `attempt` is threaded — there is no execution log at this level.
    attempts_made = attempt + 1
    # The block increments the counter for the yielded key and returns the
    # new count; `to_i` makes a missing (nil) entry start from 0.
    backoff = policy.retry_backoff(e, attempts: attempts_made) do |policy_key|
      retry_counts[policy_key] = retry_counts[policy_key].to_i + 1
      retry_counts[policy_key]
    end
    # A falsy backoff means no retry: the workflow is failed with the tracked
    # error log instead.
    # NOTE(review): this branch also runs for errors raised before the lock
    # was acquired (e.g. by acquire_lock itself), in which case the ensure
    # below skips flush_continuation!. Confirm enqueue_continuation does not
    # depend on that flush to actually publish the job.
    if backoff
      enqueue_continuation(wait: backoff, attempt: attempts_made, retry_counts: retry_counts)
    else
      fail_workflow! error_log
    end
  ensure
    if lock_acquired # Only release lock if we acquired it
      # Release the lock and publish the continuation even if context.save!
      # raises — otherwise a transient save failure would leave the lock held
      # (until it goes stale) AND drop the continuation, stranding the workflow
      # with nothing scheduled to resume it. On a save failure the continuation
      # resumes from the last persisted context, which is exactly crash
      # semantics (durable steps replay).
      begin
        context.save!
      ensure
        self.class::LockStrategy.release_lock(job_id, workflow)
        # Publish the continuation only now — after the lock is released — so a
        # zero-delay, same-key continuation can't lose the acquire race against
        # this still-locked job. If release_lock raised (this job overran and
        # lost the lock), we never reach here and another job owns continuation.
        flush_continuation!
      end
    end
  end
end