Class: Complyance::PersistentQueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/complyance/persistent_queue_manager.rb

Overview

Manages persistent queue for document submissions

Constant Summary collapse

QUEUE_DIR =
'complyance-queue'
PENDING_DIR =
'pending'
PROCESSING_DIR =
'processing'
FAILED_DIR =
'failed'
SUCCESS_DIR =
'success'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_key, local, circuit_breaker = nil) ⇒ PersistentQueueManager

Initialize queue manager

Parameters:

  • api_key (String)

    API key for authentication

  • local (Boolean)

    Whether to use local environment

  • circuit_breaker (CircuitBreaker, nil) (defaults to: nil)

    Optional shared circuit breaker



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/complyance/persistent_queue_manager.rb', line 23

# Initialize queue manager.
#
# Sets up logging, the queue directory tree, and the circuit breaker,
# then immediately starts the background processor and requeues any
# submissions left in the failed directory from a previous run.
#
# @param api_key [String] API key for authentication
# @param local [Boolean] whether to use the local environment
# @param circuit_breaker [CircuitBreaker, nil] optional shared circuit breaker;
#   when nil a private breaker (3 failures, 60s reset) is created
def initialize(api_key, local, circuit_breaker = nil)
  @api_key = api_key
  @local = local
  @queue_base_path = queue_base_path
  @logger = Logger.new(STDOUT).tap { |log| log.level = Logger::INFO }
  @processing_lock = Monitor.new
  @is_running = false

  # Fall back to a private breaker: 3 failure threshold, 1 minute timeout.
  @circuit_breaker = circuit_breaker ||
                     CircuitBreaker.new(failure_threshold: 3, reset_timeout: 60)

  initialize_queue_directories
  @logger.info "PersistentQueueManager initialized with queue directory: #{@queue_base_path}"

  # Kick off background processing, then move leftover failures back to pending.
  start_processing
  retry_failed_submissions
end

Instance Attribute Details

#is_running ⇒ Object (readonly)

Returns the value of attribute is_running.



17
18
19
# File 'lib/complyance/persistent_queue_manager.rb', line 17

# Whether the background queue-processing loop is currently active.
# Set true by #start_processing and false by #stop_processing.
def is_running
  @is_running
end

Instance Method Details

#cleanup_duplicate_files ⇒ Object

Clean up duplicate files across queue directories



437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/complyance/persistent_queue_manager.rb', line 437

# Clean up duplicate files across queue directories.
#
# A submission file should live in exactly one state directory at a time;
# a crash mid-move can leave the same filename in several. For each
# duplicate basename the copy with the newest mtime is kept and the
# older copies are deleted. Errors are logged, never raised.
def cleanup_duplicate_files
  @logger.info "Cleaning up duplicate files across queue directories..."

  # Maps basename -> path currently chosen as the keeper.
  file_map = {}
  [PENDING_DIR, PROCESSING_DIR, FAILED_DIR, SUCCESS_DIR].each do |dir_name|
    dir = File.join(@queue_base_path, dir_name)
    next unless File.directory?(dir)

    Dir[File.join(dir, '*.json')].each do |file|
      file_name = File.basename(file)
      keeper = file_map[file_name]

      if keeper.nil?
        file_map[file_name] = file
      elsif File.mtime(file) > File.mtime(keeper)
        # Current file is newer: delete the previous keeper.
        # Fixed: capture the deleted path BEFORE reassigning file_map,
        # so the log reports the removed file, not the kept one.
        File.delete(keeper)
        file_map[file_name] = file
        @logger.debug "Removed duplicate file (older): #{keeper}"
      else
        File.delete(file)
        @logger.debug "Removed duplicate file (older): #{file}"
      end
    end
  end

  @logger.info "Duplicate file cleanup completed"
rescue => e
  @logger.error "Error during duplicate file cleanup: #{e.message}"
end

#cleanup_old_success_files(days_to_keep) ⇒ Object

Clean up old success files

Parameters:

  • days_to_keep (Integer)

    Number of days to keep success files



383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/complyance/persistent_queue_manager.rb', line 383

