Class: Takagi::EventBus::MessageBuffer
- Inherits: Object
  - Object
  - Takagi::EventBus::MessageBuffer
- Defined in: lib/takagi/event_bus/message_buffer.rb
Overview
Bounded message buffer for distributed addresses. Stores recent messages in a ring buffer per address for replay on reconnection.
Features:
- Bounded memory (max messages per address)
- TTL-based expiration
- Thread-safe
- Zero external dependencies
- Only buffers distributed addresses
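To make the "bounded memory" feature concrete, here is a minimal sketch of a fixed-capacity buffer that drops its oldest entry on overflow. `SketchRingBuffer` is a hypothetical stand-in written for illustration; the nested `RingBuffer` class is not shown on this page and may be implemented differently.

```ruby
# Hypothetical sketch, not Takagi's RingBuffer: a fixed-capacity buffer
# that evicts the oldest entry once the capacity is exceeded.
class SketchRingBuffer
  def initialize(capacity)
    @capacity = capacity
    @items = []
  end

  # Append an item, then drop the oldest one if we are over capacity.
  def push(item)
    @items.push(item)
    @items.shift if @items.size > @capacity
    self
  end

  # Return a copy so callers cannot mutate the internal array.
  def to_a
    @items.dup
  end

  def size
    @items.size
  end
end

buffer = SketchRingBuffer.new(3)
(1..5).each { |n| buffer.push(n) }
buffer.to_a # => [3, 4, 5]
```

With a capacity of 3, memory use per address stays constant no matter how many messages arrive.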
Defined Under Namespace
Classes: RingBuffer
Instance Method Summary
- #all(address) ⇒ Array<Message>
  Get all buffered messages for an address.
- #clear(address) ⇒ Object
  Clear buffer for an address.
- #clear_all ⇒ Object
  Clear all buffers.
- #disable ⇒ Object
  Disable message buffering.
- #enable ⇒ Object
  Enable message buffering.
- #enabled? ⇒ Boolean
  Check if buffering is enabled.
- #initialize(max_messages: 100, ttl: 300) ⇒ MessageBuffer (constructor)
  A new instance of MessageBuffer.
- #replay(address, since: nil) ⇒ Array<Message>
  Replay messages for an address since a given timestamp.
- #shutdown ⇒ Object
  Shutdown cleanup thread.
- #size(address) ⇒ Integer
  Get buffer size for an address.
- #stats ⇒ Hash
  Get statistics about buffering.
- #store(address, message) ⇒ Object
  Store a message in the buffer. Only stores distributed addresses to conserve memory.
- #store_failed(address, message, destination = nil) ⇒ Object
  Store a failed delivery for retry. Used by CoAPBridge when network delivery fails.
- #total_size ⇒ Integer
  Get total number of buffered messages across all addresses.
Constructor Details
#initialize(max_messages: 100, ttl: 300) ⇒ MessageBuffer
Returns a new instance of MessageBuffer.
# File 'lib/takagi/event_bus/message_buffer.rb', line 70
def initialize(max_messages: 100, ttl: 300)
  @max_messages = max_messages
  @ttl = ttl
  @buffers = Hash.new { |h, k| h[k] = RingBuffer.new(@max_messages) }
  @mutex = Mutex.new
  @enabled = true

  start_cleanup_thread
end
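The constructor builds `@buffers` with a `Hash.new` default block, so a buffer for an address is created lazily on first `[]` access. The sketch below shows that standard Ruby behavior using plain arrays in place of `RingBuffer` (an assumption made only to keep the example self-contained).

```ruby
# Plain arrays stand in for RingBuffer here (assumption for brevity).
buffers = Hash.new { |hash, key| hash[key] = [] }

buffers.key?("a")  # => false; key? never runs the default block
buffers["a"] << 1  # first [] access creates and stores the array
buffers.key?("a")  # => true
buffers.size       # => 1
```

Because `[]` creates an entry as a side effect while `key?` does not, a read path that must not allocate buffers needs to check `key?` before indexing.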
Instance Method Details
#all(address) ⇒ Array<Message>
Get all buffered messages for an address
# File 'lib/takagi/event_bus/message_buffer.rb', line 138
def all(address)
  replay(address, since: nil)
end
#clear(address) ⇒ Object
Clear buffer for an address
# File 'lib/takagi/event_bus/message_buffer.rb', line 164
def clear(address)
  @mutex.synchronize do
    @buffers[address]&.clear
  end
end
#clear_all ⇒ Object
Clear all buffers
# File 'lib/takagi/event_bus/message_buffer.rb', line 171
def clear_all
  @mutex.synchronize do
    @buffers.clear
  end
end
#disable ⇒ Object
Disable message buffering
# File 'lib/takagi/event_bus/message_buffer.rb', line 183
def disable
  @enabled = false
end
#enable ⇒ Object
Enable message buffering
# File 'lib/takagi/event_bus/message_buffer.rb', line 178
def enable
  @enabled = true
end
#enabled? ⇒ Boolean
Check if buffering is enabled
# File 'lib/takagi/event_bus/message_buffer.rb', line 188
def enabled?
  @enabled
end
#replay(address, since: nil) ⇒ Array<Message>
Replay messages for an address since a given timestamp
# File 'lib/takagi/event_bus/message_buffer.rb', line 126
def replay(address, since: nil)
  @mutex.synchronize do
    return [] unless @buffers.key?(address)

    @buffers[address].(since)
  end
end
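As an illustration of replaying "since a given timestamp", the sketch below filters timestamped entries. `SketchEntry` and `sketch_replay` are hypothetical names, and the strict `>` comparison is an assumption; this page does not show how `RingBuffer` records times or whether the boundary is inclusive.

```ruby
# Hypothetical entry type; RingBuffer's internals are not shown on this page.
SketchEntry = Struct.new(:message, :stored_at)

# Return messages stored strictly after `since`, or every message when
# `since` is nil (mirroring how #all delegates with since: nil).
def sketch_replay(entries, since: nil)
  selected = since ? entries.select { |entry| entry.stored_at > since } : entries
  selected.map(&:message)
end

now = Time.now
entries = [
  SketchEntry.new("old", now - 60),
  SketchEntry.new("recent", now - 5)
]

sketch_replay(entries)                  # => ["old", "recent"]
sketch_replay(entries, since: now - 10) # => ["recent"]
```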
#shutdown ⇒ Object
Shutdown cleanup thread
# File 'lib/takagi/event_bus/message_buffer.rb', line 209
def shutdown
  @cleanup_thread&.kill
  @cleanup_thread = nil
end
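The start-and-kill lifecycle of a background cleanup thread can be sketched as follows. `SketchCleaner` is a hypothetical class, and the `join` after `kill` is an addition of this sketch (it waits for the thread to finish), not something the listing above does.

```ruby
# Hypothetical stand-in for an object that owns a background cleanup thread.
class SketchCleaner
  def initialize(interval)
    @thread = Thread.new do
      loop do
        sleep interval
        # Expired entries would be pruned here.
      end
    end
  end

  def running?
    !@thread.nil? && @thread.alive?
  end

  # Stop the thread. The safe-navigation calls make repeated
  # shutdowns harmless once @thread has been cleared.
  def shutdown
    thread = @thread
    @thread = nil
    thread&.kill
    thread&.join # wait until the thread has actually stopped
  end
end

cleaner = SketchCleaner.new(0.05)
cleaner.running? # => true
cleaner.shutdown
cleaner.running? # => false
```

Calling `shutdown` before discarding the owner avoids leaving a sleeping thread behind for the lifetime of the process.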
#size(address) ⇒ Integer
Get buffer size for an address
# File 'lib/takagi/event_bus/message_buffer.rb', line 146
def size(address)
  @mutex.synchronize do
    @buffers[address]&.size || 0
  end
end
#stats ⇒ Hash
Get statistics about buffering
# File 'lib/takagi/event_bus/message_buffer.rb', line 195
def stats
  @mutex.synchronize do
    {
      enabled: @enabled,
      addresses: @buffers.keys.size,
      total_messages: @buffers.values.sum(&:size),
      max_messages_per_address: @max_messages,
      ttl: @ttl,
      buffers: @buffers.transform_values(&:size)
    }
  end
end
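The aggregate fields in `#stats` rely on `Hash#transform_values` and `Enumerable#sum` with a block. The sketch below applies the same expressions to a plain hash of arrays; the address names are hypothetical and the arrays stand in for ring buffers.

```ruby
# Arrays stand in for per-address buffers; the address names are made up.
buffers = { "cluster:a" => [1, 2, 3], "cluster:b" => [4] }

stats = {
  addresses: buffers.keys.size,
  total_messages: buffers.values.sum(&:size),
  buffers: buffers.transform_values(&:size)
}

stats[:addresses]      # => 2
stats[:total_messages] # => 4
stats[:buffers]        # => { "cluster:a" => 3, "cluster:b" => 1 }
```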
#store(address, message) ⇒ Object
Store a message in the buffer. Only stores distributed addresses to conserve memory.
# File 'lib/takagi/event_bus/message_buffer.rb', line 85
def store(address, message)
  return unless @enabled
  return unless AddressPrefix.distributed?(address)

  @mutex.synchronize do
    @buffers[address].push(message)
  end
end
#store_failed(address, message, destination = nil) ⇒ Object
Store a failed delivery for retry. Used by CoAPBridge when network delivery fails.
# File 'lib/takagi/event_bus/message_buffer.rb', line 100
def store_failed(address, message, destination = nil)
  return unless @enabled

  # Add failure metadata to headers (immutable, so create new message)
  headers = message.headers.dup
  headers[:failed_at] = Time.now
  headers[:failed_destination] = destination if destination

  failed_message = Message.new(
    message.address,
    message.body,
    headers: headers,
    reply_address: message.reply_address
  )

  store(address, failed_message)
end
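The copy-then-annotate step can be tried in isolation with a stand-in message type. `SketchMessage`, `with_failure_metadata`, and the `coap://node-2` destination are all hypothetical; Takagi's `Message` class takes positional `address` and `body` arguments per the listing above and may differ in other ways.

```ruby
# Hypothetical immutable message type; Takagi's Message class may differ.
SketchMessage = Struct.new(:address, :body, :headers, :reply_address, keyword_init: true)

# Build a new message carrying failure metadata, leaving the original untouched.
def with_failure_metadata(message, destination = nil, failed_at: Time.now)
  headers = message.headers.dup # dup of a frozen hash is unfrozen
  headers[:failed_at] = failed_at
  headers[:failed_destination] = destination if destination

  SketchMessage.new(
    address: message.address,
    body: message.body,
    headers: headers,
    reply_address: message.reply_address
  ).freeze
end

original = SketchMessage.new(
  address: "cluster:sensors", body: "hi", headers: { id: 1 }.freeze, reply_address: nil
).freeze

failed = with_failure_metadata(original, "coap://node-2")
failed.headers[:failed_destination] # => "coap://node-2"
original.headers                    # => { id: 1 }
```

Copying the headers keeps the original message unchanged, which matters if other subscribers still hold a reference to it.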
#total_size ⇒ Integer
Get total number of buffered messages across all addresses
# File 'lib/takagi/event_bus/message_buffer.rb', line 155
def total_size
  @mutex.synchronize do
    @buffers.values.sum(&:size)
  end
end