Class: Takagi::EventBus::MessageBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/event_bus/message_buffer.rb

Overview

Bounded message buffer for distributed addresses Stores recent messages in a ring buffer per address for replay on reconnection

Features:

  • Bounded memory (max messages per address)

  • TTL-based expiration

  • Thread-safe

  • Zero external dependencies

  • Only buffers distributed addresses

Examples:

buffer = MessageBuffer.new(max_messages: 100, ttl: 300)
buffer.store(address, message)
messages = buffer.replay(address, since: Time.now - 60)

Defined Under Namespace

Classes: RingBuffer

Instance Method Summary collapse

Constructor Details

#initialize(max_messages: 100, ttl: 300) ⇒ MessageBuffer

Returns a new instance of MessageBuffer.

Parameters:

  • max_messages (Integer) (defaults to: 100)

    Maximum messages per address (default: 100)

  • ttl (Integer) (defaults to: 300)

    Time-to-live in seconds (default: 300 = 5 minutes)



70
71
72
73
74
75
76
77
78
# File 'lib/takagi/event_bus/message_buffer.rb', line 70

def initialize(max_messages: 100, ttl: 300)
  @max_messages = max_messages
  @ttl = ttl
  @buffers = Hash.new { |h, k| h[k] = RingBuffer.new(@max_messages) }
  @mutex = Mutex.new
  @enabled = true

  start_cleanup_thread
end

Instance Method Details

#all(address) ⇒ Array<Message>

Get all buffered messages for an address

Parameters:

  • address (String)

    Event address

Returns:

  • (Array<Message>)

    All buffered messages



138
139
140
# File 'lib/takagi/event_bus/message_buffer.rb', line 138

def all(address)
  replay(address, since: nil)
end

#clear(address) ⇒ Object

Clear buffer for an address

Parameters:

  • address (String)

    Event address



164
165
166
167
168
# File 'lib/takagi/event_bus/message_buffer.rb', line 164

def clear(address)
  @mutex.synchronize do
    @buffers[address]&.clear
  end
end

#clear_allObject

Clear all buffers



171
172
173
174
175
# File 'lib/takagi/event_bus/message_buffer.rb', line 171

def clear_all
  @mutex.synchronize do
    @buffers.clear
  end
end

#disableObject

Disable message buffering



183
184
185
# File 'lib/takagi/event_bus/message_buffer.rb', line 183

def disable
  @enabled = false
end

#enableObject

Enable message buffering



178
179
180
# File 'lib/takagi/event_bus/message_buffer.rb', line 178

def enable
  @enabled = true
end

#enabled?Boolean

Check if buffering is enabled

Returns:

  • (Boolean)


188
189
190
# File 'lib/takagi/event_bus/message_buffer.rb', line 188

def enabled?
  @enabled
end

#replay(address, since: nil) ⇒ Array<Message>

Replay messages for an address since a given timestamp

Examples:

Replay last 60 seconds

messages = buffer.replay('sensor.temperature.room1', since: Time.now - 60)

Parameters:

  • address (String)

    Event address

  • since (Time, nil) (defaults to: nil)

    Return messages since this time (nil = all messages)

Returns:

  • (Array<Message>)

    Buffered messages



126
127
128
129
130
131
132
# File 'lib/takagi/event_bus/message_buffer.rb', line 126

def replay(address, since: nil)
  @mutex.synchronize do
    return [] unless @buffers.key?(address)

    @buffers[address].messages_since(since)
  end
end

#shutdownObject

Shutdown cleanup thread



209
210
211
212
# File 'lib/takagi/event_bus/message_buffer.rb', line 209

def shutdown
  @cleanup_thread&.kill
  @cleanup_thread = nil
end

#size(address) ⇒ Integer

Get buffer size for an address

Parameters:

  • address (String)

    Event address

Returns:

  • (Integer)

    Number of buffered messages



146
147
148
149
150
# File 'lib/takagi/event_bus/message_buffer.rb', line 146

def size(address)
  @mutex.synchronize do
    @buffers[address]&.size || 0
  end
end

#statsHash

Get statistics about buffering

Returns:

  • (Hash)

    Buffer statistics



195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/takagi/event_bus/message_buffer.rb', line 195

def stats
  @mutex.synchronize do
    {
      enabled: @enabled,
      addresses: @buffers.keys.size,
      total_messages: @buffers.values.sum(&:size),
      max_messages_per_address: @max_messages,
      ttl: @ttl,
      buffers: @buffers.transform_values(&:size)
    }
  end
end

#store(address, message) ⇒ Object

Store a message in the buffer Only stores distributed addresses to conserve memory

Parameters:

  • address (String)

    Event address

  • message (Message)

    Event message



85
86
87
88
89
90
91
92
# File 'lib/takagi/event_bus/message_buffer.rb', line 85

def store(address, message)
  return unless @enabled
  return unless AddressPrefix.distributed?(address)

  @mutex.synchronize do
    @buffers[address].push(message)
  end
end

#store_failed(address, message, destination = nil) ⇒ Object

Store a failed delivery for retry Used by CoAPBridge when network delivery fails

Parameters:

  • address (String)

    Event address

  • message (Message)

    Event message

  • destination (String) (defaults to: nil)

    Failed destination (for logging)



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/takagi/event_bus/message_buffer.rb', line 100

def store_failed(address, message, destination = nil)
  return unless @enabled

  # Add failure metadata to headers (immutable, so create new message)
  headers = message.headers.dup
  headers[:failed_at] = Time.now
  headers[:failed_destination] = destination if destination

  failed_message = Message.new(
    message.address,
    message.body,
    headers: headers,
    reply_address: message.reply_address
  )

  store(address, failed_message)
end

#total_sizeInteger

Get total number of buffered messages across all addresses

Returns:

  • (Integer)

    Total buffered messages



155
156
157
158
159
# File 'lib/takagi/event_bus/message_buffer.rb', line 155

def total_size
  @mutex.synchronize do
    @buffers.values.sum(&:size)
  end
end