# Clean up old success files.
#
# Deletes every JSON file in the success directory whose modification
# time is older than the retention window. Errors are logged, not raised.
#
# @param days_to_keep [Integer] number of days to keep success files
def cleanup_old_success_files(days_to_keep)
  success_dir = File.join(@queue_base_path, SUCCESS_DIR)
  cutoff = Time.now - (days_to_keep * 24 * 60 * 60)

  expired = Dir[File.join(success_dir, '*.json')].select { |f| File.mtime(f) < cutoff }

  expired.each do |f|
    File.delete(f)
    @logger.debug "Cleaned up old success file: #{File.basename(f)}"
  end

  @logger.info "Cleaned up #{expired.size} old success files" unless expired.empty?
rescue => e
  @logger.error "Failed to cleanup old success files: #{e.message}"
end

#clear_all_queues ⇒ Object

Clear all files from the queue (emergency cleanup)



405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/complyance/persistent_queue_manager.rb', line 405

# Clear all files from the queue (emergency cleanup).
#
# Empties every state directory via the clear_directory helper.
# Unlike the other cleanup methods, a failure here is re-raised so the
# caller knows the emergency cleanup did not complete.
#
# @raise [RuntimeError] if any directory could not be cleared
def clear_all_queues
  @logger.info "Clearing all queue directories..."

  [PENDING_DIR, PROCESSING_DIR, FAILED_DIR, SUCCESS_DIR].each do |dir_name|
    clear_directory(dir_name)
  end

  @logger.info "All queue directories cleared successfully"
rescue => e
  @logger.error "Error clearing queue directories: #{e.message}"
  raise "Failed to clear queues: #{e.message}"
end

#enqueue(submission) ⇒ Object

Enqueue a submission for processing

Parameters:

  • submission (PayloadSubmission)

    Submission to enqueue



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/complyance/persistent_queue_manager.rb', line 63

# Enqueue a submission for processing.
#
# Persists the submission as a pretty-printed JSON record in the pending
# directory and ensures the background processor is running. A file with
# the same generated name (same document ID) is treated as a duplicate
# and silently skipped.
#
# @param submission [PayloadSubmission] submission to enqueue
# @raise [RuntimeError] if the payload is empty/nil or persistence fails
def enqueue(submission)
  file_name = generate_file_name(submission)
  file_path = File.join(@queue_base_path, PENDING_DIR, file_name)

  # Same document ID already queued — skip the duplicate submission.
  if File.exist?(file_path)
    @logger.info "Document already exists in queue: #{file_name}. Skipping duplicate submission."
    return
  end

  json_payload = submission.payload
  # Fixed: use length (character count) to match the "characters" wording;
  # bytesize counts bytes and differs for multibyte payloads.
  @logger.debug "Queue: Received payload with length: #{json_payload.length} characters"

  # Guard nil as well as blank payloads (nil previously surfaced as a
  # wrapped NoMethodError instead of this explicit error).
  if json_payload.nil? || json_payload.strip.empty? || json_payload == '{}'
    @logger.error "🔥 QUEUE: ERROR - Received empty or invalid payload: '#{json_payload}'"
    raise "Cannot enqueue empty payload"
  end

  # Parse the UnifyRequest JSON string so the record stores a real JSON
  # object rather than a doubly-encoded string.
  unify_request_map = JSON.parse(json_payload)

  # Create submission record
  record = {
    payload: unify_request_map,
    source_id: "#{submission.source.name}:#{submission.source.version}",
    country: submission.country.to_s,
    document_type: submission.document_type.to_s,
    enqueued_at: Time.now.iso8601,
    timestamp: (Time.now.to_f * 1000).to_i # Convert to milliseconds for consistency
  }

  File.write(file_path, JSON.pretty_generate(record))

  @logger.info "Enqueued submission: #{file_name} for source: #{record[:source_id]}, country: #{record[:country]}"

  # Start processing if not already running
  start_processing
rescue => e
  @logger.error "Failed to enqueue submission to persistent storage: #{e.message}"
  raise "Failed to persist submission: #{e.message}"
end

#extract_document_id(payload) ⇒ String

Extract document ID from payload JSON

Parameters:

  • payload (String)

    JSON payload

