Class: AllStak::Transport::FlushBuffer

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/allstak/transport/flush_buffer.rb

Overview

Bounded ring buffer with a background flush thread.

  • Max size: ‘maxsize` (default 500)

  • Eviction: oldest item dropped when full

  • Flush triggers: interval timer, >= 80% capacity, explicit flush, shutdown

  • Single-flight: only one flush runs at a time

Instance Method Summary collapse

Constructor Details

#initialize(name:, max_size:, interval_ms:, flush_proc:, logger:) ⇒ FlushBuffer

Returns a new instance of FlushBuffer.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/allstak/transport/flush_buffer.rb', line 14

def initialize(name:, max_size:, interval_ms:, flush_proc:, logger:)
  super()
  @name = name
  @max_size = max_size
  @interval = interval_ms / 1000.0
  @flush_proc = flush_proc
  @logger = logger
  @queue = []
  @stopped = false
  @overflow_warned = false
  @flushing_mutex = Mutex.new
  start_timer
end

Instance Method Details

#countObject



44
45
46
# File 'lib/allstak/transport/flush_buffer.rb', line 44

def count
  synchronize { @queue.length }
end

#flushObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/allstak/transport/flush_buffer.rb', line 48

def flush
  @flushing_mutex.synchronize do
    drained = synchronize do
      next [] if @queue.empty?
      current = @queue
      @queue = []
      current
    end
    return if drained.empty?
    begin
      @flush_proc.call(drained)
    rescue => e
      @logger.debug("[AllStak] flush error in #{@name}: #{e.class}: #{e.message}")
    end
  end
end

#push(item) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/allstak/transport/flush_buffer.rb', line 28

def push(item)
  synchronize do
    if @queue.length >= @max_size
      @queue.shift
      unless @overflow_warned
        @overflow_warned = true
        @logger.warn("[AllStak] Buffer #{@name} full (#{@max_size}); oldest events dropped")
      end
    else
      @overflow_warned = false
    end
    @queue << item
  end
  flush if count >= (@max_size * 0.8)
end

#shutdownObject



65
66
67
68
69
70
# File 'lib/allstak/transport/flush_buffer.rb', line 65

def shutdown
  @stopped = true
  @timer_thread&.wakeup rescue nil
  @timer_thread&.join(2)
  flush
end