Class: Ace::Assign::Organisms::AssignmentExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/ace/assign/organisms/assignment_executor.rb

Overview

Orchestrates workflow operations on the work queue.

Implements the state machine for queue operations: start → advance → complete (with fail/add/retry branches)

Constant Summary collapse

DEFAULT_DYNAMIC_STEP_INSTRUCTIONS =
"Complete this step and finish with: ace-assign finish --message report.md".freeze
PROJECT_ROOT_SIGNAL =
"project_root".freeze
CATALOG_SIGNAL =
"catalog".freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cache_base: nil, skill_source_resolver: nil, step_catalog: nil) ⇒ AssignmentExecutor

Returns a new instance of AssignmentExecutor.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 40

# Builds an executor and the collaborators it delegates to.
#
# @param cache_base [Object, nil] forwarded unchanged to Molecules::AssignmentManager.new
# @param skill_source_resolver [Object, nil] resolver to use; a new
#   Molecules::SkillAssignSourceResolver is created when none is given
# @param step_catalog [Object, nil] stored as the fixture catalog; a separate
#   flag records whether a non-nil value was supplied
def initialize(cache_base: nil, skill_source_resolver: nil, step_catalog: nil)
  # Collaborators first. The renumberer is built last because it is handed
  # the writer and scanner created just above it.
  @assignment_manager = Molecules::AssignmentManager.new(cache_base: cache_base)
  @queue_scanner = Molecules::QueueScanner.new
  @step_writer = Molecules::StepWriter.new
  @skill_source_resolver = skill_source_resolver
  @skill_source_resolver ||= Molecules::SkillAssignSourceResolver.new
  @step_renumberer = Molecules::StepRenumberer.new(step_writer: @step_writer, queue_scanner: @queue_scanner)

  # Step catalog bookkeeping: nothing is loaded yet; only the optional
  # fixture and whether one was passed are remembered.
  @step_catalog_from_fixture = step_catalog
  @step_catalog_from_fixture_set = !step_catalog.nil?
  @step_catalog = nil
  @step_catalog_loaded = false
end

Instance Attribute Details

#assignment_manager ⇒ Object (readonly)

Returns the value of attribute assignment_manager.



18
19
20
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 18

# Reader for the assignment manager collaborator.
#
# @return [Object] the value held in @assignment_manager
def assignment_manager = @assignment_manager

#queue_scanner ⇒ Object (readonly)

Returns the value of attribute queue_scanner.



18
19
20
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 18

# Reader for the queue scanner collaborator.
#
# @return [Object] the value held in @queue_scanner
def queue_scanner = @queue_scanner

#skill_source_resolver ⇒ Object (readonly)

Returns the value of attribute skill_source_resolver.



18
19
20
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 18

# Reader for the skill source resolver collaborator.
#
# @return [Object] the value held in @skill_source_resolver
def skill_source_resolver = @skill_source_resolver

#step_renumberer ⇒ Object (readonly)

Returns the value of attribute step_renumberer.



18
19
20
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 18

# Reader for the step renumberer collaborator.
#
# @return [Object] the value held in @step_renumberer
def step_renumberer = @step_renumberer

#step_writer ⇒ Object (readonly)

Returns the value of attribute step_writer.



18
19
20
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 18

# Reader for the step writer collaborator.
#
# @return [Object] the value held in @step_writer
def step_writer = @step_writer

Class Method Details

.cache_store ⇒ Object



25
26
27
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 25

# Lazily builds and memoizes the cache container kept in @cache_store.
#
# @return [Hash] the memoized hash; on first use it is { step_catalog_cache: {} }
def cache_store
  return @cache_store if @cache_store

  @cache_store = { step_catalog_cache: {} }
end

.clear_caches! ⇒ Object



21
22
23
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 21

# Discards whatever @cache_store held by replacing it with a fresh container.
#
# @return [Hash] the new hash, { step_catalog_cache: {} }
def clear_caches!
  empty_catalog_cache = {}
  @cache_store = { step_catalog_cache: empty_catalog_cache }
end

Instance Method Details

#add(name, instructions, after: nil, as_child: false, added_by: nil, extra: {}) ⇒ Hash

Add a new step dynamically

