Class: Archsight::Import::ConcurrentProgress

Inherits:
Object
  • Object
show all
Defined in:
lib/archsight/import/concurrent_progress.rb

Overview

Manages concurrent progress output with slot-based display

In TTY mode: Each slot gets its own line, updated in place using ANSI codes with colors In non-TTY mode: Each update prints on its own line with context prefix (no colors)

Defined Under Namespace

Classes: SlotProgress

Constant Summary collapse

COLORS =

ANSI color codes

{
  reset: "\e[0m",
  bold: "\e[1m",
  dim: "\e[2m",
  green: "\e[32m",
  yellow: "\e[33m",
  blue: "\e[34m",
  magenta: "\e[35m",
  cyan: "\e[36m",
  red: "\e[31m"
}.freeze
CURSOR_HIDE =

ANSI cursor control

"\e[?25l"
CURSOR_SHOW =
"\e[?25h"
CURSOR_SAVE =
"\e[s"
CURSOR_RESTORE =
"\e[u"

Instance Method Summary collapse

Constructor Details

#initialize(max_slots:, output: $stdout) ⇒ ConcurrentProgress

Returns a new instance of ConcurrentProgress.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/archsight/import/concurrent_progress.rb', line 27

def initialize(max_slots:, output: $stdout)
  @output = output
  @tty = output.respond_to?(:tty?) && output.tty?
  @max_slots = max_slots
  @mutex = Mutex.new
  @slots = {}
  @slot_queue = Queue.new
  @lines_printed = 0

  # Overall progress tracking
  @total_imports = 0
  @completed_imports = 0
  @has_overall_line = false
  @start_time = nil

  # Initialize slot queue
  max_slots.times { |i| @slot_queue << i }
end

Instance Method Details

#acquire_slot(context) ⇒ SlotProgress

Acquire a slot for a new task

Returns:



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/archsight/import/concurrent_progress.rb', line 85

def acquire_slot(context)
  slot_id = @slot_queue.pop
  slot = SlotProgress.new(self, slot_id, context)

  @mutex.synchronize do
    @slots[slot_id] = slot
    # Slot lines start after the overall progress line (if present)
    effective_line = @has_overall_line ? slot_id + 1 : slot_id
    if @tty && effective_line >= @lines_printed
      # Print empty lines to reserve space
      (@lines_printed..effective_line).each { @output.puts }
      @lines_printed = effective_line + 1
    end
  end

  slot
end

#complete_slot(slot_id, context, message = nil) ⇒ Object

Mark a slot as complete



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/archsight/import/concurrent_progress.rb', line 133

def complete_slot(slot_id, context, message = nil)
  @mutex.synchronize do
    if @tty
      effective_line = @has_overall_line ? slot_id + 1 : slot_id
      lines_up = @lines_printed - effective_line
      @output.print "\e[#{lines_up}A"
      @output.print "\e[2K"
      msg = message || "Done"
      @output.print "#{COLORS[:bold]}#{context}#{COLORS[:reset]} - #{COLORS[:green]}#{msg}#{COLORS[:reset]}"
      @output.print "\e[#{lines_up}B"
      @output.print "\r"
      @output.flush
    elsif message
      @output.puts "#{context} - #{message}"
    end
  end
end

#error_slot(slot_id, context, message) ⇒ Object

Report an error for a slot



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/archsight/import/concurrent_progress.rb', line 152

def error_slot(slot_id, context, message)
  safe_message = sanitize_message(message)
  @mutex.synchronize do
    if @tty
      effective_line = @has_overall_line ? slot_id + 1 : slot_id
      lines_up = @lines_printed - effective_line
      @output.print "\e[#{lines_up}A"
      @output.print "\e[2K"
      @output.print "#{COLORS[:bold]}#{context}#{COLORS[:reset]} - #{COLORS[:red]}Error: #{safe_message}#{COLORS[:reset]}"
      @output.print "\e[#{lines_up}B"
      @output.print "\r"
      @output.flush
    else
      @output.puts "#{context} - Error: #{safe_message}"
    end
  end
end

#finish(message) ⇒ Object

