Class: ABMeter::AsyncSubmitter

Inherits:
Object
  • Object
show all
Defined in:
lib/abmeter/async_submitter.rb

Constant Summary collapse

BATCH_SIZE =

Private internal constants for async submitter behavior

100
MAX_SUBMIT_ATTEMPTS =
3
MAX_RETRY_QUEUE_SIZE =
1000

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.api_clientObject (readonly)

Returns the value of attribute api_client.



17
18
19
# File 'lib/abmeter/async_submitter.rb', line 17

def api_client
  @api_client
end

.flush_intervalObject (readonly)

Returns the value of attribute flush_interval.



17
18
19
# File 'lib/abmeter/async_submitter.rb', line 17

def flush_interval
  @flush_interval
end

.loggerObject (readonly)

Returns the value of attribute logger.



17
18
19
# File 'lib/abmeter/async_submitter.rb', line 17

def logger
  @logger
end

.retry_queueObject (readonly)

Returns the value of attribute retry_queue.



17
18
19
# File 'lib/abmeter/async_submitter.rb', line 17

def retry_queue
  @retry_queue
end

Class Method Details

.configure(api_client:, config:) ⇒ Object



19
20
21
22
23
# File 'lib/abmeter/async_submitter.rb', line 19

def configure(api_client:, config:)
  @api_client = api_client
  @flush_interval = config.flush_interval || DEFAULT_FLUSH_INTERVAL
  @logger = config.logger
end

.flushObject



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/abmeter/async_submitter.rb', line 45

def flush
  items = []

  @mutex.synchronize do
    # First, try to process retry queue
    process_retry_queue

    # Then process new items
    items << @queue.pop while !@queue.empty? && items.size < BATCH_SIZE
  end

  # Group by type and submit
  unless items.empty?
    exposures = items.select { |i| i[:type] == :exposure }.map { |i| i[:data] }
    events = items.select { |i| i[:type] == :event }.map { |i| i[:data] }

    submit_exposures(exposures) unless exposures.empty?
    submit_events(events) unless events.empty?
  end
end

.queue_event(event_slug, user_id, custom_fields) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/abmeter/async_submitter.rb', line 33

def queue_event(event_slug, user_id, custom_fields)
  @queue.push({
                type: :event,
                data: {
                  event_slug: event_slug,
                  user_id: user_id,
                  occurred_at: Time.now.iso8601,
                  custom_fields: custom_fields
                }
              })
end

.queue_exposure(exposure) ⇒ Object



29
30
31
# File 'lib/abmeter/async_submitter.rb', line 29

def queue_exposure(exposure)
  @queue.push({ type: :exposure, data: exposure })
end

.queue_sizeObject



86
87
88
# File 'lib/abmeter/async_submitter.rb', line 86

def queue_size
  @queue.size
end

.reset!Object



72
73
74
75
76
77
78
79
80
# File 'lib/abmeter/async_submitter.rb', line 72

def reset!
  shutdown
  @queue = Queue.new
  @retry_queue = []
  @api_client = nil
  @worker_thread = nil
  @flush_interval = DEFAULT_FLUSH_INTERVAL
  @logger = nil
end

.shutdownObject



66
67
68
69
70
# File 'lib/abmeter/async_submitter.rb', line 66

def shutdown
  @worker_thread&.kill
  # Flush all remaining exposures
  flush until @queue.empty?
end

.startObject



25
26
27
# File 'lib/abmeter/async_submitter.rb', line 25

def start
  start_worker
end

.worker_alive?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/abmeter/async_submitter.rb', line 82

def worker_alive?
  @worker_thread&.alive? || false
end