Module: ChronoForge::Executor::Methods::Branch
Included in: ChronoForge::Executor::Methods
Defined in: lib/chrono_forge/executor/methods/branch.rb
Instance Method Summary
- #branch(name, automerge: false) ⇒ Object
  Opens a named branch: a durable fan-out step.
- #spawn(name, workflow_class, **kwargs) ⇒ Object
  Dispatch a single child into the current branch.
- #spawn_each(name, source, of: 1000) ⇒ Object
  Dispatch one child per item of `source`, streamed.
Instance Method Details
#branch(name, automerge: false) ⇒ Object
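Opens a named branch: a durable fan-out step. spawn and spawn_each calls inside the block eagerly create and enqueue child workflows, and the branch SEALS (its execution log is marked completed) when the block closes. Branch blocks cannot be nested. Unless automerge: true is given, branch returns the branch name without waiting for the children: branches run concurrently, and the join is a separate merge_branches call. With automerge: true, merge_branches(name) runs inline as soon as the block closes.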
```ruby
# File 'lib/chrono_forge/executor/methods/branch.rb', line 9
def branch(name, automerge: false)
  raise ArgumentError, "branch requires a block" unless block_given?
  raise ArgumentError, "branch blocks cannot be nested" if @current_branch
  validate_step_name_segment!(name)

  step_name = "branch$#{name}"
  log = find_or_create_execution_log!(step_name) { |l| l.started_at = Time.current }
  # The sealed branch log may be a readonly, id-less cache stand-in; fetch
  # the real id so the registry/merge can scope children to it.
  log_id = log.id || ExecutionLog.where(workflow: @workflow, step_name: step_name).pick(:id)
  (@open_branches ||= {})[name.to_s] = {automerge: automerge, log_id: log_id}

  # ---- THE single most important correctness/performance property ----
  # A SEALED branch skips its block ENTIRELY. The expensive source
  # enumeration in spawn_each never re-runs after sealing. Do not move
  # dispatch out from behind this guard.
  unless log.completed?
    @current_branch = {name: name.to_s, log: log}
    begin
      yield
    ensure
      @current_branch = nil
    end
    log.update!(state: :completed, completed_at: Time.current)
  end

  # automerge joins the branch inline, the moment its block closes (eager
  # dispatch + immediate await). Deferred/concurrent joins use an explicit
  # merge_branches instead. Runs on every pass so replay re-checks via the
  # merge$<name> log's own idempotency; the inline merge removes the branch
  # from @open_branches on completion, so the completion gate won't see it.
  merge_branches(name) if automerge
  name
end
```
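To make the seal-then-skip guard concrete, here is a minimal in-memory sketch. ToyLog and ToyExecutor are hypothetical stand-ins, not ChronoForge classes: a Hash replaces the persisted execution logs and a counter records how often a branch block body actually runs. It assumes a replay simply calls branch again with the same name.

```ruby
# Hypothetical stand-ins, NOT ChronoForge classes: they model only the
# seal-then-skip guard and the nesting check in #branch.
class ToyLog
  def initialize
    @state = :pending
  end

  def completed?
    @state == :completed
  end

  def complete!
    @state = :completed
  end
end

class ToyExecutor
  attr_reader :block_runs

  def initialize
    @logs = {}          # step_name => ToyLog; persists across "replays"
    @block_runs = 0     # number of times a branch block actually executed
    @current_branch = nil
  end

  def branch(name)
    raise ArgumentError, "branch requires a block" unless block_given?
    raise ArgumentError, "branch blocks cannot be nested" if @current_branch

    log = (@logs["branch$#{name}"] ||= ToyLog.new)
    unless log.completed?   # a sealed branch never re-runs its block
      @current_branch = name
      begin
        @block_runs += 1
        yield
      ensure
        @current_branch = nil
      end
      log.complete!         # seal only when the block closes normally
    end
    name
  end
end

executor = ToyExecutor.new
# First execution plus one replay of the same workflow step.
2.times { executor.branch(:fanout) { :expensive_enumeration } }
puts executor.block_runs # prints 1
```

Because the block raised nothing on the first pass, the log was sealed and the replay skipped straight past the block. If the block had raised instead, the ensure clause would clear the current branch and the log would stay unsealed, so the next pass would run the block again.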
#spawn(name, workflow_class, **kwargs) ⇒ Object
Dispatch a single child into the current branch.
```ruby
# File 'lib/chrono_forge/executor/methods/branch.rb', line 47
def spawn(name, workflow_class, **kwargs)
  cb = current_branch!
  validate_step_name_segment!(name)
  # Deterministic key: the same spawn on a replay yields the same child key.
  child_key = "#{@workflow.key}$#{cb[:name]}$#{name}"
  dispatch_children(cb, [[child_key, workflow_class, kwargs]])
  name
end
```
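A sketch of why spawn's child key stays stable across replays, assuming (as the spawn_each comments indicate) that dispatch deduplicates on the key the way insert_all does. ToyRegistry, child_key and check_segment! are hypothetical. In particular, the rule that a segment may not contain "$" is only an assumption about validate_step_name_segment!, motivated by "$" being the separator: without it, branch "a$b" with child "c" and branch "a" with child "b$c" would produce the same key.

```ruby
# Hypothetical stand-ins (NOT ChronoForge API). ToyRegistry models the child
# table with insert_all-style "ignore duplicate key" semantics.
class ToyRegistry
  attr_reader :rows

  def initialize
    @rows = {} # child_key => [workflow_class, kwargs]
  end

  # Inserts each entry unless its key already exists, so re-dispatching the
  # same key on a replay is a no-op.
  def dispatch_children(entries)
    entries.each do |key, klass, kwargs|
      @rows[key] ||= [klass, kwargs]
    end
  end
end

# Assumption: "$" separates key segments, so a segment must not contain it.
def check_segment!(segment)
  raise ArgumentError, "invalid segment #{segment.inspect}" if segment.to_s.include?("$")
end

# Mirrors the key format used by spawn: workflow key, branch name, child name.
def child_key(workflow_key, branch_name, name)
  check_segment!(name)
  "#{workflow_key}$#{branch_name}$#{name}"
end

class EmailWorkflow; end

registry = ToyRegistry.new
key = child_key("order-42", "notify", :email)
# First execution plus one replay dispatch the same child.
2.times { registry.dispatch_children([[key, EmailWorkflow, {to: "a@example.com"}]]) }
puts key                 # prints order-42$notify$email
puts registry.rows.size  # prints 1
```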
#spawn_each(name, source, of: 1000) ⇒ Object
Dispatch one child per item of `source`, streamed. An ActiveRecord relation is iterated by primary key with find_in_batches (resuming through its inclusive start: option), which keeps memory constant, and each child is keyed by record id; the relation must not carry an explicit or default-scope .order, or spawn_each raises NotExecutableError. Any other enumerable is resumed with an offset cursor, and each child is keyed `name_index` by position. Either way, the source must re-enumerate identically across replays. For ActiveRecord sources this additionally means STABLE MEMBERSHIP: on crash recovery, dispatch resumes from the last primary key seen, so a row that enters the relation below the cursor after the cursor has passed it (for example, through a mutating `where(state:)` scope) never gets a child. Point spawn_each at a set that stays fixed for the branch's lifetime. The block returns [WorkflowClass, kwargs] (or a bare class).
```ruby
# File 'lib/chrono_forge/executor/methods/branch.rb', line 65
def spawn_each(name, source, of: 1000)
  cb = current_branch!
  validate_step_name_segment!(name)
  # Resume point for this stream. Assumes the branch log keeps per-stream
  # cursors in a JSON `metadata` column.
  cursor = cb[:log].metadata&.dig("cursors", name.to_s) || {}
  n = cursor["n"] || 0

  if source.is_a?(ActiveRecord::Relation)
    # spawn_each iterates by primary key (find_in_batches) so the stream
    # re-enumerates identically across replays. An explicit .order would
    # make iteration non-deterministic, so reject it up front with a clear
    # error rather than letting find_in_batches raise deep in the loop.
    if source.order_values.present?
      raise NotExecutableError,
        "spawn_each iterates #{source.model_name} by primary key; remove the " \
        "explicit .order(...) (or default-scope order) from the source relation"
    end

    source.find_in_batches(batch_size: of, start: cursor["pk"]) do |records|
      entries = records.map do |record|
        klass, kw = normalize_spawn(yield(record))
        # Stable per-record key: an inclusive find_in_batches re-yield of the
        # boundary record on crash-resume produces the SAME key, so insert_all
        # dedups it (idempotent). Sequential indexing would duplicate it.
        ck = "#{@workflow.key}$#{cb[:name]}$#{name}_#{record.id}"
        [ck, klass, kw]
      end
      dispatch_children(cb, entries)
      advance_cursor!(cb, name, pk: records.last.id)
    end
  else
    # Any other enumerable: skip the n items already dispatched and key each
    # child by its position in the stream.
    source.drop(n).each_slice(of) do |slice|
      entries = slice.map do |item|
        klass, kw = normalize_spawn(yield(item))
        ck = "#{@workflow.key}$#{cb[:name]}$#{name}_#{n}"
        n += 1
        [ck, klass, kw]
      end
      dispatch_children(cb, entries)
      advance_cursor!(cb, name, n: n)
    end
  end
  name
end
```
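The two resume strategies can be modeled without a database. In this sketch, which uses hypothetical helpers rather than ChronoForge API, plain arrays stand in for the source, a Hash with "keep the first value" semantics stands in for the insert_all-deduplicated child table, another Hash stands in for the persisted cursor, and the stream name is fixed to "items". The keyset run crashes after its first batch, a row then joins the scope below the cursor, and the resumed run re-yields the boundary record (deduplicated by its id-based key) but never sees the new row. The offset run shows positional keys picking up at the saved index.

```ruby
# Hypothetical model of spawn_each's two resume strategies (NOT ChronoForge API).
Record = Struct.new(:id, :state)

# Keyset path: ascending ids, cursor[:pk] is an INCLUSIVE start (like
# find_in_batches start:). Returns :crashed to simulate a crash after a batch.
def keyset_pass(records, cursor, children, of:, stop_after_batches: nil)
  remaining = records.select { |r| cursor[:pk].nil? || r.id >= cursor[:pk] }.sort_by(&:id)
  batches = 0
  remaining.each_slice(of) do |batch|
    batch.each { |r| children["items_#{r.id}"] ||= r } # stable per-record key
    cursor[:pk] = batch.last.id                        # advance after dispatch
    batches += 1
    return :crashed if stop_after_batches && batches == stop_after_batches
  end
  :done
end

# Offset path: skip the items already dispatched, key children by position.
def offset_pass(items, cursor, children, of:, stop_after_slices: nil)
  n = cursor[:n] || 0
  slices = 0
  items.drop(n).each_slice(of) do |slice|
    slice.each do |item|
      children["items_#{n}"] ||= item # positional key: source must replay identically
      n += 1
    end
    cursor[:n] = n
    slices += 1
    return :crashed if stop_after_slices && slices == stop_after_slices
  end
  :done
end

rows = (1..5).map { |id| Record.new(id, id == 3 ? :done : :pending) }
pending = -> { rows.select { |r| r.state == :pending } } # a mutating where(state:)-style scope
cursor = {}
children = {}
keyset_pass(pending.call, cursor, children, of: 3, stop_after_batches: 1) # dispatches 1, 2, 4; "crash"
rows[2].state = :pending                                                  # row 3 joins BELOW the cursor (pk = 4)
keyset_pass(pending.call, cursor, children, of: 3)                        # re-yields 4 (deduped), then 5
p children.keys # prints ["items_1", "items_2", "items_4", "items_5"]

items = %w[a b c d e]
ocursor = {}
ochildren = {}
offset_pass(items, ocursor, ochildren, of: 2, stop_after_slices: 1) # dispatches "a", "b"; "crash"
offset_pass(items, ocursor, ochildren, of: 2)                       # resumes at n = 2
p ochildren.keys # prints ["items_0", "items_1", "items_2", "items_3", "items_4"]
```

Row 3 is the STABLE MEMBERSHIP failure described above: it matches the scope on the resumed pass, but the keyset cursor has already moved past id 3, so it never gets a child.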