Class: TCB::OutboxStore::InMemory
- Inherits:
-
Object
- Object
- TCB::OutboxStore::InMemory
- Defined in:
- lib/tcb/outbox_store/in_memory.rb
Instance Method Summary collapse
- #all ⇒ Object
-
#initialize(_model = nil) ⇒ InMemory
constructor
A new instance of InMemory.
- #insert(event_id:, stream_id:, version:, handler_class:) ⇒ Object
- #lock(entry, locked_at: Time.now) ⇒ Object
- #mark_delivered(entry, delivered_at: Time.now) ⇒ Object
- #mark_failed(entry, error:) ⇒ Object
- #pending ⇒ Object
- #recover_stale_locks(older_than:) ⇒ Object
Constructor Details
#initialize(_model = nil) ⇒ InMemory
Returns a new instance of InMemory.
6 7 8 9 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 6 def initialize(_model = nil) @entries = {} @mutex = Mutex.new end |
Instance Method Details
#all ⇒ Object
28 29 30 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 28 def all @mutex.synchronize { @entries.values.dup } end |
#insert(event_id:, stream_id:, version:, handler_class:) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 11 def insert(event_id:, stream_id:, version:, handler_class:) entry = OutboxEntry.new( id: SecureRandom.uuid, event_id: event_id, stream_id: stream_id, version: version, handler_class: handler_class.name, status: :pending, locked_at: nil, delivered_at: nil, error: nil, created_at: Time.now ) @mutex.synchronize { @entries[entry.id] = entry } entry end |
#lock(entry, locked_at: Time.now) ⇒ Object
38 39 40 41 42 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 38 def lock(entry, locked_at: Time.now) updated = entry.with(status: :locked, locked_at: locked_at) @mutex.synchronize { @entries[entry.id] = updated } updated end |
#mark_delivered(entry, delivered_at: Time.now) ⇒ Object
44 45 46 47 48 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 44 def mark_delivered(entry, delivered_at: Time.now) updated = entry.with(status: :delivered, delivered_at: delivered_at) @mutex.synchronize { @entries[entry.id] = updated } updated end |
#mark_failed(entry, error:) ⇒ Object
50 51 52 53 54 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 50 def mark_failed(entry, error:) updated = entry.with(status: :failed, error: error.) @mutex.synchronize { @entries[entry.id] = updated } updated end |
#pending ⇒ Object
32 33 34 35 36 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 32 def pending @mutex .synchronize { @entries.values.select { |e| e.status == :pending } .sort_by { |e| [e.stream_id, e.version] } } end |
#recover_stale_locks(older_than:) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/tcb/outbox_store/in_memory.rb', line 56 def recover_stale_locks(older_than:) stale = @mutex.synchronize do @entries.values.select { |e| e.status == :locked && e.locked_at < older_than } end stale.map do |entry| updated = entry.with(status: :pending, locked_at: nil) @mutex.synchronize { @entries[entry.id] = updated } updated end end |