Class: ABMeter::AsyncSubmitter
- Inherits:
-
Object
- Object
- ABMeter::AsyncSubmitter
- Defined in:
- lib/abmeter/async_submitter.rb
Constant Summary collapse
- BATCH_SIZE =
Private internal constants for async submitter behavior
100- MAX_SUBMIT_ATTEMPTS =
3- MAX_RETRY_QUEUE_SIZE =
1000
Class Attribute Summary collapse
-
.api_client ⇒ Object
readonly
Returns the value of attribute api_client.
-
.flush_interval ⇒ Object
readonly
Returns the value of attribute flush_interval.
-
.logger ⇒ Object
readonly
Returns the value of attribute logger.
-
.retry_queue ⇒ Object
readonly
Returns the value of attribute retry_queue.
Class Method Summary collapse
- .configure(api_client:, config:) ⇒ Object
- .flush ⇒ Object
- .queue_event(event_slug, user_id, custom_fields) ⇒ Object
- .queue_exposure(exposure) ⇒ Object
- .queue_size ⇒ Object
- .reset! ⇒ Object
- .shutdown ⇒ Object
- .start ⇒ Object
- .worker_alive? ⇒ Boolean
Class Attribute Details
.api_client ⇒ Object (readonly)
Returns the value of attribute api_client.
17 18 19 |
# File 'lib/abmeter/async_submitter.rb', line 17 def api_client @api_client end |
.flush_interval ⇒ Object (readonly)
Returns the value of attribute flush_interval.
17 18 19 |
# File 'lib/abmeter/async_submitter.rb', line 17 def flush_interval @flush_interval end |
.logger ⇒ Object (readonly)
Returns the value of attribute logger.
17 18 19 |
# File 'lib/abmeter/async_submitter.rb', line 17 def logger @logger end |
.retry_queue ⇒ Object (readonly)
Returns the value of attribute retry_queue.
17 18 19 |
# File 'lib/abmeter/async_submitter.rb', line 17 def retry_queue @retry_queue end |
Class Method Details
.configure(api_client:, config:) ⇒ Object
19 20 21 22 23 |
# File 'lib/abmeter/async_submitter.rb', line 19 def configure(api_client:, config:) @api_client = api_client @flush_interval = config.flush_interval || DEFAULT_FLUSH_INTERVAL @logger = config.logger end |
.flush ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/abmeter/async_submitter.rb', line 45 def flush items = [] @mutex.synchronize do # First, try to process retry queue process_retry_queue # Then process new items items << @queue.pop while !@queue.empty? && items.size < BATCH_SIZE end # Group by type and submit unless items.empty? exposures = items.select { |i| i[:type] == :exposure }.map { |i| i[:data] } events = items.select { |i| i[:type] == :event }.map { |i| i[:data] } submit_exposures(exposures) unless exposures.empty? submit_events(events) unless events.empty? end end |
.queue_event(event_slug, user_id, custom_fields) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/abmeter/async_submitter.rb', line 33 def queue_event(event_slug, user_id, custom_fields) @queue.push({ type: :event, data: { event_slug: event_slug, user_id: user_id, occurred_at: Time.now.iso8601, custom_fields: custom_fields } }) end |
.queue_exposure(exposure) ⇒ Object
29 30 31 |
# File 'lib/abmeter/async_submitter.rb', line 29 def queue_exposure(exposure) @queue.push({ type: :exposure, data: exposure }) end |
.queue_size ⇒ Object
86 87 88 |
# File 'lib/abmeter/async_submitter.rb', line 86 def queue_size @queue.size end |
.reset! ⇒ Object
72 73 74 75 76 77 78 79 80 |
# File 'lib/abmeter/async_submitter.rb', line 72 def reset! shutdown @queue = Queue.new @retry_queue = [] @api_client = nil @worker_thread = nil @flush_interval = DEFAULT_FLUSH_INTERVAL @logger = nil end |
.shutdown ⇒ Object
66 67 68 69 70 |
# File 'lib/abmeter/async_submitter.rb', line 66 def shutdown @worker_thread&.kill # Flush all remaining exposures flush until @queue.empty? end |
.start ⇒ Object
25 26 27 |
# File 'lib/abmeter/async_submitter.rb', line 25 def start start_worker end |
.worker_alive? ⇒ Boolean
82 83 84 |
# File 'lib/abmeter/async_submitter.rb', line 82 def worker_alive? @worker_thread&.alive? || false end |