Returns:

  • (String)

    Extracted document ID



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/complyance/persistent_queue_manager.rb', line 133

# Extract document ID from payload JSON.
#
# Looks up payload.invoice_data.invoice_number; when the key is absent
# or the payload cannot be parsed, falls back to a timestamp-based ID.
#
# @param payload [String] JSON payload
# @return [String] extracted document ID (or "doc_<epoch>" fallback)
def extract_document_id(payload)
  parsed = JSON.parse(payload)
  invoice_number = parsed.dig('payload', 'invoice_data', 'invoice_number')
  invoice_number || "doc_#{Time.now.to_i}"
rescue => e
  @logger.warn "Failed to extract document ID from payload: #{e.message}"
  "doc_#{Time.now.to_i}"
end

#generate_file_name(submission) ⇒ String

Generate unique filename for submission

Parameters:

  • submission (PayloadSubmission)

    Submission to generate filename for

Returns:

  • (String)

    Generated filename



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/complyance/persistent_queue_manager.rb', line 114

# Generate unique filename for submission.
#
# The name combines the sanitized source identifier, the document ID
# extracted from the payload, the country, and the document type, so a
# resubmission of the same document maps to the same file (used by
# #enqueue for duplicate detection).
#
# @param submission [PayloadSubmission] submission to generate filename for
# @return [String] generated filename ending in ".json"
def generate_file_name(submission)
  document_id = extract_document_id(submission.payload)

  # Replace anything that is not a word character or hyphen so the
  # source identifier is filesystem-safe.
  sanitized_source = "#{submission.source.name}:#{submission.source.version}".gsub(/[^\w-]/, '_')

  parts = [
    sanitized_source,
    document_id,
    submission.country.to_s,
    submission.document_type.to_s
  ]
  "#{parts.join('_')}.json"
end

#initialize_queue_directories ⇒ Object

Initialize queue directory structure



53
54
55
56
57
58
59
# File 'lib/complyance/persistent_queue_manager.rb', line 53

# Initialize queue directory structure.
#
# Creates the pending/processing/failed/success directories beneath the
# queue base path. FileUtils.mkdir_p is idempotent, so existing
# directories are left untouched.
def initialize_queue_directories
  [PENDING_DIR, PROCESSING_DIR, FAILED_DIR, SUCCESS_DIR].each do |dir_name|
    FileUtils.mkdir_p(File.join(@queue_base_path, dir_name))
  end
  @logger.debug "Queue directories initialized"
end

#process_pending_submissions ⇒ Object

Process pending submissions in queue



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/complyance/persistent_queue_manager.rb', line 192

# Process pending submissions in queue.
#
# Drains the pending directory, handing each file to
# #process_submission_file. The whole pass is serialized through
# @processing_lock so the background thread and manual triggers never
# process the same files concurrently. Returns without work when the
# manager is stopped or while the circuit breaker is OPEN and inside its
# cooldown window.
#
# NOTE(review): the 60-second cooldown is hard-coded here and assumed to
# match the breaker's reset_timeout of 60 configured in #initialize —
# confirm they stay in sync if the breaker config changes.
def process_pending_submissions
  return unless @is_running

  @processing_lock.synchronize do
    begin
      pending_dir = File.join(@queue_base_path, PENDING_DIR)
      files = Dir[File.join(pending_dir, '*.json')]

      if files.empty?
        @logger.debug "No pending submissions to process"
        return
      end

      @logger.debug "Found #{files.size} pending submissions in queue"

      # Check circuit breaker state before attempting to process
      if @circuit_breaker.open?
        current_time = Time.now.to_i
        time_since_last_failure = current_time - @circuit_breaker.last_failure_time

        if time_since_last_failure < 60 # 1 minute
          remaining_time = 60 - time_since_last_failure
          @logger.debug "Circuit breaker is OPEN - #{remaining_time} seconds remaining. Queue has #{files.size} items waiting."
          return
        else
          @logger.debug "Circuit breaker timeout expired - attempting to process #{files.size} queued items"
        end
      end

      files.each do |file_path|
        # File may have been moved/deleted since the glob ran.
        if File.exist?(file_path)
          begin
            # Per-file rescue: one bad submission must not abort the pass.
            process_submission_file(file_path)
          rescue => e
            @logger.error "Failed to process queued submission #{file_path}: #{e.message}"
          end
        end
      end

    rescue => e
      @logger.error "Error processing pending submissions: #{e.message}"
    end
  end
