Class: SolidObserver::QueueEventBuffer

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/solid_observer/queue_event_buffer.rb

Overview

Thread-safe buffer for collecting queue events before batch insertion.

Events are buffered in memory and flushed when either of the following occurs:

  • Buffer size reaches the configured threshold

  • Flush interval timer expires

Examples:

Push an event to the buffer

QueueEventBuffer.instance.push(event_data)

Constant Summary collapse

# Baseline values for the buffer's health metrics. The hash is frozen, so
# consumers must copy it (e.g. with +dup+) before mutating.
#
# NOTE(review): the key meanings below are inferred from their names; the
# helpers that update them are not shown here - confirm against them.
INITIAL_METRICS = {
  flush_failures_count: 0,      # presumably the number of failed flush attempts
  drops_count: 0,               # presumably the number of events dropped
  last_flush_at: nil,           # nil until set
  last_flush_duration_ms: nil,  # nil until set
  last_flush_error: nil         # nil until set
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueEventBuffer

Returns a new instance of QueueEventBuffer.



26
27
28
29
30
31
32
# File 'lib/solid_observer/queue_event_buffer.rb', line 26

# Sets up an empty buffer, a fresh metrics hash and the two locks that guard
# them. The class includes Singleton, so callers obtain the object through
# +.instance+ rather than +.new+.
def initialize
  # Mutable state.
  @buffer = []
  @metrics = INITIAL_METRICS.dup # unfrozen copy of the frozen template
  @timer_task = nil              # no timer until one is assigned elsewhere

  # One lock per piece of state: @mutex is used around @buffer and
  # @metrics_mutex around @metrics.
  @mutex = Mutex.new
  @metrics_mutex = Mutex.new
end

Instance Method Details

#clear ⇒ Object



75
76
77
# File 'lib/solid_observer/queue_event_buffer.rb', line 75

# Discards every buffered event without flushing it.
#
# The buffer is emptied in place while holding the buffer lock.
#
# @return [Array] the (now empty) internal buffer
def clear
  @mutex.synchronize do
    @buffer.clear
  end
end

#flush! ⇒ void

This method returns an undefined value.

Flushes all buffered events to the database.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/solid_observer/queue_event_buffer.rb', line 50

# Drains the buffer and hands the drained events to
# Services::FlushEventBuffer.
#
# The buffer lock is held only while the events are copied out and the
# buffer is emptied; the flush call itself runs outside the lock. When the
# flush raises a StandardError, the drained events are passed to
# +requeue_failed_events+, the failure is recorded, and the error is logged
# if Rails and its logger are available. The error is not re-raised.
#
# @return [void] nil when the buffer was empty or the flush failed
def flush!
  events_to_flush = @mutex.synchronize do
    next if @buffer.empty?

    drained = @buffer.dup
    @buffer.clear
    drained
  end
  return if events_to_flush.nil?

  started_at_ms = monotonic_ms
  begin
    Services::FlushEventBuffer.call(events_to_flush)
  rescue => e
    requeue_failed_events(events_to_flush)
    record_flush_failure(e)
    if defined?(Rails)
      Rails.logger&.error "[SolidObserver] Buffer flush failed: #{e.message}"
    end
    nil
  else
    # Runs only after a successful flush; errors raised here are not rescued.
    record_flush_success(monotonic_ms - started_at_ms)
  end
end

#metrics ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/solid_observer/queue_event_buffer.rb', line 79

# Builds a snapshot of the buffer's current state.
#
# The buffer size and the metrics hash are each read under their own lock,
# one after the other, so the two parts are not captured atomically with
# respect to each other. The stored metrics are merged last, so a stored key
# named +:size+ or +:max_buffer_size+ would override the computed value.
#
# @return [Hash] +:size+, +:max_buffer_size+, then a copy of the stored metrics
def metrics
  buffered = @mutex.synchronize { @buffer.length }
  recorded = @metrics_mutex.synchronize { @metrics.dup }

  summary = { size: buffered, max_buffer_size: SolidObserver.config.max_buffer_size }
  summary.merge(recorded)
end

#push(event_data) ⇒ void

This method returns an undefined value.

Adds an event to the buffer and triggers flush if threshold reached.

Parameters:

  • event_data (Hash)

    Event data to buffer



38
39
40
41
42
43
44
45
# File 'lib/solid_observer/queue_event_buffer.rb', line 38

# Buffers one event and flushes right away when the buffer reports that it
# should.
#
# Does nothing unless +SolidObserver.config.persistence_mode?+ is truthy.
# Otherwise the event is handed to +sync_push_and_check+, which returns a
# pair: a drop count and a flag saying whether to flush now. A positive drop
# count is passed to +record_drop+. +ensure_timer_running+ is called on every
# accepted push, before any immediate flush.
#
# @param event_data [Hash] event data to buffer
# @return [void]
def push(event_data)
  config = SolidObserver.config
  return unless config.persistence_mode?

  dropped, flush_now = sync_push_and_check(event_data, config)
  record_drop(dropped) if dropped.positive?

  ensure_timer_running
  flush! if flush_now
end

#shutdown ⇒ Object



88
89
90
91
# File 'lib/solid_observer/queue_event_buffer.rb', line 88

# Shuts the buffer down: calls +stop_timer+ first, then performs a final
# flush! of the buffered events.
#
# NOTE(review): +stop_timer+ is not shown here; presumably it cancels the
# periodic flush timer - confirm. If it raises, the final flush is skipped.
#
# @return [Object] whatever the final flush! call returns
def shutdown
  stop_timer
  flush!
end

#size ⇒ Object



71
72
73
# File 'lib/solid_observer/queue_event_buffer.rb', line 71

# Number of events currently held in the buffer, read under the buffer lock.
#
# @return [Integer] current buffer length
def size
  @mutex.synchronize do
    @buffer.length
  end
end