Class: Archsight::Import::Executor
- Inherits:
-
Object
- Object
- Archsight::Import::Executor
- Defined in:
- lib/archsight/import/executor.rb
Overview
Executes imports in dependency order with concurrent processing
The executor:
-
Loads all Import resources from the database
-
Finds pending imports whose dependencies are satisfied
-
Executes ready imports concurrently (up to max_concurrent)
-
Reloads database once after batch completes to discover new imports
-
Repeats until no pending imports remain
-
Stops immediately on first error
Constant Summary collapse
- MAX_CONCURRENT =
20- DURATION_PATTERNS =
Duration parsing patterns for cache time
{ /^(\d+)s$/ => 1, /^(\d+)m$/ => 60, /^(\d+)h$/ => 3600, /^(\d+)d$/ => 86_400 }.freeze
Instance Attribute Summary collapse
-
#database ⇒ Object
readonly
Returns the value of attribute database.
-
#filter ⇒ Object
readonly
Returns the value of attribute filter.
-
#force ⇒ Object
readonly
Returns the value of attribute force.
-
#resources_dir ⇒ Object
readonly
Returns the value of attribute resources_dir.
-
#verbose ⇒ Object
readonly
Returns the value of attribute verbose.
Instance Method Summary collapse
- #build_finish_message ⇒ Object
-
#execution_plan ⇒ Array<Archsight::Resources::Import>
Show execution plan without running imports.
-
#initialize(database:, resources_dir:, verbose: false, max_concurrent: MAX_CONCURRENT, output: $stdout, filter: nil, force: false) ⇒ Executor
constructor
A new instance of Executor.
-
#run! ⇒ Object
Run all pending imports.
Constructor Details
#initialize(database:, resources_dir:, verbose: false, max_concurrent: MAX_CONCURRENT, output: $stdout, filter: nil, force: false) ⇒ Executor
Returns a new instance of Executor.
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/archsight/import/executor.rb', line 41 def initialize(database:, resources_dir:, verbose: false, max_concurrent: MAX_CONCURRENT, output: $stdout, filter: nil, force: false) @database = database @resources_dir = resources_dir @verbose = verbose @max_concurrent = max_concurrent @output = output @filter = filter @filter_regex = Regexp.new(filter, Regexp::IGNORECASE) if filter @force = force @executed_this_run = Set.new @iteration = 0 @mutex = Mutex.new end |
Instance Attribute Details
#database ⇒ Object (readonly)
Returns the value of attribute database.
32 33 34 |
# File 'lib/archsight/import/executor.rb', line 32 def database @database end |
#filter ⇒ Object (readonly)
Returns the value of attribute filter.
32 33 34 |
# File 'lib/archsight/import/executor.rb', line 32 def filter @filter end |
#force ⇒ Object (readonly)
Returns the value of attribute force.
32 33 34 |
# File 'lib/archsight/import/executor.rb', line 32 def force @force end |
#resources_dir ⇒ Object (readonly)
Returns the value of attribute resources_dir.
32 33 34 |
# File 'lib/archsight/import/executor.rb', line 32 def resources_dir @resources_dir end |
#verbose ⇒ Object (readonly)
Returns the value of attribute verbose.
32 33 34 |
# File 'lib/archsight/import/executor.rb', line 32 def verbose @verbose end |
Instance Method Details
#build_finish_message ⇒ Object
135 136 137 138 139 140 |
# File 'lib/archsight/import/executor.rb', line 135 def parts = [] parts << "#{@total_executed} executed" if @total_executed.positive? parts << "#{@total_cached} cached" if @total_cached.positive? "Completed: #{parts.join(", ")}" end |
#execution_plan ⇒ Array<Archsight::Resources::Import>
Show execution plan without running imports
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/archsight/import/executor.rb', line 144 def execution_plan reload_database_quietly! # Collect all imports all_imports = database.instances_by_kind("Import")&.values || [] # Apply filter if specified filtered_imports = all_imports.select { |imp| import_matches_filter?(imp) } # Topological sort sorted = topological_sort(filtered_imports) @output.puts "Filter: #{@filter}" if @filter sorted.each_with_index do |imp, idx| enabled = import_enabled?(imp) deps = import_dependency_names(imp) deps_str = deps.empty? ? "(no dependencies)" : "depends on: #{deps.join(", ")}" enabled_str = enabled ? "" : " [DISABLED]" @output.puts " #{idx + 1}. #{imp.name}#{enabled_str} #{deps_str}" end sorted end |
#run! ⇒ Object
Run all pending imports
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/archsight/import/executor.rb', line 58 def run! @total_executed = 0 @total_cached = 0 @failed_imports = {} @first_error = nil @interrupted = false @concurrent_progress = Archsight::Import::ConcurrentProgress.new(max_slots: @max_concurrent, output: @output) @shared_writer = Archsight::Import::SharedFileWriter.new # Set up graceful shutdown on Ctrl-C setup_signal_handlers # Calculate total imports for overall progress reload_database_quietly! total = count_all_enabled_imports @concurrent_progress.total = total if total.positive? # Track if we need to reload (skip first iteration since we just reloaded) need_reload = false loop do break if @interrupted @iteration += 1 log "=== Iteration #{@iteration} ===" # Only reload if previous batch executed imports (might have generated new Import resources) reload_and_update_total! if need_reload # Get all pending imports pending = pending_imports if pending.empty? log "No pending imports. Done." break end log "Found #{pending.size} pending import(s)" # Find imports whose dependencies are satisfied ready = pending.select { |imp| dependencies_satisfied?(imp) } if ready.empty? unsatisfied = pending.map(&:name).join(", ") raise Archsight::Import::DeadlockError, "Deadlock: pending imports have unsatisfied dependencies: #{unsatisfied}" end # Sort by priority (lower first), then by name for determinism ready.sort_by! { |imp| [imp.annotations["import/priority"].to_i, imp.name] } # Execute all ready imports at the same priority level concurrently current_priority = ready.first.annotations["import/priority"].to_i batch = ready.select { |imp| imp.annotations["import/priority"].to_i == current_priority } executed_before = @total_executed execute_batch_concurrently(batch) # Close shared files before potential database reload so new content is visible @shared_writer.close_all # Only reload next iteration if imports were actually executed (not just cached) need_reload = @total_executed > executed_before # Stop on first error raise Archsight::Import::ImportError, "Import #{@first_error[:name]} failed: #{@first_error[:message]}" if @first_error end @shared_writer.close_all = @concurrent_progress.finish() if @total_executed.positive? || @total_cached.positive? # Raise InterruptedError if we were interrupted, so CLI can handle it raise Archsight::Import::InterruptedError, "Import interrupted by user" if @interrupted ensure restore_signal_handlers end |