Parameters:

  • name (String)

    Step name

  • instructions (String)

    Step instructions

  • after (String, nil) (defaults to: nil)

    Insert after this step number (optional)

  • as_child (Boolean) (defaults to: false)

    Insert as child of ‘after’ step (default: false, sibling)

Returns:

  • (Hash)

    Result with new step

Raises:



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 318

# Add a new step dynamically to the active assignment.
#
# @param name [String] Step name (surrounding whitespace is stripped)
# @param instructions [String] Step instructions
# @param after [String, nil] Insert after this step number (optional)
# @param as_child [Boolean] Insert as child of 'after' step (default: false, sibling)
# @param added_by [String, nil] Audit marker; derived from after/as_child when nil
# @param extra [Hash] Extra fields, passed through normalize_batch_extra_fields before writing
# @return [Hash] :assignment, :state (fresh scan), :added (the new step), :renumbered
# @raise [AssignmentErrors::NoActive] when there is no active assignment
# @raise [Error] when the stripped name is empty
# @raise [StepErrors::NotFound] when +after+ is given but is not an existing step number
def add(name, instructions, after: nil, as_child: false, added_by: nil, extra: {})
  assignment = assignment_manager.find_active
  raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin." unless assignment

  step_name = name.to_s.strip
  raise Error, "Step name cannot be empty." if step_name.empty?

  state = queue_scanner.scan(assignment.steps_dir, assignment: assignment)
  existing_numbers = queue_scanner.step_numbers(assignment.steps_dir)

  # Validate --after step exists
  if after && !existing_numbers.include?(after)
    raise StepErrors::NotFound, "Step #{after} not found. Available steps: #{existing_numbers.join(", ")}"
  end

  new_number, renumbered = calculate_insertion_point(
    after: after,
    as_child: as_child,
    state: state,
    existing_numbers: existing_numbers
  )

  # Renumber existing steps if needed (uses molecule with rollback support).
  # The step numbers read above are only used for validation and for the
  # insertion-point calculation, so they are not re-read afterwards.
  step_renumberer.renumber(assignment.steps_dir, renumbered) if renumbered.any?

  # Status is decided from the pre-insertion scan: the new step starts
  # in progress only when no step was in progress at that point.
  initial_status = state.current ? :pending : :in_progress

  # Build added_by metadata for audit trail
  added_by ||= if after && as_child
    "child_of:#{after}"
  elsif after
    "injected_after:#{after}"
  else
    "dynamic"
  end

  extra_frontmatter = normalize_batch_extra_fields(extra)

  # Create new step file with correct status
  step_writer.create(
    steps_dir: assignment.steps_dir,
    number: new_number,
    name: step_name,
    instructions: instructions,
    status: initial_status,
    added_by: added_by,
    parent: as_child ? after : nil,
    extra: extra_frontmatter
  )

  rebalance_after_child_injection(assignment: assignment, state: state, parent_number: after) if as_child && after

  # Update assignment timestamp
  assignment_manager.update(assignment)

  # Return updated state
  new_state = queue_scanner.scan(assignment.steps_dir, assignment: assignment)
  new_step = new_state.steps.find { |s| s.number == new_number }

  {
    assignment: assignment,
    state: new_state,
    added: new_step,
    renumbered: renumbered
  }
end

#add_batch(steps:, after: nil, as_child: false, source_file: nil) ⇒ Hash

Note:

Structural validation is performed for the full batch before any writes. Runtime I/O failures can still interrupt insertion after partial writes.

Add multiple steps dynamically from a pre-parsed steps array.

Parameters:

  • steps (Array<Hash>)

    Step definitions loaded from YAML

  • after (String, nil) (defaults to: nil)

    Insert after this step number

  • as_child (Boolean) (defaults to: false)

    Insert as children of after

  • source_file (String, nil) (defaults to: nil)

    Source YAML path (for added_by audit metadata)

Returns:

  • (Hash)

    Result with added steps and final state



399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 399

# Add multiple steps dynamically from a pre-parsed steps array.
#
# The whole batch is handed to prevalidate_batch_trees! before any insertion.
# Each entry is then inserted through insert_batch_step_tree. For sibling
# insertion every entry is placed after the root of the previous one; for
# child insertion every entry is anchored on the same +after+ step.
#
# @param steps [Array<Hash>] Step definitions loaded from YAML
# @param after [String, nil] Insert after this step number
# @param as_child [Boolean] Insert as children of +after+
# @param source_file [String, nil] Source YAML path; used here to label the
#   "No steps defined" error
# @return [Hash] :assignment, :state (fresh scan), :added, :renumbered (de-duplicated)
# @raise [Error] when steps is not an array with at least one truthy entry,
#   or when as_child is set without a non-blank +after+
def add_batch(steps:, after: nil, as_child: false, source_file: nil)
  if !steps.is_a?(Array) || steps.none?
    origin = source_file.to_s.strip.empty? ? "batch input" : source_file
    raise Error, "No steps defined in #{origin}"
  end

  # nil.to_s is "", so a missing and a blank reference are rejected alike.
  raise Error, "Child insertion requires an after step reference." if as_child && after.to_s.strip.empty?

  prevalidate_batch_trees!(steps)

  anchor = after
  created = []
  shifted = []

  steps.each_with_index do |definition, position|
    outcome = insert_batch_step_tree(
      definition,
      after: anchor,
      as_child: as_child,
      added_by: nil,
      location: "steps[#{position}]"
    )
    created.concat(outcome[:added])
    shifted.concat(outcome[:renumbered])
    # Only sibling insertion moves the anchor forward; children all keep
    # pointing at the original +after+ step.
    anchor = outcome[:root_number] unless as_child
  end

  active = assignment_manager.find_active
  {
    assignment: active,
    state: queue_scanner.scan(active.steps_dir, assignment: active),
    added: created,
    renumbered: shifted.uniq
  }
end

#advance(report_path, fork_root: nil) ⇒ Hash

Complete current step with report and advance

Legacy bridge: preserves single-call semantics for fork-run callers. Previously, advance() auto-started the next step as a side effect. The new start/finish split makes this explicit, but advance() retains the auto-start behavior for subtree entry so fork-run workflows (which call advance() with fork_root) continue to work unchanged.

Parameters:

  • report_path (String)

    Path to report file

  • fork_root (String, nil) (defaults to: nil)

    Optional subtree root to constrain advancement

Returns:

  • (Hash)

    Result with updated state

Raises:



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 258

# Complete the current step with a report file and advance.
#
# When a non-blank fork_root is given and an assignment is active, the subtree
# is inspected first: more than one in-progress step is an error, and if none
# is in progress the next workable step of the subtree (if any) is marked in
# progress. The report is then read and handed to finish_step.
#
# @param report_path [String] Path to report file
# @param fork_root [String, nil] Optional subtree root to constrain advancement
# @return [Hash] whatever finish_step returns
# @raise [ConfigErrors::NotFound] when the report file does not exist
# @raise [StepErrors::InvalidState] when several steps of the subtree are in progress
def advance(report_path, fork_root: nil)
  unless File.exist?(report_path)
    raise ConfigErrors::NotFound, "Report file not found: #{report_path}"
  end

  subtree_root = fork_root&.strip
  scoped = subtree_root && !subtree_root.empty?
  # The active assignment is only looked up when a subtree was requested.
  active = scoped ? assignment_manager.find_active : nil

  if active
    snapshot = queue_scanner.scan(active.steps_dir, assignment: active)
    running = snapshot.in_progress_in_subtree(subtree_root)

    if running.size > 1
      labels = running.map { |step| "#{step.number}(#{step.name})" }.join(", ")
      raise StepErrors::InvalidState, "Cannot advance subtree #{subtree_root}: multiple steps are in progress (#{labels})."
    end

    # Subtree entry: nothing is running yet, so start the next workable step.
    if running.empty?
      candidate = snapshot.next_workable_in_subtree(subtree_root)
      step_writer.mark_in_progress(candidate.file_path) if candidate
    end
  end

  # finish_step receives the caller's fork_root as given, not the stripped copy.
  finish_step(report_content: File.read(report_path), fork_root: fork_root)
end

#fail(message) ⇒ Hash

Mark current step as failed

Parameters:

  • message (String)

    Error message

Returns:

  • (Hash)

    Result with updated state

Raises:



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 288

