Class: Philiprehberger::EventStore::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/event_store/store.rb

Overview

Thread-safe in-memory event store with streams, projections, subscriptions, querying, snapshots, and replay.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Store

Create a new event store.



9
10
11
12
13
14
15
# File 'lib/philiprehberger/event_store/store.rb', line 9

# Build an empty store.
#
# Starts with no streams, no subscribers and no snapshots, a global position
# counter of zero, and the mutex the other methods synchronize on.
def initialize
  # Counter and lock first, then the three per-stream registries.
  @global_position = 0
  @mutex = Mutex.new

  @subscribers = {}
  @streams = {}
  @snapshots = {}
end

Instance Method Details

#append(stream, event) ⇒ self

Append an event to a stream.

Parameters:

  • stream (String, Symbol)

    the stream name

  • event (Object)

    the event to append

Returns:

  • (self)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/philiprehberger/event_store/store.rb', line 22

# Append an event to a stream and notify that stream's subscribers.
#
# The entry is stored and the subscriber list is copied while the lock is
# held; the callbacks run afterwards, outside the lock.
#
# @param stream [String, Symbol] the stream name
# @param event [Object] the event to append
# @return [self]
def append(stream, event)
  name = stream.to_s

  listeners = @mutex.synchronize do
    log = (@streams[name] ||= [])
    @global_position += 1
    log << { event: event, position: @global_position, stream: name, timestamp: Time.now }
    # Copy so later subscriptions cannot change the list being iterated below.
    (@subscribers[name] || []).dup
  end

  listeners.each { |listener| listener.call(event) }
  self
end

#clear(stream = nil) ⇒ Integer

Clear events (and snapshot) for the given stream, or everything when no stream is passed. Subscribers are retained in both cases. Clearing all also resets the global position counter.

Parameters:

  • stream (String, Symbol, nil) (defaults to: nil)

    the stream name, or nil to clear all

Returns:

  • (Integer)

    the number of events removed



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/philiprehberger/event_store/store.rb', line 44

# Clear events (and snapshot) for one stream, or for every stream when no
# stream is given. Subscribers are left untouched in both cases. Clearing
# everything also resets the global position counter to zero.
#
# @param stream [String, Symbol, nil] the stream name, or nil to clear all
# @return [Integer] the number of events removed
def clear(stream = nil)
  @mutex.synchronize do
    if stream.nil?
      removed = @streams.each_value.sum(&:size)
      @streams.clear
      @snapshots.clear
      @global_position = 0
      removed
    else
      key = stream.to_s
      # Drop the snapshot even when the stream holds no events: a snapshot can
      # be taken on an empty stream, and it must not survive a clear of it.
      @snapshots.delete(key)
      (@streams.delete(key) || []).size
    end
  end
end

#load_from_snapshot(stream, initial: nil) {|state, event| ... } ⇒ Object

Load state from a snapshot plus any events appended after the snapshot.

Parameters:

  • stream (String, Symbol)

    the stream name

  • initial (Object) (defaults to: nil)

    the initial state if no snapshot exists

Yields:

  • (state, event)

    called for each event after the snapshot

Returns:

  • (Object)

    the rebuilt state



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/philiprehberger/event_store/store.rb', line 161

# Rebuild state from the stream's snapshot plus the events appended after it.
# When the stream has no snapshot, this falls back to a full projection
# starting from +initial+.
#
# @param stream [String, Symbol] the stream name
# @param initial [Object] the starting state used when no snapshot exists
# @yield [state, event] called for each event after the snapshot
# @return [Object] the rebuilt state
def load_from_snapshot(stream, initial: nil, &block)
  stream = stream.to_s
  snap = nil
  remaining = nil

  # Read the snapshot and the stream tail under one lock acquisition so a
  # concurrent clear/append cannot slip in between the two reads.
  @mutex.synchronize do
    snap = @snapshots[stream]
    if snap
      entries = @streams[stream] || []
      # Array#[] with a range yields nil when the start lies past the end
      # (snapshot version greater than the current stream length).
      remaining = (entries[snap[:version]..] || []).map { |e| e[:event] }
    end
  end

  # project is called outside the synchronize block: it reads through #read,
  # which acquires the same mutex.
  return project(stream, initial: initial, &block) unless snap

  remaining.reduce(snap[:state], &block)
