Class: Smplkit::Audit::EventBuffer Private

Inherits:
Object
  • Object
show all
Defined in:
lib/smplkit/audit/buffer.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Bounded in-memory queue + worker thread for fire-and-forget audit emits.

#enqueue returns immediately. The worker drains on either a periodic tick or once depth crosses the high-water mark, retries transient failures with exponential backoff, drops permanent 4xx (other than 429), and evicts the oldest item under sustained back-pressure.

Constant Summary collapse

MAX_BUFFER_SIZE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

1000
WATERMARK =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

50
FLUSH_INTERVAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

5.0
MAX_ATTEMPTS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

5
INITIAL_BACKOFF =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

0.25
MAX_BACKOFF =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

8.0

Instance Method Summary collapse

Constructor Details

#initialize(api) ⇒ EventBuffer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of EventBuffer.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/smplkit/audit/buffer.rb', line 23

def initialize(api)
  @api = api
  @queue = []
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @closed = false
  @dropped_count = 0
  # +@in_flight+ is the number of items the worker has shifted off
  # the queue but not yet finished POSTing. +#flush+ must wait on
  # both queue empty AND in_flight == 0 — otherwise it can return
  # while a just-shifted item is still in the middle of its HTTP
  # round-trip, and an immediately following +list+ call would
  # miss the event.
  @in_flight = 0
  @worker = Thread.new { run }
  @worker.report_on_exception = false
end

Instance Method Details

#close(timeout: 5.0) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



73
74
75
76
77
78
79
80
# File 'lib/smplkit/audit/buffer.rb', line 73

def close(timeout: 5.0)
  flush(timeout: timeout)
  @mutex.synchronize do
    @closed = true
    @cond.broadcast
  end
  @worker.join(timeout)
end

#enqueue(body, idempotency_key) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/smplkit/audit/buffer.rb', line 41

def enqueue(body, idempotency_key)
  depth = nil
  @mutex.synchronize do
    return if @closed

    if @queue.size >= MAX_BUFFER_SIZE
      @queue.shift
      @dropped_count += 1
      warn "[smplkit.audit] buffer full (size=#{MAX_BUFFER_SIZE}); " \
           "dropped oldest event (total dropped=#{@dropped_count})"
    end
    @queue.push(PendingEvent.new(body, idempotency_key))
    depth = @queue.size
    @cond.signal if depth >= WATERMARK
  end
end

#flush(timeout: 5.0) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/smplkit/audit/buffer.rb', line 58

def flush(timeout: 5.0)
  deadline = monotonic_now + timeout
  loop do
    idle = @mutex.synchronize { @queue.empty? && @in_flight.zero? }
    return if idle

    if monotonic_now >= deadline
      warn "[smplkit.audit] flush timed out (timeout=#{timeout}s)"
      return
    end
    @mutex.synchronize { @cond.signal }
    sleep 0.05
  end
end