# Mark the step currently in progress as failed.
#
# No other step is started afterwards; the returned state is simply a fresh
# scan taken after the failure was recorded.
#
# @param message [String] Error message stored with the failed step
# @return [Hash] :assignment, :state (fresh scan), :failed (the step as it was before being marked)
# @raise [AssignmentErrors::NoActive] when there is no active assignment
# @raise [Error] when no step is in progress
def fail(message)
  active = assignment_manager.find_active
  unless active
    raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin."
  end

  failing = queue_scanner.scan(active.steps_dir, assignment: active).current
  unless failing
    raise Error, "No step currently in progress. Try 'ace-assign add' to add a new step or 'ace-assign retry' to retry a failed step."
  end

  step_writer.mark_failed(failing.file_path, error_message: message)

  # Refresh the assignment's timestamp before reporting back.
  assignment_manager.update(active)

  {
    assignment: active,
    state: queue_scanner.scan(active.steps_dir, assignment: active),
    failed: failing
  }
end

#finish_step(report_content:, step_number: nil, fork_root: nil) ⇒ Hash

Finish an in-progress step and advance queue state.

Parameters:

  • report_content (String)

    Completion report content

  • step_number (String, nil) (defaults to: nil)

    Optional in-progress step number to finish

  • fork_root (String, nil) (defaults to: nil)

    Optional subtree root to constrain advancement

Returns:

  • (Hash)

    Result with completed step and updated state

Raises:



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 197

# Finish an in-progress step and advance queue state.
#
# The target step comes from find_target_step_for_finish. It is marked done,
# parents are handed to auto_complete_parents, and the next step (inside the
# fork_root subtree when that root exists in the queue, otherwise anywhere)
# is marked in progress.
#
# @param report_content [String] Completion report content
# @param step_number [String, nil] Optional in-progress step number to finish
# @param fork_root [String, nil] Optional subtree root to constrain advancement
# @return [Hash] :assignment, :state (fresh scan), :completed, :current
# @raise [AssignmentErrors::NoActive] when there is no active assignment
# @raise [Error] when no target step is found, or the target still has incomplete children
def finish_step(report_content:, step_number: nil, fork_root: nil)
  active = assignment_manager.find_active
  unless active
    raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin."
  end

  snapshot = queue_scanner.scan(active.steps_dir, assignment: active)
  finishing = find_target_step_for_finish(snapshot, step_number, fork_root)
  unless finishing
    raise Error, "No step currently in progress. Try 'ace-assign start' or 'ace-assign retry'."
  end

  # Hierarchy rule: a parent cannot be completed while any child is not done.
  if snapshot.has_incomplete_children?(finishing.number)
    open_numbers = snapshot.children_of(finishing.number)
      .reject { |child| child.status == :done }
      .map(&:number)
      .join(", ")
    raise Error, "Cannot complete step #{finishing.number}: has incomplete children (#{open_numbers}). Complete children first or use 'ace-assign fail' to mark as failed."
  end

  step_writer.mark_done(finishing.file_path, report_content: report_content, reports_dir: active.reports_dir)

  # Parents are auto-completed against a scan taken after the step was marked done.
  auto_complete_parents(queue_scanner.scan(active.steps_dir, assignment: active), active)

  # Scan again so the choice of next step sees any auto-completed parents.
  snapshot = queue_scanner.scan(active.steps_dir, assignment: active)

  scope = fork_root&.strip
  stay_in_subtree = scope && !scope.empty? && snapshot.find_by_number(scope)
  upcoming =
    if stay_in_subtree
      find_next_step_in_subtree(snapshot, finishing.number, scope)
    else
      find_next_step(snapshot, finishing.number)
    end
  step_writer.mark_in_progress(upcoming.file_path) if upcoming

  assignment_manager.update(active)

  final_state = queue_scanner.scan(active.steps_dir, assignment: active)
  {
    assignment: active,
    state: final_state,
    completed: finishing,
    current: final_state.current
  }
end

#retry_step(step_ref) ⇒ Hash

Retry a failed step (creates new step linked to original)

Parameters:

  • step_ref (String)

    Step number or reference to retry

Returns:

  • (Hash)

    Result with new retry step

Raises:



442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 442

