Class: Igniter::Store::ChangefeedBuffer::SubscriberQueue

Inherits:
Object
Defined in:
lib/igniter/store/changefeed_buffer.rb

Overview

Bounded FIFO queue for one subscriber’s async delivery pipeline.

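push never blocks. It returns true when the queue was full and an event was dropped (the oldest queued event under :drop_oldest, the incoming event under :drop_newest) and false otherwise. A push to a closed queue is ignored and also returns false. pop blocks until an event is available or the queue is closed. Once the queue is closed, pop drains any remaining items (unless discard was requested) and then returns nil.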
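The lifecycle described above can be sketched with one producer and one consumer thread. This is a minimal illustration, not the library's own usage: DemoQueue is a hypothetical stand-in that copies the method bodies listed on this page so the example runs without the igniter gem, and run_consumer_demo is a helper invented for the sketch.

```ruby
# Stand-in copy of the class from the listings on this page.
# DemoQueue is a hypothetical name used only for this sketch.
class DemoQueue
  def initialize(max_size:, overflow: :drop_oldest)
    @max_size = max_size
    @overflow = overflow
    @items    = []
    @mu       = Mutex.new
    @cond     = ConditionVariable.new
    @closed   = false
  end

  def push(event)
    @mu.synchronize do
      return false if @closed
      if @items.size >= @max_size
        if @overflow == :drop_oldest
          @items.shift
          @items << event
          @cond.signal
        end
        return true
      end
      @items << event
      @cond.signal
      false
    end
  end

  def pop
    @mu.synchronize do
      @cond.wait(@mu) while @items.empty? && !@closed
      @items.shift
    end
  end

  def close(discard: false)
    @mu.synchronize do
      @items.clear if discard
      @closed = true
      @cond.broadcast
    end
  end
end

# Hypothetical helper: one consumer thread reads until the queue is drained.
def run_consumer_demo
  queue    = DemoQueue.new(max_size: 8)
  received = []
  consumer = Thread.new do
    while (event = queue.pop) # nil ends the loop once closed and drained
      received << event
    end
  end
  3.times { |i| queue.push("event-#{i}") }
  queue.close # default discard: false, so queued events are still delivered
  consumer.join
  received
end

run_consumer_demo # => ["event-0", "event-1", "event-2"]
```

Because the loop stops on the first nil, this pattern assumes events themselves are never nil or false.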

Instance Method Summary

Constructor Details

#initialize(max_size:, overflow: :drop_oldest) ⇒ SubscriberQueue

Returns a new instance of SubscriberQueue.



# File 'lib/igniter/store/changefeed_buffer.rb', line 77

def initialize(max_size:, overflow: :drop_oldest)
  @max_size = max_size
  @overflow = overflow
  @items    = []
  @mu       = Mutex.new
  @cond     = ConditionVariable.new
  @closed   = false
end

Instance Method Details

#close(discard: false) ⇒ Object

Pass discard: true to clear queued events before signaling close.



# File 'lib/igniter/store/changefeed_buffer.rb', line 116

def close(discard: false)
  @mu.synchronize do
    @items.clear if discard
    @closed = true
    @cond.broadcast
  end
end
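The difference between the two close modes can be seen by closing a queue that still holds events. The sketch below is an assumption-laden illustration: MiniQueue is a hypothetical, cut-down stand-in (no size bound or overflow handling) that reuses the close and pop bodies shown on this page, and drain_after_close is a helper invented for the example.

```ruby
# Compact stand-in for the queue on this page, with overflow handling omitted.
# MiniQueue is a hypothetical name used only for this sketch.
class MiniQueue
  def initialize
    @items  = []
    @mu     = Mutex.new
    @cond   = ConditionVariable.new
    @closed = false
  end

  def push(event)
    @mu.synchronize do
      return false if @closed
      @items << event
      @cond.signal
      false
    end
  end

  def pop
    @mu.synchronize do
      @cond.wait(@mu) while @items.empty? && !@closed
      @items.shift
    end
  end

  def close(discard: false)
    @mu.synchronize do
      @items.clear if discard
      @closed = true
      @cond.broadcast
    end
  end
end

# Hypothetical helper: queue three events, close, then drain what is left.
def drain_after_close(discard:)
  queue = MiniQueue.new
  %w[a b c].each { |e| queue.push(e) }
  queue.close(discard: discard)
  queue.push("late")        # ignored: the queue is already closed
  drained = []
  while (event = queue.pop) # does not block here, because the queue is closed
    drained << event
  end
  drained
end

drain_after_close(discard: false) # => ["a", "b", "c"]
drain_after_close(discard: true)  # => []
```

In both modes the late push is ignored, so a producer cannot tell from the return value alone that the queue has been closed.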

#pop ⇒ Object

Blocks until the next event is available or the queue is closed. Returns nil once the queue is closed and drained.



# File 'lib/igniter/store/changefeed_buffer.rb', line 108

def pop
  @mu.synchronize do
    @cond.wait(@mu) while @items.empty? && !@closed
    @items.shift
  end
end

#push(event) ⇒ Object

Returns true if an overflow drop occurred, false otherwise. A push to a closed queue is ignored and also returns false.



# File 'lib/igniter/store/changefeed_buffer.rb', line 87

def push(event)
  @mu.synchronize do
    return false if @closed
    if @items.size >= @max_size
      case @overflow
      when :drop_oldest
        @items.shift
        @items << event
        @cond.signal
      when :drop_newest
        # discard the incoming event; queue unchanged
      end
      return true
    end
    @items << event
    @cond.signal
    false
  end
end
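To reason about the overflow branch without threads or locks, the same decision can be modelled as a pure function. This is only a sketch for illustration: model_push is a hypothetical helper, not part of the library, and it returns the resulting items together with the flag that push would return.

```ruby
# Hypothetical, lock-free model of the overflow branch in #push.
# Returns [items_after_push, dropped].
def model_push(items, event, max_size:, overflow: :drop_oldest)
  # Room left: append and report no drop.
  return [items + [event], false] if items.size < max_size

  case overflow
  when :drop_oldest then [items.drop(1) + [event], true] # evict the head, append
  when :drop_newest then [items, true]                   # ignore the incoming event
  else                   [items, true]                   # unrecognized policy: the event is lost too
  end
end

model_push([:a, :b], :c, max_size: 2, overflow: :drop_oldest) # => [[:b, :c], true]
model_push([:a, :b], :c, max_size: 2, overflow: :drop_newest) # => [[:a, :b], true]
model_push([:a],     :b, max_size: 2)                         # => [[:a, :b], false]
```

The else branch mirrors the listing above: the case statement there has no fallback, so an overflow value other than :drop_oldest or :drop_newest leaves the queue unchanged while still returning true.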

#size ⇒ Object

Returns the number of events currently queued.



# File 'lib/igniter/store/changefeed_buffer.rb', line 124

def size
  @mu.synchronize { @items.size }
end