Print a final summary (restores cursor and shows it) Note: Use finish_from_trap when called from a signal handler



182
183
184
185
186
# File 'lib/archsight/import/concurrent_progress.rb', line 182

def finish(message)
  @mutex.synchronize do
    finish_unsafe(message)
  end
end

#finish_from_trap(message) ⇒ Object

Trap-safe version of finish (no mutex, safe to call from signal handlers)



189
190
191
# File 'lib/archsight/import/concurrent_progress.rb', line 189

def finish_from_trap(message)
  finish_unsafe(message)
end

#increment_completedObject

Increment completed count and update overall progress



76
77
78
79
80
81
# File 'lib/archsight/import/concurrent_progress.rb', line 76

def increment_completed
  @mutex.synchronize do
    @completed_imports += 1
    update_overall_line if @tty && @has_overall_line
  end
end

#interrupt(message) ⇒ Object

Show interrupt message without clearing progress (called on first Ctrl-C) Note: Use interrupt_from_trap when called from a signal handler



195
196
197
198
199
# File 'lib/archsight/import/concurrent_progress.rb', line 195

def interrupt(message)
  @mutex.synchronize do
    interrupt_unsafe(message)
  end
end

#interrupt_from_trap(message) ⇒ Object

Trap-safe version of interrupt (no mutex, safe to call from signal handlers)



202
203
204
# File 'lib/archsight/import/concurrent_progress.rb', line 202

def interrupt_from_trap(message)
  interrupt_unsafe(message)
end

#release_slot(slot_id) ⇒ Object

Release a slot back to the pool



104
105
106
107
108
109
# File 'lib/archsight/import/concurrent_progress.rb', line 104

def release_slot(slot_id)
  @mutex.synchronize do
    @slots.delete(slot_id)
  end
  @slot_queue << slot_id
end

#sanitize_message(message) ⇒ Object

Sanitize message to prevent breaking TTY display (remove newlines, truncate)



171
172
173
174
175
176
177
178
# File 'lib/archsight/import/concurrent_progress.rb', line 171

def sanitize_message(message)
  return "" if message.nil?

  # Replace newlines with spaces and collapse multiple spaces
  clean = message.to_s.gsub(/[\r\n]+/, " ").gsub(/\s+/, " ").strip
  # Truncate if too long
  clean.length > 80 ? "#{clean[0, 77]}..." : clean
end

#total=(total) ⇒ Object

Initialize total number of imports for overall progress



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/archsight/import/concurrent_progress.rb', line 51

def total=(total)
  @mutex.synchronize do
    @total_imports = total
    @completed_imports = 0
    @start_time = Time.now
    if @tty && !@has_overall_line
      # Save cursor position and hide cursor for clean display
      @output.print CURSOR_SAVE
      @output.print CURSOR_HIDE
      @output.puts build_overall_line
      @has_overall_line = true
      @lines_printed += 1
    end
  end
end

#tty?Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/archsight/import/concurrent_progress.rb', line 46

def tty?
  @tty
end

#update_slot(slot_id, context, message, current: nil, total: nil, color: nil) ⇒ Object

Update a specific slot’s display



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/archsight/import/concurrent_progress.rb', line 112

def update_slot(slot_id, context, message, current: nil, total: nil, color: nil)
  line = build_line(context, message, current, total, color: color)

  @mutex.synchronize do
    if @tty
      # Move cursor to slot line and update (account for overall progress line)
      effective_line = @has_overall_line ? slot_id + 1 : slot_id
      lines_up = @lines_printed - effective_line
      @output.print "\e[#{lines_up}A" # Move up
      @output.print "\e[2K"           # Clear line
      @output.print line
      @output.print "\e[#{lines_up}B" # Move back down
      @output.print "\r"              # Return to start of line
      @output.flush
    else
      @output.puts line
    end
  end
end

#update_total(total) ⇒ Object

Update total without resetting completed count (for multi-stage imports)



68
69
70
71
72
73
# File 'lib/archsight/import/concurrent_progress.rb', line 68

def update_total(total)
  @mutex.synchronize do
    @total_imports = total
    update_overall_line if @tty && @has_overall_line
  end
end