Class: TCB::EventStore::InMemory

Inherits:
Object
  • Object
show all
Defined in:
lib/tcb/event_store/in_memory.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ InMemory

Returns a new instance of InMemory.



8
9
10
11
# File 'lib/tcb/event_store/in_memory.rb', line 8

# Builds an empty store.
#
# @mutex guards access to @streams. @streams maps a stream id to the array of
# items stored for it; looking up an unknown id inserts and returns a fresh
# empty array for that id.
def initialize
  @mutex   = Mutex.new
  @streams = Hash.new { |store, stream_id| store[stream_id] = [] }
end

Instance Method Details

#append(stream_id:, events:, occurred_at: Time.now, correlation_id: nil, causation_id: nil) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/tcb/event_store/in_memory.rb', line 13

# Wraps each event in an Envelope and adds the envelopes to the end of a stream.
#
# The whole operation runs while holding @mutex. Versions are consecutive,
# starting at the value returned by next_version(stream_id) (defined outside
# this method).
#
# @param stream_id [Object] key of the stream the envelopes are stored under
# @param events [Enumerable] events to wrap, in the order they should be stored
# @param occurred_at [Time] timestamp given to every envelope of this call
#   (defaults to the time of the call)
# @param correlation_id [Object, nil] passed through unchanged to every envelope
# @param causation_id [Object, nil] passed through unchanged to every envelope
# @return [Array] the envelopes created by this call, in order
def append(stream_id:, events:, occurred_at: Time.now, correlation_id: nil, causation_id: nil)
  @mutex.synchronize do
    first_version = next_version(stream_id)

    appended = events.each_with_index.map do |payload, offset|
      Envelope.new(
        event: payload,
        event_id: SecureRandom.uuid, # every envelope gets its own random id
        stream_id: stream_id,
        version: first_version + offset,
        occurred_at: occurred_at,
        correlation_id: correlation_id,
        causation_id: causation_id
      )
    end

    @streams[stream_id].concat(appended)
    appended
  end
end

#read(stream_id, from_version: nil, to_version: nil, occurred_after: nil, limit: nil, order: :asc) ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/tcb/event_store/in_memory.rb', line 31

# Returns the envelopes stored for a stream, optionally filtered, reordered
# and truncated.
#
# Reading a stream id that has nothing stored returns an empty array and
# leaves the store untouched (no empty entry is created for that id).
#
# @param stream_id [Object] key of the stream to read
# @param from_version [Integer, nil] when given, keep envelopes with version >= this value
# @param to_version [Integer, nil] when given, keep envelopes with version <= this value
# @param occurred_after [Time, nil] when given, keep envelopes whose occurred_at
#   is strictly greater than this value
# @param limit [Integer, nil] when given, maximum number of envelopes returned;
#   applied after ordering
# @param order [Symbol] :desc reverses the stored order; any other value keeps it
# @return [Array] a new array, never the internally stored one
def read(stream_id, from_version: nil, to_version: nil, occurred_after: nil, limit: nil, order: :asc)
  # Copy under the lock. fetch (rather than []) bypasses the Hash default
  # block, which would otherwise insert an empty stream for every unknown id.
  snapshot = @mutex.synchronize { @streams.fetch(stream_id) { [] }.dup }

  matching = snapshot.select do |env|
    (!from_version || env.version >= from_version) &&
      (!to_version || env.version <= to_version) &&
      (!occurred_after || env.occurred_at > occurred_after)
  end

  # matching is a private array at this point, so reversing in place is safe.
  matching.reverse! if order == :desc
  limit ? matching.first(limit) : matching
end

#read_by_correlation(correlation_id, context:, occurred_after: nil, occurred_before: nil) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/tcb/event_store/in_memory.rb', line 40

# Collects, across all streams, the envelopes that carry a given correlation id.
#
# @param correlation_id [Object] value an envelope's correlation_id must equal
# @param context [String] prefix an envelope's stream_id must start with
# @param occurred_after [Time, nil] when given, keep envelopes whose occurred_at
#   is strictly greater than this value
# @param occurred_before [Time, nil] when given, keep envelopes whose occurred_at
#   is strictly less than this value
# @return [Array] matching envelopes, ordered stream by stream as stored
def read_by_correlation(correlation_id, context:, occurred_after: nil, occurred_before: nil)
  # flatten builds a new array, so filtering can happen outside the lock.
  everything = @mutex.synchronize { @streams.values.flatten }

  everything.select do |envelope|
    next false unless envelope.stream_id.start_with?(context)
    next false unless envelope.correlation_id == correlation_id
    next false if occurred_after && !(envelope.occurred_at > occurred_after)
    next false if occurred_before && !(envelope.occurred_at < occurred_before)

    true
  end
end

#reset! ⇒ Object



48
49
50
51
52
# File 'lib/tcb/event_store/in_memory.rb', line 48

# Discards every stored stream by swapping in a brand-new empty store while
# holding @mutex. As before, looking up an unknown stream id in the new store
# inserts and returns a fresh empty array.
#
# @return [Hash] the new, empty store
def reset!
  fresh_store = Hash.new { |store, stream_id| store[stream_id] = [] }
  @mutex.synchronize { @streams = fresh_store }
end