Class: Deimos::Backends::Outbox

Inherits:
Base
  • Object
Defined in:
lib/deimos/backends/outbox.rb

Overview

Backend which saves messages to the database instead of immediately sending them.

Class Method Summary

Methods inherited from Base

publish

Class Method Details

.execute(producer_class:, messages:) ⇒ Object

:nodoc:



# File 'lib/deimos/backends/outbox.rb', line 12

def execute(producer_class:, messages:)
  records = messages.map do |m|
    Deimos::ProducerMiddleware.call(m)
    message = Deimos::KafkaMessage.new(
      message: m[:payload] ? m[:payload].to_s.b : nil,
      topic: m[:topic],
      partition_key: partition_key_for(m)
    )
    message.key = m[:key].to_s.b if m[:key]
    message
  end
  Deimos::KafkaMessage.import(records)
  Deimos.config.metrics&.increment(
    'outbox.insert',
    tags: %W(topic:#{producer_class.topic}),
    by: records.size
  )
end
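
The mapping from a message hash to a stored record can be tried without a database. The following is a minimal sketch, not the library's implementation: `OutboxRecord`, `build_outbox_records`, and `sample_records` are hypothetical names introduced here, the struct stands in for `Deimos::KafkaMessage`, and the partition-key fallback is simplified to `||` rather than the `present?` checks used by `partition_key_for`. The middleware call, the bulk import, and the metrics increment are omitted.

```ruby
# Hypothetical in-memory stand-in for a Deimos::KafkaMessage row (assumption).
OutboxRecord = Struct.new(:message, :topic, :partition_key, :key, keyword_init: true)

# Builds one record per message hash, mirroring the mapping done in execute.
def build_outbox_records(messages)
  messages.map do |m|
    record = OutboxRecord.new(
      # Payloads are stored as binary strings; a nil payload stays nil.
      message: m[:payload] ? m[:payload].to_s.b : nil,
      topic: m[:topic],
      # Simplified fallback (assumption): explicit partition key, else the key.
      partition_key: m[:partition_key] || m[:key]&.to_s&.b
    )
    # The key is only set when the message carries one.
    record.key = m[:key].to_s.b if m[:key]
    record
  end
end

# Example input: one ordinary message and one with a nil payload.
def sample_records
  build_outbox_records([
    { topic: 'my-topic', payload: '{"id":1}', key: 1 },
    { topic: 'my-topic', payload: nil, key: 2 }
  ])
end
```

In this sketch the first record holds a binary-encoded payload and the key `"1"`, while the second keeps a `nil` message and still records its key.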

.partition_key_for(message) ⇒ String

Returns the partition key to use for this message.

Parameters:

  • message (Hash)

    the message to store; its :partition_key and :key entries are read

Returns:

  • (String, nil)

    the partition key to use for this message, or nil when the message has neither a :partition_key nor a :key



# File 'lib/deimos/backends/outbox.rb', line 33

def partition_key_for(message)
  if message[:partition_key].present?
    message[:partition_key]
  elsif message[:key].present?
    message[:key].to_s.b
  else
    nil
  end
end
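
The precedence above (explicit partition key first, then the message key, then nil) can be exercised outside Rails. This is a sketch under stated assumptions: `value_present?` and `partition_key_sketch` are hypothetical names, and `value_present?` only approximates ActiveSupport's `present?`, which the original method relies on.

```ruby
# Hypothetical approximation of ActiveSupport's `present?` (assumption):
# nil, false, empty collections, and whitespace-only strings count as blank.
def value_present?(value)
  return false if value.nil? || value == false
  return !value.strip.empty? if value.is_a?(String)
  return !value.empty? if value.respond_to?(:empty?)

  true
end

# Same branching as partition_key_for, using the stand-in helper above.
def partition_key_sketch(message)
  if value_present?(message[:partition_key])
    message[:partition_key]
  elsif value_present?(message[:key])
    # Keys are stringified and returned as binary strings.
    message[:key].to_s.b
  end
end

puts partition_key_sketch({ partition_key: 'users', key: 42 }).inspect
puts partition_key_sketch({ key: 42 }).inspect
puts partition_key_sketch({}).inspect
```

An explicit `:partition_key` wins over `:key`; a message with only a key yields that key as a binary string; a message with neither yields `nil`.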