Class: TCB::EventStore::InMemory

Inherits:
Object
  • Object
show all
Defined in:
lib/tcb/event_store/in_memory.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ InMemory

Returns a new instance of InMemory.



8
9
10
11
# File 'lib/tcb/event_store/in_memory.rb', line 8

# Sets up an empty store: a hash of streams whose default block creates a
# fresh array the first time a stream key is looked up, plus the mutex the
# other methods use to guard access to that hash.
def initialize
  @streams = Hash.new do |streams, stream_id|
    streams[stream_id] = []
  end
  @mutex = Mutex.new
end

Instance Method Details

#append(stream_id:, events:, occurred_at: Time.now) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/tcb/event_store/in_memory.rb', line 13

# Wraps each event in an EventStreamEnvelope and adds the envelopes to the
# end of the given stream, all while holding the store's mutex.
#
# Envelopes are numbered consecutively, starting from whatever
# next_version(stream_id) returns, and each gets a freshly generated UUID
# as its event_id. Every envelope in the batch shares the same occurred_at.
#
# @param stream_id key of the stream to append to
# @param events collection of events to wrap (must respond to #each)
# @param occurred_at timestamp stored on every envelope; defaults to the
#   time of the call
# @return [Array] the envelopes created by this call, in order
def append(stream_id:, events:, occurred_at: Time.now)
  @mutex.synchronize do
    # Pair every event with its version before building anything.
    numbered = events.each.with_index(next_version(stream_id))

    batch = numbered.map do |event, version|
      EventStreamEnvelope.new(
        event: event,
        event_id: SecureRandom.uuid,
        stream_id: stream_id,
        version: version,
        occurred_at: occurred_at
      )
    end

    batch.tap { |created| @streams[stream_id].concat(created) }
  end
end

#read(stream_id, from_version: nil, to_version: nil, occurred_after: nil, limit: nil, order: :asc) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/tcb/event_store/in_memory.rb', line 29

# Returns a copy of the envelopes stored for a stream, optionally filtered,
# reordered and truncated.
#
# Filters are applied first, then ordering, then the limit, so
# `order: :desc, limit: 1` yields the last matching envelope. Only the
# snapshot of the stream is taken under the mutex; filtering runs on the
# copy so the lock is held as briefly as possible.
#
# Reading a stream that has never been written returns an empty array and
# leaves the store untouched.
#
# @param stream_id key of the stream to read
# @param from_version inclusive lower bound on envelope version (skipped when nil)
# @param to_version inclusive upper bound on envelope version (skipped when nil)
# @param occurred_after exclusive lower bound on envelope occurred_at (skipped when nil)
# @param limit maximum number of envelopes to return (no cap when nil)
# @param order :desc reverses the stored order; any other value keeps it
# @return [Array] a new array of matching envelopes
def read(stream_id, from_version: nil, to_version: nil, occurred_after: nil, limit: nil, order: :asc)
  snapshot = @mutex.synchronize do
    # Check key? first: indexing with [] would fire the hash's default block
    # and permanently insert an empty stream for every unknown id that is read.
    @streams.key?(stream_id) ? @streams[stream_id].dup : []
  end

  matching = snapshot.select do |envelope|
    (!from_version || envelope.version >= from_version) &&
      (!to_version || envelope.version <= to_version) &&
      (!occurred_after || envelope.occurred_at > occurred_after)
  end

  matching = matching.reverse if order == :desc
  limit ? matching.first(limit) : matching
end

#reset! ⇒ Object



38
39
40
41
42
# File 'lib/tcb/event_store/in_memory.rb', line 38

# Discards every stream by swapping in a brand-new empty hash (with the same
# array-per-key default block) while holding the mutex.
#
# @return [Hash] the new, empty streams hash
def reset!
  @mutex.synchronize do
    fresh = Hash.new { |streams, stream_id| streams[stream_id] = [] }
    @streams = fresh
  end
end