Class: Deimos::Backends::Db
Overview
Backend which saves messages to the database instead of immediately sending them.
Class Method Summary
- .execute(producer_class:, messages:) ⇒ Object
  Saves the given messages as Deimos::KafkaMessage records.
- .partition_key_for(message) ⇒ String
  The partition key to use for this message.
Methods inherited from Base
Class Method Details
.execute(producer_class:, messages:) ⇒ Object
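Builds one Deimos::KafkaMessage record per message, bulk-inserts the records with Deimos::KafkaMessage.import, and increments the db_producer.insert metric by the number of records, tagged with the producer's topic. The message key is stored only when the producer's config does not set :no_keys.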
    # File 'lib/deimos/backends/db.rb', line 12
    def execute(producer_class:, messages:)
      records = messages.map do |m|
        message = Deimos::KafkaMessage.new(
          message: m.encoded_payload ? m.encoded_payload.to_s.b : nil,
          topic: m.topic,
          partition_key: partition_key_for(m)
        )
        message.key = m.encoded_key.to_s.b unless producer_class.config[:no_keys]
        message
      end
      Deimos::KafkaMessage.import(records)
      Deimos.config.metrics&.increment(
        'db_producer.insert',
        tags: %W(topic:#{producer_class.topic}),
        by: records.size
      )
    end
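The record-building step in the listing above can be tried in isolation. The following is a minimal sketch in plain Ruby, not the Deimos implementation: SketchOutgoing, SketchRecord, and sketch_build_records are hypothetical names, the no_keys keyword stands in for producer_class.config[:no_keys], and the database import, the metrics call, and the partition key are left out.

```ruby
# Hypothetical stand-ins; these are not the Deimos classes themselves.
SketchOutgoing = Struct.new(:encoded_payload, :encoded_key, :topic, keyword_init: true)
SketchRecord = Struct.new(:message, :key, :topic, keyword_init: true)

# Builds one record per message, following the map block in the listing.
# Payloads and keys are copied as binary strings via String#b; a nil
# payload stays nil, and keys are skipped when no_keys is true.
def sketch_build_records(messages, no_keys: false)
  messages.map do |m|
    record = SketchRecord.new(
      message: m.encoded_payload ? m.encoded_payload.to_s.b : nil,
      topic: m.topic
    )
    record.key = m.encoded_key.to_s.b unless no_keys
    record # return the record itself, not the result of the `unless` line
  end
end

msgs = [
  SketchOutgoing.new(encoded_payload: 'payload-bytes', encoded_key: 'k1', topic: 'my-topic'),
  SketchOutgoing.new(encoded_payload: nil, encoded_key: 'k2', topic: 'my-topic')
]

with_keys = sketch_build_records(msgs)
without_keys = sketch_build_records(msgs, no_keys: true)
```

The trailing `record` line matters: without it the block would evaluate to the result of the conditional assignment, which is nil whenever keys are disabled.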
.partition_key_for(message) ⇒ String
Returns the partition key to use for this message.
    # File 'lib/deimos/backends/db.rb', line 32
    def partition_key_for(message)
      return message.partition_key if message.partition_key.present?
      return message.key unless message.key.is_a?(Hash)

      message.key.to_yaml
    end
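The precedence in the listing above (explicit partition key first, then a non-Hash key as-is, then a Hash key serialized to YAML) can be illustrated with a small self-contained sketch. SketchMessage and sketch_partition_key_for are hypothetical names. Because `present?` comes from ActiveSupport, the sketch approximates it with a nil-or-blank check, which is an assumption rather than the library's exact behavior.

```ruby
require 'yaml'

# Hypothetical stand-in for a Deimos message; only the two fields that
# partition_key_for reads are modeled here.
SketchMessage = Struct.new(:partition_key, :key, keyword_init: true)

# Follows the precedence in the listing. ActiveSupport's `present?` is
# approximated by treating nil and blank strings as absent.
def sketch_partition_key_for(message)
  pk = message.partition_key
  return pk unless pk.nil? || pk.to_s.strip.empty?
  return message.key unless message.key.is_a?(Hash)

  message.key.to_yaml
end

explicit = SketchMessage.new(partition_key: 'tenant-7', key: 'abc')
scalar   = SketchMessage.new(partition_key: nil, key: 'abc')
hashed   = SketchMessage.new(partition_key: nil, key: { 'id' => 1 })

puts sketch_partition_key_for(explicit)
puts sketch_partition_key_for(scalar)
puts sketch_partition_key_for(hashed)
```

Serializing a Hash key to YAML gives a single string for the structured key, so two messages with equal Hash keys produce the same partition key.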