end

#process_pending_submissions_now ⇒ Object

Process pending submissions immediately



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/complyance/persistent_queue_manager.rb', line 162

# Process pending submissions immediately.
#
# Manual trigger for the same work the background thread performs.
# Honors the circuit breaker: skipped entirely while the breaker is OPEN
# and inside its 60-second cooldown window.
def process_pending_submissions_now
  @logger.info "Manually triggering processing of pending submissions"

  # Check circuit breaker state before manual processing.
  if @circuit_breaker.open?
    elapsed = Time.now.to_i - @circuit_breaker.last_failure_time
    if elapsed < 60 # 1 minute cooldown
      @logger.info "🚫 Circuit breaker is OPEN - remaining time: #{60 - elapsed}s. Manual processing skipped."
      return
    end
    @logger.info "✅ Circuit breaker timeout expired (#{elapsed}s) - proceeding with manual processing"
  else
    @logger.info "✅ Circuit breaker is CLOSED - proceeding with manual processing"
  end

  process_pending_submissions
end

#process_submission_file(file_path) ⇒ Object

Process a single submission file

Parameters:

  • file_path (String)

    Path to submission file



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/complyance/persistent_queue_manager.rb', line 239

# Process a single submission file.
#
# Lifecycle: pending -> processing -> (deleted on success | failed).
# The file is moved into the processing directory before sending so a
# crash mid-send leaves evidence of in-flight work; on any failure it is
# moved (or de-duplicated) into the failed directory.
#
# @param file_path [String] path to submission file in the pending directory
def process_submission_file(file_path)
  begin
    # Read submission record
    record = JSON.parse(File.read(file_path))
    file_name = File.basename(file_path)
    processing_path = File.join(@queue_base_path, PROCESSING_DIR, file_name)

    # Move to processing directory
    FileUtils.mv(file_path, processing_path)

    @logger.debug "Processing submission: #{file_name} for source: #{record['source_id']}"

    begin
      # Convert stored payload back to UnifyRequest
      unify_request = UnifyRequest.new(record['payload'])

      # Use the SDK's pushToUnify method with circuit breaker
      response = @circuit_breaker.execute do
        GETSUnifySDK.push_to_unify(unify_request)
      end

      # Check for success across the response, submission, and document
      # status fields; any one of them marking success is sufficient.
      # NOTE(review): response shape (data.submission / data.document)
      # is assumed from the SDK — confirm against GETSUnifySDK docs.
      is_success = false
      if response
        is_success = response.success?

        # Check submission status
        if response.data&.submission
          submission = response.data.submission
          is_success ||= submission.accepted? || 
                       submission.status.to_s.downcase == 'accepted'
        end

        # Check document status
        if response.data&.document
          document = response.data.document
          is_success ||= document.status.to_s.downcase == 'success'
        end
      end

      if is_success
        @logger.info "Queue: SUCCESS - Removing file from queue: #{file_name}"
        File.delete(processing_path)
        return
      else
        status = response ? response.status : 'null'
        @logger.warn "Queue: NON-SUCCESS - Moving to failed directory. Status: '#{status}'"
        # Raising routes the file into the failed directory below.
        raise "API returned non-success status: #{status}"
      end

    rescue => e
      @logger.error "Failed to send queued submission via pushToUnify: #{e.message}"

      failed_path = File.join(@queue_base_path, FAILED_DIR, file_name)

      # If an identically-named failure already exists, drop this copy
      # rather than overwrite it.
      if File.exist?(failed_path)
        File.delete(processing_path)
      else
        FileUtils.mv(processing_path, failed_path)
      end

      # Re-raise so the outer rescue logs the per-file failure.
      raise e
    end

  rescue => e
    @logger.warn "Failed to process submission: #{File.basename(file_path)} - Error: #{e.message}"

    file_name = File.basename(file_path)
    processing_path = File.join(@queue_base_path, PROCESSING_DIR, file_name)
    failed_path = File.join(@queue_base_path, FAILED_DIR, file_name)

    # Move to failed directory. The file may still be at its original
    # location if the failure happened before the move to processing.
    if File.exist?(processing_path)
      if File.exist?(failed_path)
        File.delete(processing_path)
      else
        FileUtils.mv(processing_path, failed_path)
      end
    elsif File.exist?(file_path)
      if File.exist?(failed_path)
        File.delete(file_path)
      else
        FileUtils.mv(file_path, failed_path)
      end
    end
  end