end

#project(stream, initial: nil) {|state, event| ... } ⇒ Object

Project events from a stream into an accumulated state.

Parameters:

  • stream (String, Symbol)

    the stream name

  • initial (Object) (defaults to: nil)

    the initial state

Yields:

  • (state, event)

    called for each event to produce the next state

Returns:

  • (Object)

    the final projected state



129
130
131
132
# File 'lib/philiprehberger/event_store/store.rb', line 129

# Fold every event of a stream into an accumulated state.
#
# @param stream [String, Symbol] the stream name
# @param initial [Object] the starting state
# @yield [state, event] called per event; its result becomes the next state
# @return [Object] the final projected state
def project(stream, initial: nil, &block)
  read(stream).reduce(initial, &block)
end

#query(stream: nil, type: nil, after: nil, before: nil, limit: nil) ⇒ Array

Query events with filters.

Parameters:

  • stream (String, Symbol, nil) (defaults to: nil)

    filter by stream name

  • type (Class, String, nil) (defaults to: nil)

    filter by event type (class or string class name)

  • after (Time, nil) (defaults to: nil)

    only events after this time

  • before (Time, nil) (defaults to: nil)

    only events before this time

  • limit (Integer, nil) (defaults to: nil)

    maximum number of events to return

Returns:

  • (Array)

    matching events



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/philiprehberger/event_store/store.rb', line 92

# Query events, optionally narrowed by stream, event type, time window and
# a maximum count. Filters are applied in that order; the limit is applied
# last, to the already-filtered list.
#
# @param stream [String, Symbol, nil] restrict to this stream
# @param type [Class, String, nil] restrict to this event type
# @param after [Time, nil] keep only events stamped strictly after this time
# @param before [Time, nil] keep only events stamped strictly before this time
# @param limit [Integer, nil] maximum number of events to return
# @return [Array] matching events
def query(stream: nil, type: nil, after: nil, before: nil, limit: nil)
  pool = @mutex.synchronize do
    if stream
      @streams.fetch(stream.to_s, []).dup
    else
      # No stream given: merge every stream, ordered by global position.
      @streams.values.flatten.sort_by { |entry| entry[:position] }
    end
  end

  # Collect the active predicates in the order they must run.
  filters = []
  filters << ->(entry) { matches_type?(entry[:event], type) } if type
  filters << ->(entry) { entry[:timestamp] > after } if after
  filters << ->(entry) { entry[:timestamp] < before } if before

  matched = filters.reduce(pool) { |kept, keep| kept.select(&keep) }
  matched = matched.first(limit) if limit

  matched.map { |entry| entry[:event] }
end

#read(stream) ⇒ Array

Read all events from a stream.

Parameters:

  • stream (String, Symbol)

    the stream name

Returns:

  • (Array)

    events in the stream



68
69
70
71
72
73
# File 'lib/philiprehberger/event_store/store.rb', line 68

# Read all events of one stream, in append order.
#
# @param stream [String, Symbol] the stream name
# @return [Array] the stream's events; empty when the stream does not exist
def read(stream)
  key = stream.to_s
  @mutex.synchronize { @streams.fetch(key, []).map { |entry| entry[:event] } }
end

#read_all ⇒ Array

Read all events from all streams, ordered by global position.

Returns:

  • (Array)

    all events across all streams



78
79
80
81
82
# File 'lib/philiprehberger/event_store/store.rb', line 78

# Read the events of every stream, merged and ordered by global position.
#
# @return [Array] all events across all streams
def read_all
  @mutex.synchronize do
    ordered = @streams.values.flatten.sort_by { |entry| entry[:position] }
    ordered.map { |entry| entry[:event] }
  end
end

#replay(stream, from_version: 0) ⇒ self

Replay events from a stream to all current subscribers.

Parameters:

  • stream (String, Symbol)

    the stream name

  • from_version (Integer) (defaults to: 0)

    start replaying from this version (0-based index)

Returns:

  • (self)


182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/philiprehberger/event_store/store.rb', line 182

