philiprehberger-event_store
In-memory event store with streams, projections, subscriptions, snapshots, and replay
Requirements
- Ruby >= 3.1
Installation
Add to your Gemfile:
gem "philiprehberger-event_store"
Or install directly:
gem install philiprehberger-event_store
Usage
require "philiprehberger/event_store"
store = Philiprehberger::EventStore.new
store.append(:orders, { type: 'OrderPlaced', id: 1, total: 99.99 })
store.append(:orders, { type: 'OrderShipped', id: 1 })
store.read(:orders)
# => [{ type: 'OrderPlaced', ... }, { type: 'OrderShipped', ... }]
Subscriptions
store.subscribe(:orders) do |event|
puts "New order event: #{event[:type]}"
end
store.append(:orders, { type: 'OrderCancelled', id: 2 })
# prints: New order event: OrderCancelled
Projections
total = store.project(:orders, initial: 0) do |sum, event|
event[:type] == 'OrderPlaced' ? sum + event[:total] : sum
end
# => 99.99
Event Querying
# Filter by stream
store.query(stream: :orders)
# Filter by event type
store.query(type: 'OrderPlaced')
# Combine filters with time range and limit
store.query(stream: :orders, after: 1.hour.ago, limit: 10)
Snapshots
Save aggregate state at a point in time, then rebuild from the snapshot plus newer events:
# Build state from events
state = store.project(:orders, initial: { count: 0, total: 0 }) do |s, e|
{ count: s[:count] + 1, total: s[:total] + (e[:total] || 0) }
end
# Save a snapshot
store.snapshot(:orders, state)
# Later, after more events have been appended:
store.append(:orders, { type: 'OrderPlaced', total: 50 })
# Rebuild from snapshot + new events (avoids replaying entire history)
rebuilt = store.load_from_snapshot(:orders) do |s, e|
{ count: s[:count] + 1, total: s[:total] + (e[:total] || 0) }
end
Replay
Re-emit past events to current subscribers:
# Replay all events in a stream
store.replay(:orders)
# Replay from a specific version (0-based index)
store.replay(:orders, from_version: 5)
# Replay all events across all streams
store.replay_all
# Replay from a global position
store.replay_all(from_position: 100)
Clearing streams
Remove events (and snapshots) while keeping subscribers registered:
received = []
store.subscribe(:orders) { |e| received << e }
store.append(:orders, { type: 'OrderPlaced' })
# Clear a single stream — subscribers stay attached
store.clear(:orders)
store.append(:orders, { type: 'OrderPlaced' })
# subscriber still fires for the new event
# Clear everything — streams and snapshots wiped, subscribers retained,
# global position reset to zero
store.clear
Reading All Events
store.read_all # => all events across all streams, ordered by position
store.streams # => ['orders', ...]
store.version(:orders) # => 5 (event count in stream)
API
| Method | Description |
|---|---|
.new |
Create a new event store |
#append(stream, event) |
Append an event to a stream |
#read(stream) |
Read all events from a stream |
#read_all |
Read all events across all streams |
#query(stream:, type:, after:, before:, limit:) |
Query events with filters |
| `#subscribe(stream) { \ | e\ |
| `#project(stream, initial:) { \ | state, e\ |
#snapshot(stream, state) |
Save aggregate state at current stream version |
| `#load_from_snapshot(stream, initial:) { \ | state, e\ |
#replay(stream, from_version:) |
Replay stream events to subscribers |
#replay_all(from_position:) |
Replay all events across streams to subscribers |
#version(stream) |
Return event count for a stream |
#streams |
List all stream names |
#clear(stream = nil) |
Remove events and snapshot for a stream, or everything when no stream is passed (subscribers retained) |
Development
bundle install
bundle exec rspec
bundle exec rubocop
Support
If you find this project useful: