Class: Takagi::EventBus::MessageBuffer::RingBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/message_buffer.rb

Overview

Ring buffer for a single address

Instance Method Summary collapse

Constructor Details

#initialize(max_size) ⇒ RingBuffer

Returns a new instance of RingBuffer.



22
23
24
25
26
# File 'lib/takagi/event_bus/message_buffer.rb', line 22

# Builds an empty buffer together with the mutex that guards it.
#
# @param max_size [Integer] capacity limit, stored as given (no validation
#   is performed here)
def initialize(max_size)
  @messages = []
  @max_size = max_size
  @mutex = Mutex.new
end

Instance Method Details

#all_messages ⇒ Object



43
44
45
# File 'lib/takagi/event_bus/message_buffer.rb', line 43

# Takes a snapshot of the buffer while holding the mutex.
#
# @return [Array] a shallow copy (+dup+) of the stored messages; mutating
#   the returned array does not affect the buffer
def all_messages
  @mutex.synchronize do
    @messages.dup
  end
end

#clean_expired(ttl) ⇒ Object

Clean expired messages based on TTL



60
61
62
63
64
65
# File 'lib/takagi/event_bus/message_buffer.rb', line 60

# Removes, under the mutex, every message whose +timestamp+ lies before
# +Time.now - ttl+.
#
# @param ttl [Numeric] amount subtracted from +Time.now+ to compute the
#   cutoff (presumably seconds — confirm against callers)
# @return [Array, nil] the value of +Array#reject!+: the internal message
#   array when at least one message was removed, +nil+ when none were
def clean_expired(ttl)
  @mutex.synchronize do
    oldest_allowed = Time.now - ttl
    @messages.reject! { |entry| entry.timestamp < oldest_allowed }
  end
end

#clear ⇒ Object



51
52
53
# File 'lib/takagi/event_bus/message_buffer.rb', line 51

# Empties the buffer while holding the mutex.
#
# @return [Array] the (now empty) internal message array, as returned by
#   +Array#clear+
def clear
  @mutex.synchronize do
    @messages.clear
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/takagi/event_bus/message_buffer.rb', line 55

# Reports whether the buffer currently holds no messages. The check is
# made while holding the mutex.
#
# @return [Boolean] +true+ when no messages are stored, +false+ otherwise
def empty?
  @mutex.synchronize { @messages.empty? }
end

#messages_since(timestamp = nil) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/takagi/event_bus/message_buffer.rb', line 35

# Returns the messages newer than +timestamp+, read under the mutex.
#
# @param timestamp [Object, nil] lower bound compared against each
#   message's +timestamp+ with +>+ (strict, so equal timestamps are
#   excluded); when +nil+ or +false+, no filtering is applied
# @return [Array] a new array: a shallow copy of all messages when no
#   +timestamp+ is given, otherwise only the messages strictly newer
def messages_since(timestamp = nil)
  @mutex.synchronize do
    if timestamp
      @messages.select { |entry| entry.timestamp > timestamp }
    else
      @messages.dup
    end
  end
end

#push(message) ⇒ Object



28
29
30
31
32
33
# File 'lib/takagi/event_bus/message_buffer.rb', line 28

# Appends +message+ and, if the buffer then exceeds +@max_size+, drops the
# oldest entry. Both steps happen under the mutex. At most one entry is
# evicted per call.
#
# @param message [Object] the message to store
# @return [Object, nil] the evicted (oldest) message when an eviction
#   happened, otherwise +nil+
def push(message)
  @mutex.synchronize do
    @messages << message
    overflow = @messages.length > @max_size
    @messages.shift if overflow # FIFO eviction
  end
end

#size ⇒ Object



47
48
49
# File 'lib/takagi/event_bus/message_buffer.rb', line 47

# Counts the stored messages while holding the mutex.
#
# @return [Integer] number of messages currently in the buffer
def size
  @mutex.synchronize do
    @messages.length
  end
end