Module: ChronoForge::Executor::Methods::Branch

Included in:
ChronoForge::Executor::Methods
Defined in:
lib/chrono_forge/executor/methods/branch.rb

Instance Method Details

#branch(name, automerge: false) ⇒ Object

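Opens a named branch, a durable fan-out step. `spawn` and `spawn_each` calls inside the block eagerly create and enqueue child workflows, and the branch seals when the block closes. On replay, a sealed branch skips its block entirely. `branch` does not wait for its children: branches run concurrently, and the join is a separate `merge_branches` call unless `automerge: true` is passed, in which case the merge runs inline as soon as the block closes. Returns `name`.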

Raises:

  • (ArgumentError): if no block is given, or if called inside another branch block
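
The seal-on-close behavior can be illustrated with a toy model. This is a minimal sketch, not ChronoForge code: `ToyLog` and `ToyExecutor` are hypothetical stand-ins that keep logs in a Hash rather than in the database, and only the completion guard and the nesting check mirror the method documented here.

```ruby
# Hypothetical stand-ins for illustration only; not part of ChronoForge.
ToyLog = Struct.new(:step_name, :state) do
  def completed?
    state == :completed
  end
end

class ToyExecutor
  def initialize(logs)
    @logs = logs             # shared Hash plays the role of the durable log table
    @current_branch = nil
  end

  def branch(name)
    raise ArgumentError, "branch requires a block" unless block_given?
    raise ArgumentError, "branch blocks cannot be nested" if @current_branch

    step_name = "branch$#{name}"
    log = (@logs[step_name] ||= ToyLog.new(step_name, :pending))

    unless log.completed?
      @current_branch = name.to_s
      begin
        yield
      ensure
        @current_branch = nil
      end
      log.state = :completed # seal: later passes skip the block
    end
    name
  end
end

runs = 0
store = {}
ToyExecutor.new(store).branch(:emails) { runs += 1 } # first pass runs the block
ToyExecutor.new(store).branch(:emails) { runs += 1 } # replay: sealed, block skipped
puts runs # prints 1
```

Because the seal is written only after the block returns, a block that raises leaves the toy log unsealed and the next pass runs the block again.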


# File 'lib/chrono_forge/executor/methods/branch.rb', line 9

def branch(name, automerge: false)
  raise ArgumentError, "branch requires a block" unless block_given?
  raise ArgumentError, "branch blocks cannot be nested" if @current_branch
  validate_step_name_segment!(name)

  step_name = "branch$#{name}"
  log = find_or_create_execution_log!(step_name) { |l| l.started_at = Time.current }

  # The sealed branch log may be a readonly, id-less cache stand-in; fetch
  # the real id so the registry/merge can scope children to it.
  log_id = log.id || ExecutionLog.where(workflow: @workflow, step_name: step_name).pick(:id)
  (@open_branches ||= {})[name.to_s] = {automerge: automerge, log_id: log_id}

  # ---- THE single most important correctness/performance property ----
  # A SEALED branch skips its block ENTIRELY. The expensive source
  # enumeration in spawn_each never re-runs after sealing. Do not move
  # dispatch out from behind this guard.
  unless log.completed?
    @current_branch = {name: name.to_s, log: log}
    begin
      yield
    ensure
      @current_branch = nil
    end
    log.update!(state: :completed, completed_at: Time.current)
  end

  # automerge joins the branch inline, the moment its block closes (eager
  # dispatch + immediate await). Deferred/concurrent joins use an explicit
  # merge_branches instead. Runs on every pass so replay re-checks via the
  # merge$<name> log's own idempotency; the inline merge removes the branch
  # from @open_branches on completion, so the completion gate won't see it.
  merge_branches(name) if automerge

  name
end

#spawn(name, workflow_class, **kwargs) ⇒ Object

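Dispatches a single child workflow into the current branch (the enclosing `branch` block). The child is keyed `<workflow key>$<branch name>$<name>`. Returns `name`.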
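
The child key format can be shown in isolation. In this small sketch, `child_key_for` is a hypothetical helper written for illustration, and the workflow key `"order-42"` and branch name `"notifications"` are made-up values; only the `$`-joined layout comes from the source listing for this method.

```ruby
# Hypothetical helper mirroring the key layout used by #spawn; not a ChronoForge API.
def child_key_for(workflow_key, branch_name, spawn_name)
  "#{workflow_key}$#{branch_name}$#{spawn_name}"
end

key = child_key_for("order-42", "notifications", :receipt)
puts key # prints order-42$notifications$receipt
```

The key depends only on the parent key, the branch name, and the spawn name, so recomputing it on a later pass yields the same string.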



# File 'lib/chrono_forge/executor/methods/branch.rb', line 47

def spawn(name, workflow_class, **kwargs)
  cb = current_branch!
  validate_step_name_segment!(name)
  child_key = "#{@workflow.key}$#{cb[:name]}$#{name}"
  dispatch_children(cb, [[child_key, workflow_class, kwargs]])
  name
end

#spawn_each(name, source, of: 1000) ⇒ Object

Dispatches one child per item of `source`, streamed in batches of `of` (default 1000). An ActiveRecord relation is iterated by keyset (`find_in_batches` with `start:`) for constant memory, and each child is keyed by record id. Any other enumerable uses an offset cursor, and each child is keyed `name_index` by position. Either way, the source must re-enumerate identically across replays.

For ActiveRecord sources this also requires stable membership. On crash recovery, dispatch resumes from the last primary key recorded, so a row that enters the relation below the cursor after the cursor has passed it (for example through a mutating `where(state:)` scope) never gets a child. Point `spawn_each` at a set that stays fixed for the branch's lifetime. A relation carrying an explicit `.order(...)` is rejected with NotExecutableError.

The block receives each item and returns `[WorkflowClass, kwargs]` or a bare class.
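
The offset-cursor path for plain enumerables can be sketched without ChronoForge. In this toy model, `toy_spawn_each`, the `cursor` Hash, and the `dispatched` Array are hypothetical stand-ins for the method, the persisted branch log, and the child table, and the `crash_after:` option exists only to simulate a failure between batches.

```ruby
# Toy model of the offset-cursor path; all names here are hypothetical.
def toy_spawn_each(name, source, cursor, dispatched, of: 2, crash_after: nil)
  n = cursor["n"] || 0
  batches = 0
  source.drop(n).each_slice(of) do |slice|
    raise "simulated crash" if crash_after && batches == crash_after
    slice.each do |_item|
      dispatched << "#{name}_#{n}" # positional key: name_index
      n += 1
    end
    cursor["n"] = n                # record progress after each batch
    batches += 1
  end
  name
end

items = %w[a b c d e]
cursor = {}
dispatched = []

begin
  toy_spawn_each(:mail, items, cursor, dispatched, crash_after: 1)
rescue RuntimeError
  # The first pass died after one batch, leaving cursor["n"] at 2.
end

toy_spawn_each(:mail, items, cursor, dispatched) # replay resumes at index 2
puts dispatched.inspect
```

The resume is only correct because `items` enumerates in the same order on both passes. If the order changed between passes, `drop(n)` would skip different items than the ones already dispatched.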



# File 'lib/chrono_forge/executor/methods/branch.rb', line 65

def spawn_each(name, source, of: 1000)
  cb = current_branch!
  validate_step_name_segment!(name)
  cursor = cb[:log].&.dig("cursors", name.to_s) || {}
  n = cursor["n"] || 0

  if source.is_a?(ActiveRecord::Relation)
    # spawn_each iterates by primary key (find_in_batches) so the stream
    # re-enumerates identically across replays. An explicit .order would
    # make iteration non-deterministic, so reject it up front with a clear
    # error rather than letting find_in_batches raise deep in the loop.
    if source.order_values.present?
      raise NotExecutableError,
        "spawn_each iterates #{source.model_name} by primary key; remove the " \
        "explicit .order(...) (or default-scope order) from the source relation"
    end
    source.find_in_batches(batch_size: of, start: cursor["pk"]) do |records|
      entries = records.map do |record|
        klass, kw = normalize_spawn(yield(record))
        # Stable per-record key: an inclusive find_in_batches re-yield of the
        # boundary record on crash-resume produces the SAME key, so insert_all
        # dedups it (idempotent). Sequential indexing would duplicate it.
        ck = "#{@workflow.key}$#{cb[:name]}$#{name}_#{record.id}"
        [ck, klass, kw]
      end
      dispatch_children(cb, entries)
      advance_cursor!(cb, name, pk: records.last.id)
    end
  else
    source.drop(n).each_slice(of) do |slice|
      entries = slice.map do |item|
        klass, kw = normalize_spawn(yield(item))
        ck = "#{@workflow.key}$#{cb[:name]}$#{name}_#{n}"
        n += 1
        [ck, klass, kw]
      end
      dispatch_children(cb, entries)
      advance_cursor!(cb, name, n: n)
    end
  end
  name
end
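
The stable-membership caveat for ActiveRecord sources can be made concrete with a toy keyset loop. This sketch uses plain hashes in place of rows and a `Set` in place of the deduplicating insert. `toy_keyset_pass` and its `max_batches:` option are hypothetical and exist only to simulate a crash, and the inclusive lower bound on the first batch of each pass imitates `find_in_batches(start:)`.

```ruby
require "set"

# Toy keyset iteration; no ActiveRecord involved, all names hypothetical.
def toy_keyset_pass(rows, cursor, dispatched, of: 2, max_batches: nil)
  lower = cursor["pk"] # inclusive on the first batch of a pass
  inclusive = true
  batches = 0
  loop do
    break if max_batches && batches == max_batches
    batch = rows
      .select { |r| r[:active] }
      .select { |r| lower.nil? || (inclusive ? r[:id] >= lower : r[:id] > lower) }
      .sort_by { |r| r[:id] }
      .first(of)
    break if batch.empty?
    batch.each { |r| dispatched << "mail_#{r[:id]}" } # Set ignores the re-yielded boundary row
    lower = batch.last[:id]
    cursor["pk"] = lower
    inclusive = false
    batches += 1
  end
end

rows = (1..5).map { |id| {id: id, active: id != 2} } # row 2 starts outside the scope
cursor = {}
dispatched = Set.new

toy_keyset_pass(rows, cursor, dispatched, max_batches: 1) # handles ids 1 and 3, then stops
rows[1][:active] = true                                   # row 2 enters below the cursor
toy_keyset_pass(rows, cursor, dispatched)                 # resumes from pk 3

puts dispatched.to_a.inspect
```

Row 3 is yielded twice but produces the same key, so it is dispatched once. Row 2 joined the scope after the cursor had moved past it and never receives a child, which is the failure mode the description warns about.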