end

#queue_base_path ⇒ String

Get base path for queue directories

Returns:

  • (String)

    Base path



48
49
50
# File 'lib/complyance/persistent_queue_manager.rb', line 48

# Get base path for queue directories.
#
# Prefers $HOME (Unix) then %USERPROFILE% (Windows). Falls back to
# Dir.home so that, when neither variable is set, we get a clear
# ArgumentError instead of a TypeError from File.join(nil, ...).
#
# @return [String] base path
def queue_base_path
  home = ENV['HOME'] || ENV['USERPROFILE'] || Dir.home
  File.join(home, QUEUE_DIR)
end

#queue_status ⇒ Hash

Get current queue status

Returns:

  • (Hash)

    Queue status information



329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/complyance/persistent_queue_manager.rb', line 329

# Get current queue status.
#
# Counts the JSON files in each state directory. On any error a
# zeroed-out status hash is returned instead of raising.
#
# @return [Hash] queue status information (pending_count,
#   processing_count, failed_count, success_count, is_running)
def queue_status
  counts = {
    pending_count: PENDING_DIR,
    processing_count: PROCESSING_DIR,
    failed_count: FAILED_DIR,
    success_count: SUCCESS_DIR
  }.transform_values { |dir| Dir[File.join(@queue_base_path, dir, '*.json')].size }

  counts.merge(is_running: @is_running)
rescue => e
  @logger.error "Failed to get queue status: #{e.message}"
  {
    pending_count: 0,
    processing_count: 0,
    failed_count: 0,
    success_count: 0,
    is_running: false
  }
end

#retry_failed_submissions ⇒ Object

Retry failed submissions



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/complyance/persistent_queue_manager.rb', line 357

# Retry failed submissions.
#
# Moves every file in the failed directory back to pending so the
# background processor picks them up again. Errors are logged, not raised.
def retry_failed_submissions
  failed = Dir[File.join(@queue_base_path, FAILED_DIR, '*.json')]

  if failed.empty?
    @logger.info "No failed submissions to retry"
    return
  end

  @logger.info "Retrying #{failed.size} failed submissions"

  failed.each do |path|
    name = File.basename(path)
    FileUtils.mv(path, File.join(@queue_base_path, PENDING_DIR, name))
    @logger.debug "Moved failed submission back to pending: #{name}"
  end
rescue => e
  @logger.error "Failed to retry failed submissions: #{e.message}"
end

#start_processing ⇒ Object

Start processing queue items



146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/complyance/persistent_queue_manager.rb', line 146

# Start processing queue items.
#
# Idempotent: does nothing when the loop is already running. Spawns a
# background thread that polls the pending directory every 500ms until
# #stop_processing flips @is_running.
#
# NOTE(review): the check-then-set on @is_running is not atomic; this
# assumes callers invoke it from a single thread — confirm if that changes.
def start_processing
  return if @is_running

  @is_running = true
  @logger.info "Started persistent queue processing"

  # Background polling loop in a dedicated thread.
  @processing_thread = Thread.new do
    loop do
      break unless @is_running

      process_pending_submissions
      sleep(0.5) # 500ms delay
    end
  end
end

#stop_processing ⇒ Object

Stop processing queue items



185
186
187
188
189
# File 'lib/complyance/persistent_queue_manager.rb', line 185

# Stop processing queue items.
#
# Signals the background loop to exit and waits for the thread to
# finish its current iteration before returning.
def stop_processing
  @is_running = false
  @processing_thread.join if @processing_thread
  @logger.info "Stopped persistent queue processing"
end