Class: Beacon::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/beacon/queue.rb

Overview

Bounded in-process event queue with oldest-drop semantics.

SizedQueue from stdlib is unsuitable here: it blocks producers when full, which would mean Beacon stalls the host’s request cycle during a Beacon outage. We need the opposite — drop the oldest event and let the host keep serving.

Card 8: the queue also carries the wake-up signal for the flusher. When ‘length` crosses `flush_threshold` on push, we signal the shared ConditionVariable so the flusher can wake early instead of waiting out the full flush_interval. This turns a 1 Hz polling flusher into an event-driven one for bursty workloads while keeping the periodic tick as the floor for low-traffic apps.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max:, flush_threshold: nil) ⇒ Queue

Returns a new instance of Queue.



18
19
20
21
22
23
24
25
# File 'lib/beacon/queue.rb', line 18

def initialize(max:, flush_threshold: nil)
  @max             = max
  @flush_threshold = flush_threshold
  @mutex           = Mutex.new
  @not_empty       = ConditionVariable.new
  @items           = []
  @dropped         = 0
end

Instance Attribute Details

#droppedObject (readonly)

Returns the value of attribute dropped.



16
17
18
# File 'lib/beacon/queue.rb', line 16

def dropped
  @dropped
end

Instance Method Details

#drain(limit) ⇒ Object



42
43
44
# File 'lib/beacon/queue.rb', line 42

def drain(limit)
  @mutex.synchronize { @items.shift(limit) }
end

#empty?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/beacon/queue.rb', line 67

def empty?
  length.zero?
end

#lengthObject Also known as: size



62
63
64
# File 'lib/beacon/queue.rb', line 62

def length
  @mutex.synchronize { @items.length }
end

#push(event) ⇒ Object Also known as: <<



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/beacon/queue.rb', line 27

def push(event)
  @mutex.synchronize do
    if @items.length >= @max
      @items.shift
      @dropped += 1
    end
    @items << event
    if @flush_threshold && @items.length >= @flush_threshold
      @not_empty.signal
    end
  end
  nil
end

#signal_allObject

Explicit wake-up for the flusher’s stop/shutdown path.



58
59
60
# File 'lib/beacon/queue.rb', line 58

def signal_all
  @mutex.synchronize { @not_empty.broadcast }
end

#wait_and_drain(limit, timeout) ⇒ Object

Block until an event arrives OR ‘timeout` seconds elapse, then drain up to `limit` events. Used by the flusher’s run_loop to wait for either a size-triggered wake-up from Queue#push or the periodic flush_interval floor, whichever comes first.



50
51
52
53
54
55
# File 'lib/beacon/queue.rb', line 50

def wait_and_drain(limit, timeout)
  @mutex.synchronize do
    @not_empty.wait(@mutex, timeout) if @items.empty?
    @items.shift(limit)
  end
end