Class: Pgbus::StatBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/stat_buffer.rb

Overview

Thread-safe buffer that accumulates job stats in memory and flushes them to the database in batches. This avoids one INSERT per job execution, replacing it with periodic bulk inserts.

Constant Summary collapse

DEFAULT_FLUSH_SIZE =
100
DEFAULT_FLUSH_INTERVAL =

seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(flush_size: DEFAULT_FLUSH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL) ⇒ StatBuffer

Returns a new instance of StatBuffer.



13
14
15
16
17
18
19
20
# File 'lib/pgbus/stat_buffer.rb', line 13

def initialize(flush_size: DEFAULT_FLUSH_SIZE, flush_interval: DEFAULT_FLUSH_INTERVAL)
  @flush_size = flush_size
  @flush_interval = flush_interval
  @buffer = []
  @mutex = Mutex.new
  @last_flush_at = monotonic_now
  @stopped = false
end

Instance Attribute Details

#flush_intervalObject (readonly)

Returns the value of attribute flush_interval.



11
12
13
# File 'lib/pgbus/stat_buffer.rb', line 11

def flush_interval
  @flush_interval
end

#flush_sizeObject (readonly)

Returns the value of attribute flush_size.



11
12
13
# File 'lib/pgbus/stat_buffer.rb', line 11

def flush_size
  @flush_size
end

Instance Method Details

#flushObject

Flush buffered stats to the database. Safe to call from any thread.



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/pgbus/stat_buffer.rb', line 36

def flush
  entries = nil

  @mutex.synchronize do
    return if @buffer.empty?

    entries = @buffer.dup
    @buffer.clear
    @last_flush_at = monotonic_now
  end

  write_to_database(entries) if entries&.any?
end

#flush_if_dueObject

Flush if the interval has elapsed since the last flush. Called by the dispatcher on its maintenance tick.



52
53
54
55
# File 'lib/pgbus/stat_buffer.rb', line 52

def flush_if_due
  due = @mutex.synchronize { monotonic_now - @last_flush_at >= @flush_interval }
  flush if due
end

#push(attrs) ⇒ Object

Append a stat entry to the buffer. Flushes automatically when the buffer reaches flush_size.



24
25
26
27
28
29
30
31
32
33
# File 'lib/pgbus/stat_buffer.rb', line 24

def push(attrs)
  should_flush = false

  @mutex.synchronize do
    @buffer << attrs
    should_flush = @buffer.size >= @flush_size
  end

  flush if should_flush
end

#sizeObject



62
63
64
# File 'lib/pgbus/stat_buffer.rb', line 62

def size
  @mutex.synchronize { @buffer.size }
end

#stopObject



57
58
59
60
# File 'lib/pgbus/stat_buffer.rb', line 57

def stop
  @stopped = true
  flush
end