# Retry a step by creating a new pending step linked to the original.
#
# The new step copies the original's name and instructions and is numbered by
# Atoms::NumberGenerator.next_after, anchored on the in-progress step when
# there is one, else on the last done step, else on the original itself.
#
# @param step_ref [String] Step number or reference to retry (looked up via to_s)
# @return [Hash] :assignment, :state (fresh scan), :retry (the new step), :original
# @raise [AssignmentErrors::NoActive] when there is no active assignment
# @raise [StepErrors::NotFound] when the referenced step is not in the queue
def retry_step(step_ref)
  active = assignment_manager.find_active
  unless active
    raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin."
  end

  snapshot = queue_scanner.scan(active.steps_dir, assignment: active)

  source_step = snapshot.find_by_number(step_ref.to_s)
  raise StepErrors::NotFound, "Step #{step_ref} not found in queue" unless source_step

  taken_numbers = queue_scanner.step_numbers(active.steps_dir)

  # Anchor preference: in-progress step, then last done step, then the original.
  anchor = snapshot.current || snapshot.last_done || source_step
  retry_number = Atoms::NumberGenerator.next_after(anchor.number, taken_numbers)

  # The added_by marker links the retry back to the step it repeats.
  step_writer.create(
    steps_dir: active.steps_dir,
    number: retry_number,
    name: source_step.name,
    instructions: source_step.instructions,
    status: :pending,
    added_by: "retry_of:#{source_step.number}"
  )

  assignment_manager.update(active)

  refreshed = queue_scanner.scan(active.steps_dir, assignment: active)
  {
    assignment: active,
    state: refreshed,
    retry: refreshed.steps.find { |step| step.number == retry_number },
    original: source_step
  }
end

#start(config_path, parent_id: nil) ⇒ Hash

Start a new workflow assignment from config file

Parameters:

  • config_path (String)

    Path to job.yaml config

  • parent_id (String, nil) (defaults to: nil)

    Parent assignment ID for hierarchy linking

Returns:

  • (Hash)

    Result with assignment and first step

Raises:



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 60

# Start a new workflow assignment from a config file.
#
# Loads the YAML config, enriches and expands its steps, creates the
# assignment, writes one pending step file per step, marks the first workable
# step as in progress, and archives the source config.
#
# @param config_path [String] Path to job.yaml config
# @param parent_id [String, nil] Parent assignment ID for hierarchy linking
# @return [Hash] :assignment, :state (fresh scan), :current
# @raise [ConfigErrors::NotFound] when the config file does not exist
# @raise [Error] when the config defines no steps; this includes an empty
#   file or a document whose top level is not a mapping
def start(config_path, parent_id: nil)
  raise ConfigErrors::NotFound, "Config file not found: #{config_path}" unless File.exist?(config_path)

  config = YAML.safe_load_file(config_path, permitted_classes: [Time, Date])
  # An empty file or a non-mapping document (e.g. a top-level list) does not
  # load as a Hash. Treat it as a config without steps so the caller gets the
  # regular Error below instead of a NoMethodError/TypeError from config[...].
  config = {} unless config.is_a?(Hash)

  assignment_config = config["assignment"] || {}
  steps_config = config["steps"] || []

  raise Error, "No steps defined in config" if steps_config.empty?

  # Enrich steps using declared workflow/skill assign metadata.
  steps_config = enrich_declared_sub_steps(steps_config)

  # Expand sub-step declarations into batch parent + child steps
  steps_config = expand_sub_steps(steps_config)
  steps_config = materialize_skill_backed_steps(steps_config)

  # Create assignment; the name falls back to the config file's basename
  # with a ".yaml" suffix removed.
  assignment = assignment_manager.create(
    name: assignment_config["name"] || File.basename(config_path, ".yaml"),
    description: assignment_config["description"],
    source_config: config_path,
    parent: parent_id
  )

  # Create initial step files
  # Steps may have pre-assigned numbers (from expansion) or need auto-numbering
  steps_config.each_with_index do |step, index|
    # Use pre-assigned number if present, otherwise generate from index
    number = step["number"] || Atoms::NumberGenerator.from_index(index)
    # Everything except name/instructions/number is passed along as extra fields.
    extra = step.reject { |k, _| %w[name instructions number].include?(k) }
    step_writer.create(
      steps_dir: assignment.steps_dir,
      number: number,
      name: step["name"],
      instructions: normalize_instructions(step["instructions"]),
      status: :pending,
      extra: extra
    )
  end

  # Mark first workable step as in_progress.
  # This skips batch parent containers that have incomplete children.
  initial_state = queue_scanner.scan(assignment.steps_dir, assignment: assignment)
  first_workable = initial_state.next_workable
  step_writer.mark_in_progress(first_workable.file_path) if first_workable

  # Archive the source config and rebuild the assignment so that its
  # source_config points at the archived copy; all other fields are carried over.
  archived_path = archive_source_config(config_path, assignment.id)
  assignment = Models::Assignment.new(
    id: assignment.id,
    name: assignment.name,
    description: assignment.description,
    created_at: assignment.created_at,
    updated_at: assignment.updated_at,
    source_config: archived_path,
    cache_dir: assignment.cache_dir,
    parent: assignment.parent
  )
  assignment_manager.update(assignment)

  # Return result
  state = queue_scanner.scan(assignment.steps_dir, assignment: assignment)
  {
    assignment: assignment,
    state: state,
    current: state.current
  }
