Class: Philiprehberger::EventStore::Store
- Inherits:
-
Object
- Object
- Philiprehberger::EventStore::Store
- Defined in:
- lib/philiprehberger/event_store/store.rb
Overview
Thread-safe in-memory event store with streams, projections, subscriptions, querying, snapshots, and replay.
Instance Method Summary collapse
-
#append(stream, event) ⇒ self
Append an event to a stream.
-
#clear(stream = nil) ⇒ Integer
Clear events (and snapshot) for the given stream, or everything when no stream is passed.
-
#initialize ⇒ Store
constructor
Create a new event store.
-
#last(stream) ⇒ Object?
Return the most recently appended event in a stream without materializing the rest of the stream.
-
#load_from_snapshot(stream, initial: nil) {|state, event| ... } ⇒ Object
Load state from a snapshot plus any events appended after the snapshot.
-
#project(stream, initial: nil) {|state, event| ... } ⇒ Object
Project events from a stream into an accumulated state.
-
#query(stream: nil, type: nil, after: nil, before: nil, limit: nil) ⇒ Array
Query events with filters.
-
#read(stream) ⇒ Array
Read all events from a stream.
-
#read_all ⇒ Array
Read all events from all streams, ordered by global position.
-
#replay(stream, from_version: 0) ⇒ self
Replay events from a stream to all current subscribers.
-
#replay_all(from_position: 1) ⇒ self
Replay all events across all streams to their subscribers.
-
#snapshot(stream, state) ⇒ self
Save a snapshot of an aggregate’s state at the current stream version.
-
#streams ⇒ Array<String>
List all stream names.
-
#subscribe(stream) {|event| ... } ⇒ self
Subscribe to events on a stream.
-
#total_events ⇒ Integer
Total number of events across all streams.
-
#version(stream) ⇒ Integer
Return the current version (event count) of a stream.
Constructor Details
#initialize ⇒ Store
Create a new event store.
9 10 11 12 13 14 15 |
# File 'lib/philiprehberger/event_store/store.rb', line 9 def initialize @mutex = Mutex.new @streams = {} @subscribers = {} @global_position = 0 @snapshots = {} end |
Instance Method Details
#append(stream, event) ⇒ self
Append an event to a stream.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/philiprehberger/event_store/store.rb', line 22 def append(stream, event) stream = stream.to_s subscribers = nil @mutex.synchronize do @streams[stream] ||= [] @global_position += 1 entry = { event: event, position: @global_position, stream: stream, timestamp: Time.now } @streams[stream] << entry subscribers = (@subscribers[stream] || []).dup end subscribers.each { |callback| callback.call(event) } self end |
#clear(stream = nil) ⇒ Integer
Clear events (and snapshot) for the given stream, or everything when no stream is passed. Subscribers are retained in both cases. Clearing all also resets the global position counter.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/philiprehberger/event_store/store.rb', line 44 def clear(stream = nil) @mutex.synchronize do if stream.nil? removed = @streams.values.sum(&:size) @streams.clear @snapshots.clear @global_position = 0 removed else key = stream.to_s next 0 unless @streams.key?(key) removed = @streams[key].size @streams.delete(key) @snapshots.delete(key) removed end end end |
#last(stream) ⇒ Object?
Return the most recently appended event in a stream without materializing the rest of the stream.
80 81 82 83 84 85 86 |
# File 'lib/philiprehberger/event_store/store.rb', line 80 def last(stream) stream = stream.to_s @mutex.synchronize do entries = @streams[stream] entries && !entries.empty? ? entries.last[:event] : nil end end |
#load_from_snapshot(stream, initial: nil) {|state, event| ... } ⇒ Object
Load state from a snapshot plus any events appended after the snapshot.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/philiprehberger/event_store/store.rb', line 174 def load_from_snapshot(stream, initial: nil, &block) stream = stream.to_s snap = @mutex.synchronize { @snapshots[stream] } if snap state = snap[:state] remaining = @mutex.synchronize do entries = @streams[stream] || [] entries[snap[:version]..].map { |e| e[:event] } end remaining.reduce(state, &block) else project(stream, initial: initial, &block) end end |
#project(stream, initial: nil) {|state, event| ... } ⇒ Object
Project events from a stream into an accumulated state.
142 143 144 145 |
# File 'lib/philiprehberger/event_store/store.rb', line 142 def project(stream, initial: nil, &block) events = read(stream) events.reduce(initial, &block) end |
#query(stream: nil, type: nil, after: nil, before: nil, limit: nil) ⇒ Array
Query events with filters.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/philiprehberger/event_store/store.rb', line 105 def query(stream: nil, type: nil, after: nil, before: nil, limit: nil) entries = @mutex.synchronize do if stream (@streams[stream.to_s] || []).dup else @streams.values.flatten.sort_by { |e| e[:position] } end end entries = entries.select { |e| matches_type?(e[:event], type) } if type entries = entries.select { |e| e[:timestamp] > after } if after entries = entries.select { |e| e[:timestamp] < before } if before entries = entries.first(limit) if limit entries.map { |e| e[:event] } end |
#read(stream) ⇒ Array
Read all events from a stream.
68 69 70 71 72 73 |
# File 'lib/philiprehberger/event_store/store.rb', line 68 def read(stream) stream = stream.to_s @mutex.synchronize do (@streams[stream] || []).map { |entry| entry[:event] } end end |
#read_all ⇒ Array
Read all events from all streams, ordered by global position.
91 92 93 94 95 |
# File 'lib/philiprehberger/event_store/store.rb', line 91 def read_all @mutex.synchronize do @streams.values.flatten.sort_by { |entry| entry[:position] }.map { |entry| entry[:event] } end end |
#replay(stream, from_version: 0) ⇒ self
Replay events from a stream to all current subscribers.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/philiprehberger/event_store/store.rb', line 195 def replay(stream, from_version: 0) stream = stream.to_s events_to_replay = nil subscribers = nil @mutex.synchronize do entries = @streams[stream] || [] events_to_replay = entries[from_version..].map { |e| e[:event] } subscribers = (@subscribers[stream] || []).dup end events_to_replay.each do |event| subscribers.each { |callback| callback.call(event) } end self end |
#replay_all(from_position: 1) ⇒ self
Replay all events across all streams to their subscribers.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/philiprehberger/event_store/store.rb', line 217 def replay_all(from_position: 1) entries_to_replay = nil @mutex.synchronize do entries_to_replay = @streams.values.flatten .select { |e| e[:position] >= from_position } .sort_by { |e| e[:position] } end entries_to_replay.each do |entry| subscribers = @mutex.synchronize { (@subscribers[entry[:stream]] || []).dup } subscribers.each { |callback| callback.call(entry[:event]) } end self end |
#snapshot(stream, state) ⇒ self
Save a snapshot of an aggregate’s state at the current stream version.
159 160 161 162 163 164 165 166 |
# File 'lib/philiprehberger/event_store/store.rb', line 159 def snapshot(stream, state) stream = stream.to_s @mutex.synchronize do version = (@streams[stream] || []).size @snapshots[stream] = { state: state, version: version } end self end |
#streams ⇒ Array<String>
List all stream names.
150 151 152 |
# File 'lib/philiprehberger/event_store/store.rb', line 150 def streams @mutex.synchronize { @streams.keys.dup } end |
#subscribe(stream) {|event| ... } ⇒ self
Subscribe to events on a stream.
127 128 129 130 131 132 133 134 |
# File 'lib/philiprehberger/event_store/store.rb', line 127 def subscribe(stream, &block) stream = stream.to_s @mutex.synchronize do @subscribers[stream] ||= [] @subscribers[stream] << block end self end |
#total_events ⇒ Integer
Total number of events across all streams.
Companion to ‘#version(stream)` at the whole-store level; useful for dashboard-style total counters.
249 250 251 |
# File 'lib/philiprehberger/event_store/store.rb', line 249 def total_events @mutex.synchronize { @streams.values.sum(&:size) } end |
#version(stream) ⇒ Integer
Return the current version (event count) of a stream.
238 239 240 241 |
# File 'lib/philiprehberger/event_store/store.rb', line 238 def version(stream) stream = stream.to_s @mutex.synchronize { (@streams[stream] || []).size } end |