Class: TCB::OutboxStore::InMemory

Inherits:
Object
  • Object
show all
Defined in:
lib/tcb/outbox_store/in_memory.rb

Instance Method Summary collapse

Constructor Details

#initialize(_model = nil) ⇒ InMemory

Returns a new instance of InMemory.



6
7
8
9
# File 'lib/tcb/outbox_store/in_memory.rb', line 6

# Sets up an empty store together with the mutex that guards it.
#
# @param _model [Object, nil] accepted but never read here
#   (NOTE(review): presumably kept for signature parity with other
#   outbox stores — confirm against callers)
def initialize(_model = nil)
  # The mutex is what every reader/writer of @entries synchronizes on.
  @mutex = Mutex.new
  # Starts empty; entries are later stored under their id.
  @entries = {}
end

Instance Method Details

#all ⇒ Object



28
29
30
# File 'lib/tcb/outbox_store/in_memory.rb', line 28

# Returns a snapshot of every entry currently held by the store.
#
# @return [Array] the stored entries in the hash's insertion order; the
#   array is freshly allocated, so mutating it does not touch the store
def all
  # Hash#values already builds a new Array on each call, so an extra
  # +dup+ would only allocate a second, identical copy.
  @mutex.synchronize { @entries.values }
end

#insert(event_id:, stream_id:, version:, handler_class:) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/tcb/outbox_store/in_memory.rb', line 11

# Builds a new entry with status +:pending+ and stores it under a freshly
# generated UUID.
#
# @param event_id [Object] copied onto the entry as +event_id+
# @param stream_id [Object] copied onto the entry as +stream_id+
# @param version [Object] copied onto the entry as +version+
# @param handler_class [#name] only the result of +name+ is stored
# @return [OutboxEntry] the entry that was stored
def insert(event_id:, stream_id:, version:, handler_class:)
  attributes = {
    id: SecureRandom.uuid,
    event_id: event_id,
    stream_id: stream_id,
    version: version,
    handler_class: handler_class.name,
    status: :pending,
    # Not locked, delivered or failed yet.
    locked_at: nil,
    delivered_at: nil,
    error: nil,
    created_at: Time.now
  }
  record = OutboxEntry.new(**attributes)

  @mutex.synchronize do
    @entries[record.id] = record
  end
  record
end

#lock(entry, locked_at: Time.now) ⇒ Object



38
39
40
41
42
# File 'lib/tcb/outbox_store/in_memory.rb', line 38

# Stores a +:locked+ version of +entry+ under the entry's id.
#
# @param entry [#with, #id] the entry to lock
# @param locked_at [Time] timestamp recorded on the entry; defaults to the
#   time of the call
# @return [Object] whatever +entry.with+ returned, which is also what is
#   now stored (NOTE(review): +with+ is assumed to return a modified copy
#   rather than mutate +entry+ — confirm against OutboxEntry)
def lock(entry, locked_at: Time.now)
  entry.with(status: :locked, locked_at: locked_at).tap do |locked|
    # Overwrites whatever is currently stored under this id.
    @mutex.synchronize { @entries[entry.id] = locked }
  end
end

#mark_delivered(entry, delivered_at: Time.now) ⇒ Object



44
45
46
47
48
# File 'lib/tcb/outbox_store/in_memory.rb', line 44

# Stores a +:delivered+ version of +entry+ under the entry's id.
#
# @param entry [#with, #id] the entry that was delivered
# @param delivered_at [Time] timestamp recorded on the entry; defaults to
#   the time of the call
# @return [Object] the value returned by +entry.with+, which is also what
#   is now stored
def mark_delivered(entry, delivered_at: Time.now)
  entry.with(status: :delivered, delivered_at: delivered_at).tap do |delivered|
    # Overwrites whatever is currently stored under this id.
    @mutex.synchronize { @entries[entry.id] = delivered }
  end
end

#mark_failed(entry, error:) ⇒ Object



50
51
52
53
54
# File 'lib/tcb/outbox_store/in_memory.rb', line 50

# Stores a +:failed+ version of +entry+ carrying the error's message.
#
# @param entry [#with, #id] the entry whose handling failed
# @param error [#message] only its +message+ is recorded on the entry
# @return [Object] the value returned by +entry.with+, which is also what
#   is now stored
def mark_failed(entry, error:)
  reason = error.message
  failed = entry.with(status: :failed, error: reason)

  @mutex.synchronize do
    # Overwrites whatever is currently stored under this id.
    @entries[entry.id] = failed
  end
  failed
end

#pending ⇒ Object



32
33
34
35
36
# File 'lib/tcb/outbox_store/in_memory.rb', line 32

# Lists the entries whose status is +:pending+, ordered by stream and then
# by version within each stream.
#
# @return [Array] pending entries sorted by +[stream_id, version]+
def pending
  @mutex.synchronize do
    waiting = @entries.values.select { |entry| entry.status == :pending }
    waiting.sort_by { |entry| [entry.stream_id, entry.version] }
  end
end

#recover_stale_locks(older_than:) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/tcb/outbox_store/in_memory.rb', line 56

# Moves entries that have been +:locked+ since before +older_than+ back to
# +:pending+ and clears their +locked_at+.
#
# Selection and write-back happen inside one critical section. If the lock
# were released between the two steps, another thread could change an
# entry (deliver it, fail it, re-lock it) after it was judged stale, and
# the write-back would then overwrite that newer state with +:pending+.
#
# @param older_than [Time] cutoff; only entries with +locked_at+ strictly
#   earlier than this are recovered
# @return [Array] the recovered entries as now stored, in store order
def recover_stale_locks(older_than:)
  @mutex.synchronize do
    # +values+ is a snapshot array, so reassigning keys below does not
    # disturb the iteration.
    stale = @entries.values.select do |entry|
      entry.status == :locked && entry.locked_at < older_than
    end

    # The assignment evaluates to the updated entry, which +map+ collects.
    stale.map do |entry|
      @entries[entry.id] = entry.with(status: :pending, locked_at: nil)
    end
  end
end