Class: SolidObserver::QueueEventBuffer
- Inherits:
-
Object
- Object
- SolidObserver::QueueEventBuffer
- Includes:
- Singleton
- Defined in:
- lib/solid_observer/queue_event_buffer.rb
Overview
Thread-safe buffer for collecting queue events before batch insertion.
Events are buffered in memory and flushed either when:
-
Buffer size reaches the configured threshold
-
Flush interval timer expires
Constant Summary collapse
- INITIAL_METRICS =
{ flush_failures_count: 0, drops_count: 0, last_flush_at: nil, last_flush_duration_ms: nil, last_flush_error: nil }.freeze
Instance Method Summary collapse
- #clear ⇒ Object
-
#flush! ⇒ void
Flushes all buffered events to the database.
-
#initialize ⇒ QueueEventBuffer
constructor
A new instance of QueueEventBuffer.
- #metrics ⇒ Object
-
#push(event_data) ⇒ void
Adds an event to the buffer and triggers flush if threshold reached.
- #shutdown ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize ⇒ QueueEventBuffer
Returns a new instance of QueueEventBuffer.
26 27 28 29 30 31 32 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 26 def initialize @mutex = Mutex.new @metrics_mutex = Mutex.new @buffer = [] @metrics = INITIAL_METRICS.dup @timer_task = nil end |
Instance Method Details
#clear ⇒ Object
75 76 77 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 75 def clear @mutex.synchronize { @buffer.clear } end |
#flush! ⇒ void
This method returns an undefined value.
Flushes all buffered events to the database.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 50 def flush! events_to_flush = nil @mutex.synchronize do return if @buffer.empty? events_to_flush = @buffer.dup @buffer.clear end started_at_ms = monotonic_ms begin Services::FlushEventBuffer.call(events_to_flush) rescue => e requeue_failed_events(events_to_flush) record_flush_failure(e) Rails.logger&.error "[SolidObserver] Buffer flush failed: #{e.}" if defined?(Rails) return end record_flush_success(monotonic_ms - started_at_ms) end |
#metrics ⇒ Object
79 80 81 82 83 84 85 86 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 79 def metrics current_size = @mutex.synchronize { @buffer.size } snapshot = @metrics_mutex.synchronize { @metrics.dup } { size: current_size, max_buffer_size: SolidObserver.config.max_buffer_size }.merge(snapshot) end |
#push(event_data) ⇒ void
This method returns an undefined value.
Adds an event to the buffer and triggers flush if threshold reached.
38 39 40 41 42 43 44 45 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 38 def push(event_data) return unless (config = SolidObserver.config).persistence_mode? drops_count, should_flush = sync_push_and_check(event_data, config) record_drop(drops_count) if drops_count.positive? ensure_timer_running flush! if should_flush end |
#shutdown ⇒ Object
88 89 90 91 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 88 def shutdown stop_timer flush! end |
#size ⇒ Object
71 72 73 |
# File 'lib/solid_observer/queue_event_buffer.rb', line 71 def size @mutex.synchronize { @buffer.size } end |