Class: ComplyanceSDK::Queue::PersistentQueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/complyance_sdk/queue/persistent_queue_manager.rb

Overview

Persistent queue manager for handling failed submissions with retry logic Ruby equivalent of the Java PersistentQueueManager

Constant Summary collapse

QUEUE_DIR =
'complyance-queue'
PENDING_DIR =
'pending'
PROCESSING_DIR =
'processing'
FAILED_DIR =
'failed'
SUCCESS_DIR =
'success'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_key, local = false, logger = nil) ⇒ PersistentQueueManager

Initialize the persistent queue manager

Parameters:

  • api_key (String)

    The API key

  • local (Boolean) (defaults to: false)

    Whether running in local mode

  • logger (Logger) (defaults to: nil)

    Optional logger instance



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 27

def initialize(api_key, local = false, logger = nil)
  @api_key = api_key
  @local = local
  @queue_base_path = File.join(Dir.home, QUEUE_DIR)
  @logger = logger || Logger.new(STDOUT)
  @processing_lock = Mutex.new
  @running = false
  @paused = false

  initialize_queue_directories
  @logger.info("PersistentQueueManager initialized with queue directory: #{@queue_base_path}")

  # Automatically start processing and retry any existing failed submissions
  start_processing
  retry_failed_submissions
end

Instance Attribute Details

#api_keyObject (readonly)

Returns the value of attribute api_key.



20
21
22
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 20

def api_key
  @api_key
end

#localObject (readonly)

Returns the value of attribute local.



20
21
22
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 20

def local
  @local
end

#loggerObject (readonly)

Returns the value of attribute logger.



20
21
22
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 20

def logger
  @logger
end

#queue_base_pathObject (readonly)

Returns the value of attribute queue_base_path.



20
21
22
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 20

def queue_base_path
  @queue_base_path
end

Instance Method Details

#cleanup_duplicate_filesObject

Clean up duplicate files across queue directories



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 254

def cleanup_duplicate_files
  @logger.info('Cleaning up duplicate files across queue directories...')

  queue_item_map = {}
  file_map = {}

  [PENDING_DIR, PROCESSING_DIR, FAILED_DIR, SUCCESS_DIR].each do |dir_name|
    dir_path = File.join(@queue_base_path, dir_name)
    next unless Dir.exist?(dir_path)

    Dir.glob(File.join(dir_path, '*.json')).each do |file_path|
      file_name = File.basename(file_path)
      queue_item_id = read_queue_item_id(file_path, file_name)
      dedupe_key = queue_item_id.to_s.empty? ? file_name.sub(/\.json\z/, '') : queue_item_id
      existing_file = queue_item_map[dedupe_key] || file_map[file_name]

      if existing_file
        begin
          existing_time = File.mtime(existing_file)
          current_time = File.mtime(file_path)

          if current_time > existing_time
            File.delete(existing_file) if File.exist?(existing_file)
            queue_item_map[dedupe_key] = file_path
            file_map[file_name] = file_path
          else
            File.delete(file_path) if File.exist?(file_path)
          end
        rescue => _e
          File.delete(file_path) if File.exist?(file_path)
        end
      else
        queue_item_map[dedupe_key] = file_path
        file_map[file_name] = file_path
      end
    end
  end

  @logger.info('Duplicate file cleanup completed')
rescue => e
  @logger.error("Error during duplicate file cleanup: #{e.message}")
end

#cleanup_old_success_files(days_to_keep) ⇒ Object

Clean up old success files

Parameters:

  • days_to_keep (Integer)

    Number of days to keep success files



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 221

def cleanup_old_success_files(days_to_keep)
  success_dir = File.join(@queue_base_path, SUCCESS_DIR)
  cutoff_time = Time.now - (days_to_keep * 24 * 60 * 60)

  old_files = Dir.glob(File.join(success_dir, '*.json')).select do |file_path|
    File.mtime(file_path) < cutoff_time
  end

  old_files.each do |file_path|
    File.delete(file_path)
    @logger.debug("Cleaned up old success file: #{File.basename(file_path)}")
  end

  @logger.info("Cleaned up #{old_files.length} old success files") unless old_files.empty?
rescue => e
  @logger.error("Failed to cleanup old success files: #{e.message}")
end

#clear_all_queuesObject

Clear all files from the queue (emergency cleanup)



240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 240

def clear_all_queues
  @logger.info('Clearing all queue directories...')

  [PENDING_DIR, PROCESSING_DIR, FAILED_DIR, SUCCESS_DIR].each do |dir_name|
    clear_directory(dir_name)
  end

  @logger.info('All queue directories cleared successfully')
rescue => e
  @logger.error("Error clearing queue directories: #{e.message}")
  raise RuntimeError, "Failed to clear queues: #{e.message}"
end

#enqueue(submission) ⇒ Object

Enqueue a failed submission for retry

Parameters:

  • submission (Hash)

    The submission data



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 47

