Class: Pgbus::Testing::EventStore

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/testing.rb

Overview

Thread-safe in-memory store for events captured in fake/inline mode.

Instance Method Summary collapse

Constructor Details

#initializeEventStore

Returns a new instance of EventStore.



23
24
25
26
# File 'lib/pgbus/testing.rb', line 23

def initialize
  @mutex = Mutex.new
  @events = []
end

Instance Method Details

#clear!Object



44
45
46
# File 'lib/pgbus/testing.rb', line 44

def clear!
  @mutex.synchronize { @events.clear }
end

#drain!Object

Dispatch all stored events to their matching handlers, then clear. Events are removed one at a time after successful dispatch so that a handler exception leaves unprocessed events in the store.



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/pgbus/testing.rb', line 51

def drain!
  loop do
    event = @mutex.synchronize { @events.first }
    break unless event

    Pgbus::EventBus::Registry.instance.handlers_for(event.routing_key).each do |subscriber|
      subscriber.handler_class.new.handle(event)
    end

    @mutex.synchronize { @events.shift }
  end
end

#events(routing_key: nil) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/pgbus/testing.rb', line 32

def events(routing_key: nil)
  @mutex.synchronize do
    result = @events.dup
    result = result.select { |e| e.routing_key == routing_key } if routing_key
    result
  end
end

#push_event(event) ⇒ Object



28
29
30
# File 'lib/pgbus/testing.rb', line 28

def push_event(event)
  @mutex.synchronize { @events << event }
end

#sizeObject



40
41
42
# File 'lib/pgbus/testing.rb', line 40

def size
  @mutex.synchronize { @events.size }
end