Class: AllStak::Transport::FlushBuffer

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/allstak/transport/flush_buffer.rb

Overview

Bounded ring buffer with a background flush thread.

  • Max size: `max_size` (default 500)

  • Eviction: oldest item dropped when full

  • Flush triggers: interval timer, >= 80% capacity, explicit flush, shutdown

  • Single-flight: only one flush runs at a time

Instance Method Summary collapse

Constructor Details

#initialize(name:, max_size:, interval_ms:, flush_proc:, logger:) ⇒ FlushBuffer

Returns a new instance of FlushBuffer.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/allstak/transport/flush_buffer.rb', line 14

# Builds a buffer and starts its timer.
#
# @param name [String] label interpolated into this buffer's log messages
# @param max_size [Integer] capacity bound used by #push for eviction and
#   for the early-flush threshold
# @param interval_ms [Numeric] timer interval in milliseconds
# @param flush_proc [#call] invoked by #flush with the drained items
# @param logger [#warn, #debug] receives overflow warnings and flush errors
def initialize(name:, max_size:, interval_ms:, flush_proc:, logger:)
  # Explicit empty parens: forward no arguments up the ancestor chain
  # (the class includes MonitorMixin, which backs `synchronize`).
  super()

  # Collaborators.
  @logger = logger
  @flush_proc = flush_proc

  # Configuration. The interval is stored in seconds.
  @name = name
  @max_size = max_size
  @interval = interval_ms / 1000.0

  # Mutable state.
  @queue = []
  @dropped_count = 0
  @overflow_warned = false
  @stopped = false

  # Separate from the monitor: #flush holds this around a whole flush.
  @flushing_mutex = Mutex.new

  # Defined outside this view; presumably spawns the background flush
  # thread, so it runs only after all state above is in place.
  start_timer
end

Instance Method Details

#count ⇒ Object



46
47
48
# File 'lib/allstak/transport/flush_buffer.rb', line 46

# Number of items currently sitting in the queue.
#
# The length is read while holding the monitor lock.
#
# @return [Integer] current queue length
def count
  synchronize do
    @queue.size
  end
end

#dropped_count ⇒ Object



50
51
52
# File 'lib/allstak/transport/flush_buffer.rb', line 50

# Running total of items evicted by #push because the buffer was full.
#
# The counter is read while holding the monitor lock.
#
# @return [Integer] value of the drop counter
def dropped_count
  synchronize do
    @dropped_count
  end
end

#flush ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/allstak/transport/flush_buffer.rb', line 54

# Drains every queued item and hands the batch to the flush proc.
#
# The whole operation runs under @flushing_mutex, so concurrent callers
# are serialized. The queue is swapped for a fresh array under the
# monitor; the proc itself is called after the monitor is released.
#
# A StandardError raised by the proc is logged at debug level and not
# re-raised; the drained items are not put back on the queue.
#
# @return [Object, nil] nil when nothing was queued; otherwise the flush
#   proc's return value, or the logger call's return value if the proc
#   raised
def flush
  @flushing_mutex.synchronize do
    batch = synchronize do
      pending = @queue
      if pending.empty?
        []
      else
        # Detach the filled array and leave an empty one for new pushes.
        @queue = []
        pending
      end
    end

    # Nothing to send: the block (and therefore the method) yields nil.
    next if batch.empty?

    begin
      @flush_proc.call(batch)
    rescue StandardError => e
      @logger.debug("[AllStak] flush error in #{@name}: #{e.class}: #{e.message}")
    end
  end
end

#push(item) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/allstak/transport/flush_buffer.rb', line 29

# Appends an item, evicting the oldest entry when the buffer is full.
#
# Under the monitor: if the queue already holds @max_size items, the head
# is dropped and the drop counter incremented; the overflow warning is
# logged once per full episode and re-armed by the next push that finds
# free space. After the monitor is released, a flush is triggered when
# the queue holds at least 80% of @max_size.
#
# @param item [Object] the entry to enqueue
# @return [Object, nil] the result of #flush when the threshold was
#   reached, otherwise nil
def push(item)
  synchronize do
    if @queue.size < @max_size
      # Room available: allow the warning to fire on the next overflow.
      @overflow_warned = false
    else
      @queue.shift
      @dropped_count += 1
      unless @overflow_warned
        @overflow_warned = true
        @logger.warn("[AllStak] Buffer #{@name} full (#{@max_size}); oldest events dropped")
      end
    end
    @queue.push(item)
  end

  # Occupancy is re-read through #count, which takes the monitor again.
  occupancy = count
  flush if occupancy >= (@max_size * 0.8)
end

#shutdown ⇒ Object



71
72
73
74
75
76
# File 'lib/allstak/transport/flush_buffer.rb', line 71

# Stops the buffer and performs one final flush.
#
# Sets the stopped flag, wakes the timer thread if one is set, waits up
# to 2 seconds for it to finish, then flushes whatever is still queued.
# @timer_thread is assigned outside this method (presumably by the timer
# setup called from the constructor).
#
# @return [Object, nil] whatever the final #flush returns
def shutdown
  @stopped = true

  begin
    @timer_thread&.wakeup
  rescue StandardError
    # Thread#wakeup raises ThreadError on a dead thread; this is
    # best-effort, so that and any other StandardError is ignored.
    nil
  end

  @timer_thread&.join(2)
  flush
end