def enqueue(submission)
  payload_raw = submission[:payload] || submission['payload']
  queue_item_id = build_queue_item_id(
    nil,
    (submission[:country] || submission['country']).to_s,
    (submission[:document_type] || submission['document_type']).to_s,
    payload_raw
  )
  file_name = "#{queue_item_id}.json"
  file_path = File.join(@queue_base_path, PENDING_DIR, file_name)

  if exists_across_queues?(file_name)
    return
  end

  # Parse the UnifyRequest JSON string to proper JSON object
  json_payload = payload_raw

  # Verify the payload is not empty
  if json_payload.strip.empty? || json_payload == '{}'
    @logger.error("🔥 QUEUE: ERROR - Received empty or invalid payload: '#{json_payload}'")
    raise RuntimeError, 'Cannot enqueue empty payload'
  end

  # Parse the UnifyRequest JSON string to a proper JSON object
  begin
    unify_request_map = JSON.parse(json_payload)
  rescue JSON::ParserError => e
    @logger.error("🔥 QUEUE: Failed to parse JSON payload: #{e.message}")
    raise RuntimeError, "Invalid JSON payload: #{e.message}"
  end

  now = Time.now.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
  record = {
    queueItemId: queue_item_id,
    requestId: unify_request_map['requestId'] || unify_request_map['request_id'],
    attemptCount: 0,
    firstEnqueuedAt: now,
    lastAttemptAt: nil,
    lastErrorCode: nil,
    lastHttpStatus: nil,
    nextRetryAt: now,
    operationName: 'push_to_unify',
    payload: unify_request_map,
    source_id: submission[:source][:id] || submission['source']['id'],
    country: (submission[:country] || submission['country']).to_s,
    document_type: (submission[:document_type] || submission['document_type']).to_s,
    enqueued_at: now,
    timestamp: Time.now.to_i
  }

  # Write to file
  File.write(file_path, JSON.pretty_generate(record))

  @logger.info("Enqueued submission to persistent storage: #{file_name} for source: #{record[:source_id]}, country: #{record[:country]}")

  # Start processing if not already running
  start_processing
rescue => e
  @logger.error("Failed to enqueue submission to persistent storage: #{e.message}")
  raise RuntimeError, "Failed to persist submission: #{e.message}"
end

#enqueue_for_retry(request_data, operation_name = 'push_to_unify', error_code: nil, http_status: nil) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 110

def enqueue_for_retry(request_data, operation_name = 'push_to_unify', error_code: nil, http_status: nil)
  request_hash = request_data.is_a?(Hash) ? request_data : {}
  request_id = request_hash[:requestId] || request_hash['requestId'] || request_hash[:request_id] || request_hash['request_id']
  country = request_hash[:country] || request_hash['country']
  payload = request_hash[:payload] || request_hash['payload'] || {}
  document_type = request_hash[:documentType] || request_hash['documentType']
  queue_item_id = build_queue_item_id(request_id, country, document_type, payload)
  file_name = "#{queue_item_id}.json"
  return if exists_across_queues?(file_name)

  now = Time.now.utc.strftime('%Y-%m-%dT%H:%M:%SZ')
  record = {
    queueItemId: queue_item_id,
    requestId: request_id,
    attemptCount: 0,
    firstEnqueuedAt: now,
    lastAttemptAt: nil,
    lastErrorCode: error_code,
    lastHttpStatus: http_status,
    nextRetryAt: now,
    operationName: operation_name,
    payload: request_hash,
    timestamp: Time.now.to_i
  }
  File.write(File.join(@queue_base_path, PENDING_DIR, file_name), JSON.pretty_generate(record))
rescue StandardError => e
  @logger.error("Failed to enqueue request for retry: #{e.message}")
end

#process_pending_submissions_nowObject

Manually trigger processing of pending submissions



171
172
173
174
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 171

def process_pending_submissions_now
  @logger.info('Manually triggering processing of pending submissions')
  process_pending_submissions
end

#queue_statusHash

Get queue status and statistics

Returns:

  • (Hash)

    Queue status information



179
180
181
182
183
184
185
186
187
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 179

def queue_status
  {
    pending_count: count_files_in_directory(PENDING_DIR),
    processing_count: count_files_in_directory(PROCESSING_DIR),
    failed_count: count_files_in_directory(FAILED_DIR),
    success_count: count_files_in_directory(SUCCESS_DIR),
    running: @running
  }
end

#retry_failed_submissionsObject

Retry failed submissions by moving them back to pending



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 190

def retry_failed_submissions
  failed_dir = File.join(@queue_base_path, FAILED_DIR)
  pending_dir = File.join(@queue_base_path, PENDING_DIR)
  
  failed_files = Dir.glob(File.join(failed_dir, '*.json'))

  if failed_files.empty?
    @logger.info('No failed submissions to retry')
    return
  end

  @logger.info("Retrying #{failed_files.length} failed submissions")

  failed_files.each do |file_path|
    file_name = File.basename(file_path)
    pending_path = File.join(pending_dir, file_name)

    if exists_across_queues?(file_name, exclude_dir: FAILED_DIR)
      File.delete(file_path) if File.exist?(file_path)
    else
      FileUtils.mv(file_path, pending_path)
      @logger.debug("Moved failed submission back to pending: #{file_name}")
    end
  end
rescue => e
  @logger.error("Failed to retry failed submissions: #{e.message}")
end

#start_processingObject

Start processing pending submissions



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 140

def start_processing
  return if @running

  @running = true
  
  # Start background thread for processing
  @processing_thread = Thread.new do
    loop do
      break unless @running
      
      begin
        process_pending_submissions
      rescue => e
        @logger.error("Error in processing thread: #{e.message}")
      end
      
      sleep(30) # Process every 30 seconds
    end
  end

  @logger.info('Started persistent queue processing')
end

#stop_processingObject

Stop processing pending submissions



164
165
166
167
168
# File 'lib/complyance_sdk/queue/persistent_queue_manager.rb', line 164

def stop_processing
  @running = false
  @processing_thread&.join(5) # Wait up to 5 seconds for thread to finish
  @logger.info('Stopped persistent queue processing')
end