Class: AllStak::Transport::FlushBuffer
- Inherits: Object
  - Object
  - AllStak::Transport::FlushBuffer
- Includes: MonitorMixin
- Defined in: lib/allstak/transport/flush_buffer.rb
Overview
Bounded ring buffer with a background flush thread.
- Max size: `maxsize` (default 500)
- Eviction: oldest item dropped when full
- Flush triggers: interval timer, >= 80% capacity, explicit flush, shutdown
- Single-flight: only one flush runs at a time
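The eviction and capacity-trigger rules listed above can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions: `MiniBuffer` is a hypothetical stand-in, not the library class; it has no timer thread and no locking, and the `auto_flush` switch and the `flushed` and `items` readers are inventions of this example so that each rule can be observed in isolation.

```ruby
# Hypothetical, single-threaded model of the documented rules.
# It is NOT AllStak::Transport::FlushBuffer: no timer thread, no locking.
class MiniBuffer
  attr_reader :flushed

  def initialize(max_size:, auto_flush: true)
    @max_size = max_size
    @auto_flush = auto_flush # invented switch, used to observe eviction alone
    @queue = []
    @flushed = []            # batches that a flush callback would have received
  end

  def push(item)
    @queue.shift if @queue.length >= @max_size # evict the oldest item when full
    @queue << item
    flush if @auto_flush && @queue.length >= @max_size * 0.8
  end

  def flush
    return if @queue.empty?

    @flushed << @queue # hand over the whole batch
    @queue = []        # and start a fresh queue
  end

  def count
    @queue.length
  end

  def items
    @queue.dup
  end
end

# Capacity trigger: with max_size 5 the threshold is 5 * 0.8 = 4.0,
# so the fourth push causes a flush.
buf = MiniBuffer.new(max_size: 5)
4.times { |i| buf.push(i) }
buf.flushed # => [[0, 1, 2, 3]]
buf.count   # => 0

# Eviction: with flushing disabled, the two oldest of seven items are dropped.
full = MiniBuffer.new(max_size: 5, auto_flush: false)
7.times { |i| full.push(i) }
full.items  # => [2, 3, 4, 5, 6]
```

In this model eviction is only visible once the capacity trigger is switched off, because a flush at 80% empties the queue before it can fill.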
Instance Method Summary
- #count ⇒ Object
- #flush ⇒ Object
- #initialize(name:, max_size:, interval_ms:, flush_proc:, logger:) ⇒ FlushBuffer (constructor): a new instance of FlushBuffer.
- #push(item) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(name:, max_size:, interval_ms:, flush_proc:, logger:) ⇒ FlushBuffer
Returns a new instance of FlushBuffer.
# File 'lib/allstak/transport/flush_buffer.rb', line 14
def initialize(name:, max_size:, interval_ms:, flush_proc:, logger:)
  super()
  @name = name
  @max_size = max_size
  @interval = interval_ms / 1000.0
  @flush_proc = flush_proc
  @logger = logger
  @queue = []
  @stopped = false
  @overflow_warned = false
  @flushing_mutex = Mutex.new
  start_timer
end
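The constructor ends by calling `start_timer`, which is private and not shown on this page. As a rough illustration of what an interval loop of this kind might look like, here is a minimal sketch. `IntervalTicker` and its block argument are hypothetical names; the real method may be structured quite differently. The only details taken from the source are the millisecond-to-second conversion and the wake-then-join stop sequence used by `#shutdown`.

```ruby
# Hypothetical sketch of an interval timer loop. The real start_timer
# is private and not documented here; this is an assumption, not its source.
class IntervalTicker
  def initialize(interval_ms:, &on_tick)
    @interval = interval_ms / 1000.0 # same ms-to-seconds conversion as the constructor
    @on_tick = on_tick
    @stopped = false
    @thread = Thread.new do
      until @stopped
        sleep(@interval)              # returns early if the thread is woken up
        @on_tick.call unless @stopped # skip the tick once stop has been requested
      end
    end
  end

  def stop
    @stopped = true
    @thread.wakeup rescue nil # wakeup raises ThreadError if the thread is already dead
    @thread.join(2)           # wait at most two seconds for the loop to exit
  end
end

ticks = 0
ticker = IntervalTicker.new(interval_ms: 20) { ticks += 1 }
sleep(0.2)
ticker.stop
```

Waking the thread before joining means a stop request does not have to wait out a full sleep interval.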
Instance Method Details
#count ⇒ Object
# File 'lib/allstak/transport/flush_buffer.rb', line 44
def count
  synchronize { @queue.length }
end
#flush ⇒ Object
# File 'lib/allstak/transport/flush_buffer.rb', line 48
def flush
  @flushing_mutex.synchronize do
    drained = synchronize do
      next [] if @queue.empty?
      current = @queue
      @queue = []
      current
    end
    return if drained.empty?

    begin
      @flush_proc.call(drained)
    rescue => e
      @logger.debug("[AllStak] flush error in #{@name}: #{e.class}: #{e.message}")
    end
  end
end
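As the listing shows, `#flush` rescues any error raised by `flush_proc`, logs it at debug level, and does not put the drained batch back on the queue, so a failed delivery loses that batch. A caller that wants retries would therefore need to build them into the callable it passes as `flush_proc:`. The following is one possible sketch; `with_retries`, `attempts`, and `base_delay` are hypothetical names for this example and are not part of AllStak.

```ruby
# Hypothetical helper: wraps a delivery callable so that it retries with
# exponential backoff before giving up. Not part of AllStak.
def with_retries(deliver, attempts: 3, base_delay: 0.05)
  lambda do |batch|
    tries = 0
    begin
      tries += 1
      deliver.call(batch)
    rescue StandardError
      raise if tries >= attempts             # let the final error propagate
      sleep(base_delay * (2**(tries - 1)))   # back off: 1x, 2x, 4x, ...
      retry
    end
  end
end

# A delivery that fails twice and then succeeds.
calls = 0
flaky = lambda do |batch|
  calls += 1
  raise IOError, "transient failure" if calls < 3

  batch.size
end

flush_proc = with_retries(flaky, base_delay: 0.001)
flush_proc.call([:a, :b]) # => 2, after three attempts
```

Because `#flush` runs `flush_proc` on whichever thread triggered it, including a thread that called `#push`, long retry delays would block that caller. Small delays and few attempts are the safer choice.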
#push(item) ⇒ Object
# File 'lib/allstak/transport/flush_buffer.rb', line 28
def push(item)
  synchronize do
    if @queue.length >= @max_size
      @queue.shift
      unless @overflow_warned
        @overflow_warned = true
        @logger.warn("[AllStak] Buffer #{@name} full (#{@max_size}); oldest events dropped")
      end
    else
      @overflow_warned = false
    end
    @queue << item
  end

  flush if count >= (@max_size * 0.8)
end
#shutdown ⇒ Object
# File 'lib/allstak/transport/flush_buffer.rb', line 65
def shutdown
  @stopped = true
  @timer_thread&.wakeup rescue nil
  @timer_thread&.join(2)
  flush
end