Class: Igniter::Store::ChangefeedBuffer::SubscriberQueue
- Inherits:
-
Object
- Object
- Igniter::Store::ChangefeedBuffer::SubscriberQueue
- Defined in:
- lib/igniter/store/changefeed_buffer.rb
Overview
Bounded FIFO queue for one subscriber’s async delivery pipeline.
push is non-blocking and returns true when an overflow drop occurs. pop blocks until an event is available or the queue is closed. Once closed, pop drains remaining items (unless discard was requested) then returns nil.
Instance Method Summary collapse
-
#close(discard: false) ⇒ Object
Pass discard: true to clear queued events before signaling close.
-
#initialize(max_size:, overflow: :drop_oldest) ⇒ SubscriberQueue
constructor
A new instance of SubscriberQueue.
-
#pop ⇒ Object
Blocks until next event or close signal.
-
#push(event) ⇒ Object
Returns
trueif an overflow drop occurred,falseotherwise. - #size ⇒ Object
Constructor Details
#initialize(max_size:, overflow: :drop_oldest) ⇒ SubscriberQueue
Returns a new instance of SubscriberQueue.
77 78 79 80 81 82 83 84 |
# File 'lib/igniter/store/changefeed_buffer.rb', line 77 def initialize(max_size:, overflow: :drop_oldest) @max_size = max_size @overflow = overflow @items = [] @mu = Mutex.new @cond = ConditionVariable.new @closed = false end |
Instance Method Details
#close(discard: false) ⇒ Object
Pass discard: true to clear queued events before signaling close.
116 117 118 119 120 121 122 |
# File 'lib/igniter/store/changefeed_buffer.rb', line 116 def close(discard: false) @mu.synchronize do @items.clear if discard @closed = true @cond.broadcast end end |
#pop ⇒ Object
Blocks until next event or close signal. Returns nil when closed+drained.
108 109 110 111 112 113 |
# File 'lib/igniter/store/changefeed_buffer.rb', line 108 def pop @mu.synchronize do @cond.wait(@mu) while @items.empty? && !@closed @items.shift end end |
#push(event) ⇒ Object
Returns true if an overflow drop occurred, false otherwise.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/igniter/store/changefeed_buffer.rb', line 87 def push(event) @mu.synchronize do return false if @closed if @items.size >= @max_size case @overflow when :drop_oldest @items.shift @items << event @cond.signal when :drop_newest # discard the incoming event; queue unchanged end return true end @items << event @cond.signal false end end |
#size ⇒ Object
124 125 126 |
# File 'lib/igniter/store/changefeed_buffer.rb', line 124 def size @mu.synchronize { @items.size } end |