Class: Beacon::Queue
- Inherits: Object
- Defined in: lib/beacon/queue.rb
Overview
Bounded in-process event queue with oldest-drop semantics.
SizedQueue from stdlib is unsuitable here: it blocks producers when full, which would mean Beacon stalls the host’s request cycle during a Beacon outage. We need the opposite — drop the oldest event and let the host keep serving.
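To make the oldest-drop behavior concrete, here is a minimal sketch. `DropOldestSketch` is a hypothetical stand-in that copies the push and drain bodies listed on this page so the snippet runs without Beacon loaded; it is not the library class itself, and it leaves out the flusher signalling.

```ruby
# Hypothetical stand-in for the push/drain logic shown on this page.
class DropOldestSketch
  attr_reader :dropped

  def initialize(max:)
    @max = max
    @mutex = Mutex.new
    @items = []
    @dropped = 0
  end

  def push(event)
    @mutex.synchronize do
      if @items.length >= @max
        @items.shift   # evict the oldest event instead of blocking
        @dropped += 1  # count the loss so it can be reported later
      end
      @items << event
    end
    nil
  end

  def drain(limit)
    @mutex.synchronize { @items.shift(limit) }
  end
end

queue = DropOldestSketch.new(max: 3)
(1..5).each { |n| queue.push(n) } # never blocks, even past max

remaining = queue.drain(10)
puts remaining.inspect # the three newest events survive
puts queue.dropped     # the two oldest were dropped
```

The producer pays only for a mutex acquisition and an array shift, which is the property the overview asks for: a full queue costs the host old events, not request latency.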
Card 8: the queue also carries the wake-up signal for the flusher. When `length` crosses `flush_threshold` on push, we signal the shared ConditionVariable so the flusher can wake early instead of waiting out the full flush_interval. This turns a 1 Hz polling flusher into an event-driven one for bursty workloads while keeping the periodic tick as the floor for low-traffic apps.
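The early wake-up can be sketched as follows. `WakeupSketch` is a hypothetical stand-in that mirrors the initialize, push and wait_and_drain bodies listed on this page; the threshold of 3 and the 5-second timeout (standing in for flush_interval) are assumed values chosen for the demonstration, not Beacon defaults.

```ruby
# Hypothetical stand-in mirroring the method bodies listed on this page.
class WakeupSketch
  def initialize(max:, flush_threshold: nil)
    @max = max
    @flush_threshold = flush_threshold
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @items = []
    @dropped = 0
  end

  def push(event)
    @mutex.synchronize do
      if @items.length >= @max
        @items.shift
        @dropped += 1
      end
      @items << event
      # Wake the flusher only once the backlog reaches the threshold.
      @not_empty.signal if @flush_threshold && @items.length >= @flush_threshold
    end
    nil
  end

  def wait_and_drain(limit, timeout)
    @mutex.synchronize do
      @not_empty.wait(@mutex, timeout) if @items.empty?
      @items.shift(limit)
    end
  end
end

queue = WakeupSketch.new(max: 100, flush_threshold: 3)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Flusher side: wait for a signal or for the (assumed) 5 s interval.
flusher = Thread.new do
  batch = queue.wait_and_drain(10, 5.0)
  [batch, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end

# Let the flusher park on the condition variable before producing.
sleep 0.01 until flusher.status == "sleep"
3.times { |n| queue.push(n) } # the third push reaches the threshold

batch, waited = flusher.value
puts "drained #{batch.inspect} after #{waited.round(2)}s"
```

The flusher returns as soon as the third event lands rather than sitting out the interval. With `flush_threshold: nil` the same loop would fall back to pure timeout-driven flushing.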
Instance Attribute Summary
- #dropped ⇒ Object (readonly)
  Returns the value of attribute dropped.
Instance Method Summary
- #drain(limit) ⇒ Object
- #empty? ⇒ Boolean
- #initialize(max:, flush_threshold: nil) ⇒ Queue (constructor)
  A new instance of Queue.
- #length ⇒ Object (also: #size)
- #push(event) ⇒ Object (also: #<<)
- #signal_all ⇒ Object
  Explicit wake-up for the flusher’s stop/shutdown path.
- #wait_and_drain(limit, timeout) ⇒ Object
  Block until an event arrives OR `timeout` seconds elapse, then drain up to `limit` events.
Constructor Details
#initialize(max:, flush_threshold: nil) ⇒ Queue
Returns a new instance of Queue.
    # File 'lib/beacon/queue.rb', line 18
    def initialize(max:, flush_threshold: nil)
      @max = max
      @flush_threshold = flush_threshold
      @mutex = Mutex.new
      @not_empty = ConditionVariable.new
      @items = []
      @dropped = 0
    end
Instance Attribute Details
#dropped ⇒ Object (readonly)
Returns the value of attribute dropped.
    # File 'lib/beacon/queue.rb', line 16
    def dropped
      @dropped
    end
Instance Method Details
#drain(limit) ⇒ Object
    # File 'lib/beacon/queue.rb', line 42
    def drain(limit)
      @mutex.synchronize { @items.shift(limit) }
    end
#empty? ⇒ Boolean
    # File 'lib/beacon/queue.rb', line 67
    def empty?
      length.zero?
    end
#length ⇒ Object Also known as: size
    # File 'lib/beacon/queue.rb', line 62
    def length
      @mutex.synchronize { @items.length }
    end
#push(event) ⇒ Object Also known as: <<
    # File 'lib/beacon/queue.rb', line 27
    def push(event)
      @mutex.synchronize do
        if @items.length >= @max
          @items.shift
          @dropped += 1
        end
        @items << event
        if @flush_threshold && @items.length >= @flush_threshold
          @not_empty.signal
        end
      end
      nil
    end
#signal_all ⇒ Object
Explicit wake-up for the flusher’s stop/shutdown path.
    # File 'lib/beacon/queue.rb', line 58
    def signal_all
      @mutex.synchronize { @not_empty.broadcast }
    end
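A rough sketch of how a stop path might use this wake-up. `ShutdownSketch` is a hypothetical stand-in for the queue methods on this page, and the `stopping` flag and the loop shape are assumptions for illustration; the real flusher's run_loop is not shown here and may differ.

```ruby
# Hypothetical stand-in mirroring signal_all and wait_and_drain from this page.
class ShutdownSketch
  def initialize
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @items = []
  end

  def push(event)
    @mutex.synchronize { @items << event } # no threshold set: never signals
    nil
  end

  def signal_all
    @mutex.synchronize { @not_empty.broadcast }
  end

  def wait_and_drain(limit, timeout)
    @mutex.synchronize do
      @not_empty.wait(@mutex, timeout) if @items.empty?
      @items.shift(limit)
    end
  end
end

queue = ShutdownSketch.new
stopping = false # assumed stop flag owned by the flusher
flushed = []

# Assumed loop shape: keep draining until asked to stop.
flusher = Thread.new do
  flushed.concat(queue.wait_and_drain(10, 30.0)) until stopping
end

# Wait until the flusher is parked on the condition variable.
sleep 0.01 until flusher.status == "sleep"

queue.push(:last_event)
stopping = true
queue.signal_all           # wake the flusher instead of waiting 30 s
finished = flusher.join(5) # returns the thread if it exited in time

puts flushed.inspect
```

Without the broadcast, the flusher in this sketch would only notice the flag after its 30-second timeout. Because the flag is checked outside the queue's mutex here, a stop that lands between the flag check and the wait would still sit out one timeout.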
#wait_and_drain(limit, timeout) ⇒ Object
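Block until an event arrives OR `timeout` seconds elapse, then drain up to `limit` events. Used by the flusher’s run_loop to wait for either a size-triggered wake-up from Queue#push or the periodic flush_interval floor, whichever comes first. The wait is skipped entirely when the queue already holds events, and Queue#push signals only once `length` reaches `flush_threshold`, so a push below the threshold does not by itself wake a waiting flusher.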
    # File 'lib/beacon/queue.rb', line 50
    def wait_and_drain(limit, timeout)
      @mutex.synchronize do
        @not_empty.wait(@mutex, timeout) if @items.empty?
        @items.shift(limit)
      end
    end
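The two paths through this method (wait on an empty queue, return at once on a non-empty one) can be exercised with a small sketch. `TimeoutSketch` is a hypothetical stand-in that copies the wait_and_drain body above; the 50 ms and 5 s timeouts are arbitrary demonstration values.

```ruby
# Hypothetical stand-in mirroring wait_and_drain as listed above.
class TimeoutSketch
  def initialize(max:)
    @max = max
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @items = []
  end

  def push(event)
    @mutex.synchronize do
      @items.shift if @items.length >= @max
      @items << event # no flush_threshold here, so nothing is signalled
    end
    nil
  end

  def wait_and_drain(limit, timeout)
    @mutex.synchronize do
      @not_empty.wait(@mutex, timeout) if @items.empty?
      @items.shift(limit)
    end
  end
end

queue = TimeoutSketch.new(max: 10)

# Empty queue, no signal: waits out the short timeout, then drains nothing.
empty_batch = queue.wait_and_drain(5, 0.05)

4.times { |n| queue.push(n) }

# Non-empty queue: the wait is skipped and the batch is capped at limit.
first = queue.wait_and_drain(3, 5.0)
rest  = queue.wait_and_drain(3, 5.0)

puts empty_batch.inspect
puts first.inspect
puts rest.inspect
```

A caller therefore has to treat an empty array as a normal result: it means the timeout elapsed (or a wake-up arrived) with nothing queued.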