Class: Philiprehberger::EventStore::Store
- Inherits:
-
Object
- Object
- Philiprehberger::EventStore::Store
- Defined in:
- lib/philiprehberger/event_store/store.rb
Overview
Thread-safe in-memory event store with streams, projections, subscriptions, querying, snapshots, and replay.
Instance Method Summary collapse
-
#append(stream, event) ⇒ self
Append an event to a stream.
-
#clear(stream = nil) ⇒ Integer
Clear events (and snapshot) for the given stream, or everything when no stream is passed.
-
#initialize ⇒ Store
constructor
Create a new event store.
-
#load_from_snapshot(stream, initial: nil) {|state, event| ... } ⇒ Object
Load state from a snapshot plus any events appended after the snapshot.
-
#project(stream, initial: nil) {|state, event| ... } ⇒ Object
Project events from a stream into an accumulated state.
-
#query(stream: nil, type: nil, after: nil, before: nil, limit: nil) ⇒ Array
Query events with filters.
-
#read(stream) ⇒ Array
Read all events from a stream.
-
#read_all ⇒ Array
Read all events from all streams, ordered by global position.
-
#replay(stream, from_version: 0) ⇒ self
Replay events from a stream to all current subscribers.
-
#replay_all(from_position: 1) ⇒ self
Replay all events across all streams to their subscribers.
-
#snapshot(stream, state) ⇒ self
Save a snapshot of an aggregate’s state at the current stream version.
-
#streams ⇒ Array<String>
List all stream names.
-
#subscribe(stream) {|event| ... } ⇒ self
Subscribe to events on a stream.
-
#version(stream) ⇒ Integer
Return the current version (event count) of a stream.
Constructor Details
#initialize ⇒ Store
Create a new event store.
9 10 11 12 13 14 15 |
# File 'lib/philiprehberger/event_store/store.rb', line 9 def initialize @mutex = Mutex.new @streams = {} @subscribers = {} @global_position = 0 @snapshots = {} end |
Instance Method Details
#append(stream, event) ⇒ self
Append an event to a stream.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/philiprehberger/event_store/store.rb', line 22 def append(stream, event) stream = stream.to_s subscribers = nil @mutex.synchronize do @streams[stream] ||= [] @global_position += 1 entry = { event: event, position: @global_position, stream: stream, timestamp: Time.now } @streams[stream] << entry subscribers = (@subscribers[stream] || []).dup end subscribers.each { |callback| callback.call(event) } self end |
#clear(stream = nil) ⇒ Integer
Clear events (and snapshot) for the given stream, or everything when no stream is passed. Subscribers are retained in both cases. Clearing all also resets the global position counter.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/philiprehberger/event_store/store.rb', line 44 def clear(stream = nil) @mutex.synchronize do if stream.nil? removed = @streams.values.sum(&:size) @streams.clear @snapshots.clear @global_position = 0 removed else key = stream.to_s next 0 unless @streams.key?(key) removed = @streams[key].size @streams.delete(key) @snapshots.delete(key) removed end end end |
#load_from_snapshot(stream, initial: nil) {|state, event| ... } ⇒ Object
Load state from a snapshot plus any events appended after the snapshot.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/philiprehberger/event_store/store.rb', line 161 def load_from_snapshot(stream, initial: nil, &block) stream = stream.to_s snap = @mutex.synchronize { @snapshots[stream] } if snap state = snap[:state] remaining = @mutex.synchronize do entries = @streams[stream] || [] entries[snap[:version]..].map { |e| e[:event] } end remaining.reduce(state, &block) else project(stream, initial: initial, &block) end end |
#project(stream, initial: nil) {|state, event| ... } ⇒ Object
Project events from a stream into an accumulated state.
129 130 131 132 |
# File 'lib/philiprehberger/event_store/store.rb', line 129 def project(stream, initial: nil, &block) events = read(stream) events.reduce(initial, &block) end |
#query(stream: nil, type: nil, after: nil, before: nil, limit: nil) ⇒ Array
Query events with filters.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/philiprehberger/event_store/store.rb', line 92 def query(stream: nil, type: nil, after: nil, before: nil, limit: nil) entries = @mutex.synchronize do if stream (@streams[stream.to_s] || []).dup else @streams.values.flatten.sort_by { |e| e[:position] } end end entries = entries.select { |e| matches_type?(e[:event], type) } if type entries = entries.select { |e| e[:timestamp] > after } if after entries = entries.select { |e| e[:timestamp] < before } if before entries = entries.first(limit) if limit entries.map { |e| e[:event] } end |
#read(stream) ⇒ Array
Read all events from a stream.
68 69 70 71 72 73 |
# File 'lib/philiprehberger/event_store/store.rb', line 68 def read(stream) stream = stream.to_s @mutex.synchronize do (@streams[stream] || []).map { |entry| entry[:event] } end end |
#read_all ⇒ Array
Read all events from all streams, ordered by global position.
78 79 80 81 82 |
# File 'lib/philiprehberger/event_store/store.rb', line 78 def read_all @mutex.synchronize do @streams.values.flatten.sort_by { |entry| entry[:position] }.map { |entry| entry[:event] } end end |
#replay(stream, from_version: 0) ⇒ self
Replay events from a stream to all current subscribers.
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/philiprehberger/event_store/store.rb', line 182 def replay(stream, from_version: 0) stream = stream.to_s events_to_replay = nil subscribers = nil @mutex.synchronize do entries = @streams[stream] || [] events_to_replay = entries[from_version..].map { |e| e[:event] } subscribers = (@subscribers[stream] || []).dup end events_to_replay.each do |event| subscribers.each { |callback| callback.call(event) } end self end |
#replay_all(from_position: 1) ⇒ self
Replay all events across all streams to their subscribers.
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/philiprehberger/event_store/store.rb', line 204 def replay_all(from_position: 1) entries_to_replay = nil @mutex.synchronize do entries_to_replay = @streams.values.flatten .select { |e| e[:position] >= from_position } .sort_by { |e| e[:position] } end entries_to_replay.each do |entry| subscribers = @mutex.synchronize { (@subscribers[entry[:stream]] || []).dup } subscribers.each { |callback| callback.call(entry[:event]) } end self end |
#snapshot(stream, state) ⇒ self
Save a snapshot of an aggregate’s state at the current stream version.
146 147 148 149 150 151 152 153 |
# File 'lib/philiprehberger/event_store/store.rb', line 146 def snapshot(stream, state) stream = stream.to_s @mutex.synchronize do version = (@streams[stream] || []).size @snapshots[stream] = { state: state, version: version } end self end |
#streams ⇒ Array<String>
List all stream names.
137 138 139 |
# File 'lib/philiprehberger/event_store/store.rb', line 137 def streams @mutex.synchronize { @streams.keys.dup } end |
#subscribe(stream) {|event| ... } ⇒ self
Subscribe to events on a stream.
114 115 116 117 118 119 120 121 |
# File 'lib/philiprehberger/event_store/store.rb', line 114 def subscribe(stream, &block) stream = stream.to_s @mutex.synchronize do @subscribers[stream] ||= [] @subscribers[stream] << block end self end |
#version(stream) ⇒ Integer
Return the current version (event count) of a stream.
225 226 227 228 |
# File 'lib/philiprehberger/event_store/store.rb', line 225 def version(stream) stream = stream.to_s @mutex.synchronize { (@streams[stream] || []).size } end |