Class: Smplkit::Audit::EventBuffer Private
- Inherits: Object
  - Object
  - Smplkit::Audit::EventBuffer
- Defined in: lib/smplkit/audit/buffer.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Bounded in-memory queue + worker thread for fire-and-forget audit emits.
#enqueue returns immediately. The worker drains on either a periodic tick or once depth crosses the high-water mark, retries transient failures with exponential backoff, drops permanent 4xx (other than 429), and evicts the oldest item under sustained back-pressure.
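The drop-versus-retry rule described above can be sketched as a small predicate. This is only an illustration: `retryable_status?` is a hypothetical name, the worker's real classification code is not shown on this page, and treating 5xx responses as transient is an assumption.

```ruby
# Hypothetical helper mirroring the rule in the overview; not the library's code.
def retryable_status?(status)
  return true if status == 429               # rate limited: transient, retry
  return false if (400..499).cover?(status)  # any other 4xx: permanent, drop
  status >= 500                              # assumption: 5xx is transient
end

[200, 400, 404, 429, 503].each do |status|
  puts "#{status}: #{retryable_status?(status) ? 'retry' : 'no retry'}"
end
```

A 2xx response falls through to `false` simply because there is nothing to retry after a successful POST.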
Constant Summary
These constants are part of a private API. You should avoid using them if possible, as they may be removed or changed in the future.
- MAX_BUFFER_SIZE = 1000
- WATERMARK = 50
- FLUSH_INTERVAL = 5.0
- MAX_ATTEMPTS = 5
- INITIAL_BACKOFF = 0.25
- MAX_BACKOFF = 8.0
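To see how the three retry constants might interact, here is a minimal sketch of a capped exponential backoff schedule. It assumes the delay doubles after each failed attempt and that no jitter is applied; neither detail is confirmed by this page, and the `BackoffSketch` module and its method names are hypothetical.

```ruby
module BackoffSketch
  # Values copied from the constant summary above.
  INITIAL_BACKOFF = 0.25
  MAX_BACKOFF = 8.0
  MAX_ATTEMPTS = 5

  module_function

  # Delay before retry number +retry_number+ (1-based).
  # Assumption: the delay doubles each time and is capped at MAX_BACKOFF.
  def delay(retry_number)
    [INITIAL_BACKOFF * (2**(retry_number - 1)), MAX_BACKOFF].min
  end

  # MAX_ATTEMPTS attempts leave MAX_ATTEMPTS - 1 waits between them.
  def schedule
    (1...MAX_ATTEMPTS).map { |n| delay(n) }
  end
end

p BackoffSketch.schedule      # => [0.25, 0.5, 1.0, 2.0]
p BackoffSketch.schedule.sum  # => 3.75
p BackoffSketch.delay(6)      # => 8.0
```

Under these assumptions the cap is never reached within five attempts; it would only matter if MAX_ATTEMPTS were raised or a larger growth factor were used.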
Instance Method Summary
- #close(timeout: 5.0) ⇒ Object private
- #enqueue(body, idempotency_key) ⇒ Object private
- #flush(timeout: 5.0) ⇒ Object private
- #initialize(api) ⇒ EventBuffer constructor private
  A new instance of EventBuffer.
Constructor Details
#initialize(api) ⇒ EventBuffer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of EventBuffer.
    # File 'lib/smplkit/audit/buffer.rb', line 23
    def initialize(api)
      @api = api
      @queue = []
      @mutex = Mutex.new
      @cond = ConditionVariable.new
      @closed = false
      @dropped_count = 0
      # +@in_flight+ is the number of items the worker has shifted off
      # the queue but not yet finished POSTing. +#flush+ must wait on
      # both queue empty AND in_flight == 0; otherwise it can return
      # while a just-shifted item is still in the middle of its HTTP
      # round-trip, and an immediately following +list+ call would
      # miss the event.
      @in_flight = 0
      @worker = Thread.new { run }
      @worker.report_on_exception = false
    end
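The comment on +@in_flight+ describes a window in which the queue is empty but work is still outstanding. The single-threaded sketch below makes that window visible. `DrainState`, `take_batch`, and `finish` are hypothetical names, and whether the real worker takes items one at a time or in batches is not shown on this page.

```ruby
# Hypothetical model of the queue / in-flight accounting; not the library's code.
class DrainState
  def initialize
    @queue = []
    @in_flight = 0
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize { @queue.push(item) }
  end

  # Worker side: items leave the queue but are counted as in flight
  # until the (simulated) POST completes.
  def take_batch
    @mutex.synchronize do
      batch = @queue.dup
      @queue.clear
      @in_flight += batch.size
      batch
    end
  end

  def finish(batch)
    @mutex.synchronize { @in_flight -= batch.size }
  end

  def queue_empty?
    @mutex.synchronize { @queue.empty? }
  end

  # The condition a flush would need to wait for.
  def idle?
    @mutex.synchronize { @queue.empty? && @in_flight.zero? }
  end
end

state = DrainState.new
state.push(:event)
batch = state.take_batch
p state.queue_empty?  # => true  (looks finished if only the queue is checked)
p state.idle?         # => false (the item is still mid-request)
state.finish(batch)
p state.idle?         # => true
```

Checking only `queue_empty?` would report completion one step too early, which is the race the comment warns about.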
Instance Method Details
#close(timeout: 5.0) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/smplkit/audit/buffer.rb', line 73
    def close(timeout: 5.0)
      flush(timeout: timeout)
      @mutex.synchronize do
        @closed = true
        @cond.broadcast
      end
      @worker.join(timeout)
    end
#enqueue(body, idempotency_key) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/smplkit/audit/buffer.rb', line 41
    def enqueue(body, idempotency_key)
      depth = nil
      @mutex.synchronize do
        return if @closed

        if @queue.size >= MAX_BUFFER_SIZE
          @queue.shift
          @dropped_count += 1
          warn "[smplkit.audit] buffer full (size=#{MAX_BUFFER_SIZE}); " \
               "dropped oldest event (total dropped=#{@dropped_count})"
        end
        @queue.push(PendingEvent.new(body, idempotency_key))
        depth = @queue.size
        @cond.signal if depth >= WATERMARK
      end
    end
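The drop-oldest policy in #enqueue can be seen in isolation with a much smaller cap. The sketch below is a simplified stand-in: `BoundedQueue` is a hypothetical class, and it omits the mutex, the warning, and the watermark signal from the method above.

```ruby
# Hypothetical stand-in for the buffer's queue; a cap of 3 makes eviction easy to see.
class BoundedQueue
  attr_reader :items, :dropped_count

  def initialize(max_size)
    @max_size = max_size
    @items = []
    @dropped_count = 0
  end

  # Evict the oldest item when full, then append; returns the new depth.
  def push(item)
    if @items.size >= @max_size
      @items.shift
      @dropped_count += 1
    end
    @items.push(item)
    @items.size
  end
end

queue = BoundedQueue.new(3)
(1..5).each { |n| queue.push(n) }
p queue.items          # => [3, 4, 5]
p queue.dropped_count  # => 2
```

Evicting from the front keeps the most recent events at the cost of the oldest ones, so the caller never blocks and never sees an error when the buffer is full.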
#flush(timeout: 5.0) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/smplkit/audit/buffer.rb', line 58
    def flush(timeout: 5.0)
      deadline = monotonic_now + timeout
      loop do
        idle = @mutex.synchronize { @queue.empty? && @in_flight.zero? }
        return if idle
        if monotonic_now >= deadline
          warn "[smplkit.audit] flush timed out (timeout=#{timeout}s)"
          return
        end

        @mutex.synchronize { @cond.signal }
        sleep 0.05
      end
    end
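The polling pattern in #flush (check a condition, give up at a deadline, sleep briefly in between) can be sketched on its own. The definition of `monotonic_now` is not shown on this page; the version below, based on `Process.clock_gettime`, is an assumption, and `wait_until` is a hypothetical helper that returns a boolean where #flush emits a warning.

```ruby
# Assumption: a typical monotonic clock helper; the library's own definition is not shown.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: poll the block until it is truthy or the timeout elapses.
# Returns true if the condition was met, false on timeout.
def wait_until(timeout:, interval: 0.05)
  deadline = monotonic_now + timeout
  loop do
    return true if yield
    return false if monotonic_now >= deadline

    sleep interval
  end
end

done = false
worker = Thread.new do
  sleep 0.1
  done = true
end

p wait_until(timeout: 1.0) { done }   # => true
worker.join
p wait_until(timeout: 0.1) { false }  # => false
```

A monotonic clock is used for the deadline because wall-clock time can jump backwards or forwards (for example after an NTP adjustment), which would make the timeout unreliable.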