Class: Archsight::Import::Executor

Inherits: Object
Defined in:
lib/archsight/import/executor.rb

Overview

Executes imports in dependency order with concurrent processing

The executor:

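  1. Loads all Import resources from the database

  2. Finds pending imports whose dependencies are satisfied (raising DeadlockError if pending imports remain but none of them is ready)

  3. Executes the ready imports of the lowest priority level concurrently (up to max_concurrent at a time)

  4. Reloads the database once after a batch completes, but only if that batch actually executed imports (rather than just using cached results), to discover newly generated imports

  5. Repeats until no pending imports remain or the run is interrupted

  6. Stops on the first error: once the current batch returns, an ImportError is raised and no further batches are started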

Constant Summary

# Maximum number of imports executed concurrently by default.
MAX_CONCURRENT = 20

# Duration parsing patterns for cache time: a duration string such as "30s",
# "15m", "2h" or "7d" is matched against these patterns; each value is the
# number of seconds in one unit of that suffix.
# Anchored with \A and \z so the whole string must match; ^ and $ anchor per
# line and would also accept multi-line input such as "junk\n15m".
DURATION_PATTERNS = {
  /\A(\d+)s\z/ => 1,
  /\A(\d+)m\z/ => 60,
  /\A(\d+)h\z/ => 3600,
  /\A(\d+)d\z/ => 86_400
}.freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(database:, resources_dir:, verbose: false, max_concurrent: MAX_CONCURRENT, output: $stdout, filter: nil, force: false) ⇒ Executor

Returns a new instance of Executor.

Parameters:

  • database (Archsight::Database)

    Database instance

  • resources_dir (String)

    Root resources directory

  • verbose (Boolean) (defaults to: false)

    Whether to print verbose debug output

  • max_concurrent (Integer) (defaults to: MAX_CONCURRENT)

    Maximum concurrent imports (default: 20)

  • output (IO) (defaults to: $stdout)

    Output stream for progress (default: $stdout)

  • filter (String, nil) (defaults to: nil)

    Regex pattern, matched case-insensitively against import names

  • force (Boolean) (defaults to: false)

    Whether to bypass cache and re-run all imports



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/archsight/import/executor.rb', line 41

def initialize(database:, resources_dir:, verbose: false, max_concurrent: MAX_CONCURRENT, output: $stdout, filter: nil, force: false)
  # Collaborators
  @database = database
  @output = output

  # Run configuration
  @resources_dir = resources_dir
  @verbose = verbose
  @force = force
  @max_concurrent = max_concurrent

  # Optional name filter: the raw pattern is kept for display, and a
  # case-insensitive Regexp is compiled for matching. Regexp.new raises
  # RegexpError here if the pattern is invalid.
  @filter = filter
  if filter
    @filter_regex = Regexp.new(filter, Regexp::IGNORECASE)
  end

  # Per-executor bookkeeping
  @executed_this_run = Set.new
  @iteration = 0
  @mutex = Mutex.new
end

Instance Attribute Details

#database ⇒ Object (readonly)

Returns the value of attribute database.



32
33
34
# File 'lib/archsight/import/executor.rb', line 32

# @return [Archsight::Database] the database this executor loads Import resources from
def database; @database; end

#filter ⇒ Object (readonly)

Returns the value of attribute filter.



32
33
34
# File 'lib/archsight/import/executor.rb', line 32

# @return [String, nil] the raw import-name filter pattern given to the constructor, or nil when unfiltered
def filter; @filter; end

#force ⇒ Object (readonly)

Returns the value of attribute force.



32
33
34
# File 'lib/archsight/import/executor.rb', line 32

# @return [Boolean] whether the cache is bypassed and all imports are re-run
def force; @force; end

#resources_dir ⇒ Object (readonly)

Returns the value of attribute resources_dir.



32
33
34
# File 'lib/archsight/import/executor.rb', line 32

# @return [String] root resources directory given to the constructor
def resources_dir; @resources_dir; end

#verbose ⇒ Object (readonly)

Returns the value of attribute verbose.



32
33
34
# File 'lib/archsight/import/executor.rb', line 32

# @return [Boolean] whether verbose debug output is enabled
def verbose; @verbose; end

Instance Method Details

#build_finish_message ⇒ String



135
136
137
138
139
140
# File 'lib/archsight/import/executor.rb', line 135

# Summarise the run for the progress display, e.g. "Completed: 3 executed, 2 cached".
# Counts that are zero are left out of the summary.
#
# @return [String]
def build_finish_message
  tallies = [[@total_executed, "executed"], [@total_cached, "cached"]]
  summary = tallies.select { |count, _label| count.positive? }
                   .map { |count, label| "#{count} #{label}" }
                   .join(", ")
  "Completed: #{summary}"
end

#execution_plan ⇒ Array<Archsight::Resources::Import>

Show execution plan without running imports

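Returns:

  • (Array<Archsight::Resources::Import>) — the imports matching the filter, in topologically sorted order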



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/archsight/import/executor.rb', line 144

# Print the import execution plan without running anything.
#
# Reloads the database, keeps the Import resources accepted by the name
# filter, orders them with topological_sort and writes one numbered line per
# import to the output stream, marking disabled imports and listing the names
# of their dependencies.
#
# @return [Array<Archsight::Resources::Import>] the filtered imports in plan order
def execution_plan
  reload_database_quietly!

  candidates = (database.instances_by_kind("Import")&.values || []).select { |entry| import_matches_filter?(entry) }
  plan = topological_sort(candidates)

  @output.puts "Filter: #{@filter}" if @filter

  plan.each.with_index(1) do |entry, position|
    marker = import_enabled?(entry) ? "" : " [DISABLED]"
    dependency_names = import_dependency_names(entry)
    dependency_note =
      if dependency_names.empty?
        "(no dependencies)"
      else
        "depends on: #{dependency_names.join(", ")}"
      end
    @output.puts "  #{position}. #{entry.name}#{marker} #{dependency_note}"
  end

  plan
end

#run! ⇒ Object

Run all pending imports

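Raises:

  • (Archsight::Import::DeadlockError) — pending imports remain but none has its dependencies satisfied

  • (Archsight::Import::ImportError) — an import failed (raised after the batch containing it returns)

  • (Archsight::Import::InterruptedError) — the run was interrupted by the user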



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/archsight/import/executor.rb', line 58

# Run all pending imports.
#
# Each iteration selects the pending imports whose dependencies are satisfied,
# runs the group sharing the lowest "import/priority" value concurrently,
# closes the shared output files, and reloads the database on the next
# iteration only if something was actually executed (not just served from
# cache). The loop ends when nothing is pending or the user interrupts.
#
# @return [void]
# @raise [Archsight::Import::DeadlockError] if pending imports remain but none has its dependencies satisfied
# @raise [Archsight::Import::ImportError] after the batch in which the first import failed
# @raise [Archsight::Import::InterruptedError] if the run was interrupted (e.g. Ctrl-C)
def run!
  @total_executed = 0
  @total_cached = 0
  @failed_imports = {}
  @first_error = nil
  @interrupted = false
  @concurrent_progress = Archsight::Import::ConcurrentProgress.new(max_slots: @max_concurrent, output: @output)
  @shared_writer = Archsight::Import::SharedFileWriter.new

  # Set up graceful shutdown on Ctrl-C
  setup_signal_handlers

  # Calculate total imports for overall progress
  reload_database_quietly!
  total = count_all_enabled_imports
  @concurrent_progress.total = total if total.positive?

  # Imports without an "import/priority" annotation get priority 0 (nil.to_i)
  priority_of = ->(imp) { imp.annotations["import/priority"].to_i }

  # Track if we need to reload (skip first iteration since we just reloaded)
  need_reload = false

  loop do
    break if @interrupted

    @iteration += 1
    log "=== Iteration #{@iteration} ==="

    # Only reload if previous batch executed imports (might have generated new Import resources)
    reload_and_update_total! if need_reload

    pending = pending_imports
    if pending.empty?
      log "No pending imports. Done."
      break
    end

    log "Found #{pending.size} pending import(s)"

    # Find imports whose dependencies are satisfied
    ready = pending.select { |imp| dependencies_satisfied?(imp) }
    if ready.empty?
      unsatisfied = pending.map(&:name).join(", ")
      raise Archsight::Import::DeadlockError, "Deadlock: pending imports have unsatisfied dependencies: #{unsatisfied}"
    end

    # Sort by priority (lower first), then by name for determinism. After sorting,
    # the batch is the leading run of imports that share the lowest priority value.
    ready.sort_by! { |imp| [priority_of.call(imp), imp.name] }
    current_priority = priority_of.call(ready.first)
    batch = ready.take_while { |imp| priority_of.call(imp) == current_priority }

    executed_before = @total_executed
    execute_batch_concurrently(batch)

    # Close shared files before potential database reload so new content is visible
    @shared_writer.close_all

    # Only reload next iteration if imports were actually executed (not just cached)
    need_reload = @total_executed > executed_before

    # Stop on first error
    raise Archsight::Import::ImportError, "Import #{@first_error[:name]} failed: #{@first_error[:message]}" if @first_error
  end

  @shared_writer.close_all
  finish_message = build_finish_message
  @concurrent_progress.finish(finish_message) if @total_executed.positive? || @total_cached.positive?

  # Raise InterruptedError if we were interrupted, so CLI can handle it
  raise Archsight::Import::InterruptedError, "Import interrupted by user" if @interrupted
ensure
  begin
    # Close shared files even when an exception escapes mid-batch (e.g. from
    # execute_batch_concurrently) so they are not left open. On the normal path
    # close_all is already called back-to-back, so calling it again is harmless.
    @shared_writer&.close_all
  ensure
    # Always restore the original handlers, even if close_all itself raises
    restore_signal_handlers
  end
end