# Replay a stream's stored events to the subscribers currently registered on
# that stream. Events and subscribers are captured under the lock; the
# callbacks run after it is released.
#
# @param stream [String, Symbol] the stream name
# @param from_version [Integer] 0-based index of the first event to replay;
#   a value outside the stream's range replays nothing
# @return [self]
def replay(stream, from_version: 0)
  stream = stream.to_s

  events_to_replay, subscribers = @mutex.synchronize do
    entries = @streams[stream] || []
    # Array#[] with a range returns nil (not []) once the start index is past
    # the end, e.g. any positive from_version on an unknown stream.
    tail = entries[from_version..] || []
    [tail.map { |e| e[:event] }, (@subscribers[stream] || []).dup]
  end

  events_to_replay.each do |event|
    subscribers.each { |callback| callback.call(event) }
  end

  self
end

#replay_all(from_position: 1) ⇒ self

Replay all events across all streams to their subscribers.

Parameters:

  • from_position (Integer) (defaults to: 1)

    start from this global position (1-based)

Returns:

  • (self)


204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/philiprehberger/event_store/store.rb', line 204

# Replay events from every stream, in global-position order, each to the
# subscribers of the stream it belongs to.
#
# @param from_position [Integer] first global position to replay (1-based)
# @return [self]
def replay_all(from_position: 1)
  pending = @mutex.synchronize do
    @streams.values.flatten
            .select { |entry| entry[:position] >= from_position }
            .sort_by { |entry| entry[:position] }
  end

  pending.each do |entry|
    # Subscribers are looked up per entry, under the lock, and invoked outside it.
    listeners = @mutex.synchronize { (@subscribers[entry[:stream]] || []).dup }
    listeners.each { |listener| listener.call(entry[:event]) }
  end

  self
end

#snapshot(stream, state) ⇒ self

Save a snapshot of an aggregate’s state at the current stream version.

Parameters:

  • stream (String, Symbol)

    the stream name

  • state (Object)

    the aggregate state to snapshot

Returns:

  • (self)


146
147
148
149
150
151
152
153
# File 'lib/philiprehberger/event_store/store.rb', line 146

# Store a snapshot of an aggregate's state, tagged with the stream's current
# event count as its version. Replaces any earlier snapshot for that stream.
#
# @param stream [String, Symbol] the stream name
# @param state [Object] the aggregate state to snapshot
# @return [self]
def snapshot(stream, state)
  key = stream.to_s
  @mutex.synchronize do
    @snapshots[key] = { state: state, version: @streams.fetch(key, []).size }
  end
  self
end

#streams ⇒ Array<String>

List all stream names.

Returns:

  • (Array<String>)

    stream names



137
138
139
# File 'lib/philiprehberger/event_store/store.rb', line 137

# List the names of all streams that currently exist.
#
# @return [Array<String>] stream names, as a new array the caller may modify
def streams
  @mutex.synchronize do
    # Hash#keys already builds a new Array, so no extra copy is needed.
    @streams.keys
  end
end

#subscribe(stream) {|event| ... } ⇒ self

Subscribe to events on a stream.

Parameters:

  • stream (String, Symbol)

    the stream name

Yields:

  • (event)

    called for each new event appended to the stream

Returns:

  • (self)


114
115
116
117
118
119
120
121
# File 'lib/philiprehberger/event_store/store.rb', line 114

# Register a callback on a stream.
#
# @param stream [String, Symbol] the stream name
# @yield [event] the callback stored for this stream
# @return [self]
def subscribe(stream, &block)
  key = stream.to_s
  @mutex.synchronize { (@subscribers[key] ||= []) << block }
  self
end

#version(stream) ⇒ Integer

Return the current version (event count) of a stream.

Parameters:

  • stream (String, Symbol)

    the stream name

Returns:

  • (Integer)


225
226
227
228
# File 'lib/philiprehberger/event_store/store.rb', line 225

# Report a stream's current version, i.e. how many events it holds.
#
# @param stream [String, Symbol] the stream name
# @return [Integer] the event count; 0 when the stream does not exist
def version(stream)
  key = stream.to_s
  @mutex.synchronize do
    entries = @streams[key]
    entries ? entries.size : 0
  end
end