Class: Pgbus::StatBuffer
- Inherits:
-
Object
- Object
- Pgbus::StatBuffer
- Defined in:
- lib/pgbus/stat_buffer.rb
Overview
Thread-safe buffer that accumulates job stats in memory and flushes them to the database in batches. This avoids one INSERT per job execution, replacing it with periodic bulk inserts.
Constant Summary collapse
- DEFAULT_FLUSH_SIZE =
100- DEFAULT_FLUSH_INTERVAL =
seconds
5
Instance Attribute Summary collapse
-
#flush_interval ⇒ Object
readonly
Returns the value of attribute flush_interval.
-
#flush_size ⇒ Object
readonly
Returns the value of attribute flush_size.
Instance Method Summary collapse
-
#flush ⇒ Object
Flush buffered stats to the database.
-
#flush_if_due ⇒ Object
Flush if the interval has elapsed since the last flush.
-
#initialize(flush_size: DEFAULT_FLUSH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL) ⇒ StatBuffer
constructor
A new instance of StatBuffer.
-
#push(attrs) ⇒ Object
Append a stat entry to the buffer.
- #size ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(flush_size: DEFAULT_FLUSH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL) ⇒ StatBuffer
Returns a new instance of StatBuffer.
13 14 15 16 17 18 19 20 |
# File 'lib/pgbus/stat_buffer.rb', line 13 def initialize(flush_size: DEFAULT_FLUSH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL) @flush_size = flush_size @flush_interval = flush_interval @buffer = [] @mutex = Mutex.new @last_flush_at = monotonic_now @stopped = false end |
Instance Attribute Details
#flush_interval ⇒ Object (readonly)
Returns the value of attribute flush_interval.
11 12 13 |
# File 'lib/pgbus/stat_buffer.rb', line 11 def flush_interval @flush_interval end |
#flush_size ⇒ Object (readonly)
Returns the value of attribute flush_size.
11 12 13 |
# File 'lib/pgbus/stat_buffer.rb', line 11 def flush_size @flush_size end |
Instance Method Details
#flush ⇒ Object
Flush buffered stats to the database. Safe to call from any thread.
36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/pgbus/stat_buffer.rb', line 36 def flush entries = nil @mutex.synchronize do return if @buffer.empty? entries = @buffer.dup @buffer.clear @last_flush_at = monotonic_now end write_to_database(entries) if entries&.any? end |
#flush_if_due ⇒ Object
Flush if the interval has elapsed since the last flush. Called by the dispatcher on its maintenance tick.
52 53 54 55 |
# File 'lib/pgbus/stat_buffer.rb', line 52 def flush_if_due due = @mutex.synchronize { monotonic_now - @last_flush_at >= @flush_interval } flush if due end |
#push(attrs) ⇒ Object
Append a stat entry to the buffer. Flushes automatically when the buffer reaches flush_size.
24 25 26 27 28 29 30 31 32 33 |
# File 'lib/pgbus/stat_buffer.rb', line 24 def push(attrs) should_flush = false @mutex.synchronize do @buffer << attrs should_flush = @buffer.size >= @flush_size end flush if should_flush end |
#size ⇒ Object
62 63 64 |
# File 'lib/pgbus/stat_buffer.rb', line 62 def size @mutex.synchronize { @buffer.size } end |
#stop ⇒ Object
57 58 59 60 |
# File 'lib/pgbus/stat_buffer.rb', line 57 def stop @stopped = true flush end |