Class: Deimos::Backends::Outbox
- Defined in:
- lib/deimos/backends/outbox.rb
Overview
Backend which saves messages to the database instead of immediately sending them.
Class Method Summary collapse
-
.execute(producer_class:, messages:) ⇒ Object
:nodoc:.
-
.partition_key_for(message) ⇒ String
The partition key to use for this message.
Methods inherited from Base
Class Method Details
.execute(producer_class:, messages:) ⇒ Object
:nodoc:
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/deimos/backends/outbox.rb', line 12 def execute(producer_class:, messages:) records = .map do |m| Deimos::ProducerMiddleware.call(m) = Deimos::KafkaMessage.new( message: m[:payload] ? m[:payload].to_s.b : nil, topic: m[:topic], partition_key: partition_key_for(m) ) .key = m[:key].to_s.b if m[:key] end Deimos::KafkaMessage.import(records) Deimos.config.metrics&.increment( 'outbox.insert', tags: %W(topic:#{producer_class.topic}), by: records.size ) end |
.partition_key_for(message) ⇒ String
Returns the partition key to use for this message.
33 34 35 36 37 38 39 40 41 |
# File 'lib/deimos/backends/outbox.rb', line 33 def partition_key_for() if [:partition_key].present? [:partition_key] elsif [:key].present? [:key].to_s.b else nil end end |