end

#start_step(step_number: nil, fork_root: nil) ⇒ Hash

Start a pending step.

Rules:

  • Fails if any step is already in progress (strict mode)

  • Starts an explicit pending target when provided

  • Otherwise starts the next workable pending step

Parameters:

  • step_number (String, nil) (defaults to: nil)

    Optional target step number

  • fork_root (String, nil) (defaults to: nil)

    Optional subtree root scope

Returns:

  • (Hash)

    Result with started step and updated state

Raises:



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 155

# Start a pending step.
#
# Refuses to start anything while a step is already in progress. The target is
# chosen in this order: an explicit non-blank step_number (resolved by
# find_target_step_for_start), else the next workable step inside a non-blank
# fork_root subtree, else the next workable step overall.
#
# @param step_number [String, nil] Optional target step number
# @param fork_root [String, nil] Optional subtree root scope (stripped before use)
# @return [Hash] :assignment, :state (fresh scan), :started, :current
# @raise [AssignmentErrors::NoActive] when there is no active assignment
# @raise [StepErrors::InvalidState] when a step is already in progress or no
#   pending workable step is found
# @raise [StepErrors::NotFound] when fork_root is used for selection but is not in the queue
def start_step(step_number: nil, fork_root: nil)
  active = assignment_manager.find_active
  unless active
    raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin."
  end

  snapshot = queue_scanner.scan(active.steps_dir, assignment: active)
  if snapshot.current
    raise StepErrors::InvalidState, "Cannot start: step #{snapshot.current.number} is already in progress. Finish or fail it first."
  end

  scope = fork_root&.strip
  scoped = scope && !scope.empty?
  explicit = step_number && !step_number.to_s.strip.empty?

  target =
    if explicit
      find_target_step_for_start(snapshot, step_number, scope)
    elsif scoped
      unless snapshot.find_by_number(scope)
        raise StepErrors::NotFound, "Subtree root #{scope} not found in assignment."
      end
      snapshot.next_workable_in_subtree(scope)
    else
      snapshot.next_workable
    end

  unless target
    reason = scoped ? "No pending workable step found in subtree #{scope}." : "No pending workable step found."
    raise StepErrors::InvalidState, reason
  end

  step_writer.mark_in_progress(target.file_path)
  assignment_manager.update(active)

  # The started step is looked up again so the result reflects the fresh scan.
  refreshed = queue_scanner.scan(active.steps_dir, assignment: active)
  {
    assignment: active,
    state: refreshed,
    started: refreshed.find_by_number(target.number),
    current: refreshed.current
  }
end

#status ⇒ Hash

Get current assignment and queue state

Returns:

  • (Hash)

    Result with assignment and state

Raises:



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ace/assign/organisms/assignment_executor.rb', line 133

# Report the active assignment together with a fresh scan of its queue.
#
# @return [Hash] :assignment, :state, :current (the scan's current step)
# @raise [AssignmentErrors::NoActive] when there is no active assignment
def status
  active = assignment_manager.find_active
  unless active
    raise AssignmentErrors::NoActive, "No active assignment. Use 'ace-assign create --yaml <job.yaml>' to begin."
  end

  snapshot = queue_scanner.scan(active.steps_dir, assignment: active)
  { assignment: active, state: snapshot, current: snapshot.current }
end