Class: Deimos::Producer
- Inherits:
- 
      Object
      
        - Object
- Deimos::Producer
 
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb
Overview
Producer to publish messages to a given kafka topic.
Direct Known Subclasses
Constant Summary collapse
- MAX_BATCH_SIZE =
          
- 500
Class Method Summary collapse
- .config ⇒ Hash
- .determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
- .encoder ⇒ Deimos::SchemaBackends::Base
- .key_encoder ⇒ Deimos::SchemaBackends::Base
- 
  
    
      .partition_key(_payload)  ⇒ String 
    
    
  
  
  
  
  
  
  
  
  
    Override the default partition key (which is the payload key). 
- 
  
    
      .produce_batch(backend, batch)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Send a batch to the backend. 
- 
  
    
      .publish(payload, topic: self.topic, headers: nil)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Publish the payload to the topic. 
- 
  
    
      .publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Publish a list of messages. 
- 
  
    
      .topic(topic = nil)  ⇒ String 
    
    
  
  
  
  
  
  
  
  
  
    Set the topic. 
- 
  
    
      .watched_attributes  ⇒ Array<String> 
    
    
  
  
  
  
  
  
  
  
  
    Override this in active record producers to add non-schema fields to check for updates. 
Class Method Details
.config ⇒ Hash
| 68 69 70 71 72 73 | # File 'lib/deimos/producer.rb', line 68 def config @config ||= { encode_key: true, namespace: Deimos.config.producers.schema_namespace } end | 
.determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
| 138 139 140 141 142 143 144 145 146 147 148 149 150 | # File 'lib/deimos/producer.rb', line 138 def determine_backend_class(sync, force_send) backend = if force_send :kafka else Deimos.config.producers.backend end if backend == :kafka_async && sync backend = :kafka elsif backend == :kafka && sync == false backend = :kafka_async end "Deimos::Backends::#{backend.to_s.classify}".constantize end | 
.encoder ⇒ Deimos::SchemaBackends::Base
| 161 162 163 164 | # File 'lib/deimos/producer.rb', line 161 def encoder @encoder ||= Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) end | 
.key_encoder ⇒ Deimos::SchemaBackends::Base
| 167 168 169 170 | # File 'lib/deimos/producer.rb', line 167 def key_encoder @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema], namespace: config[:namespace]) end | 
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). Will include ‘payload_key` if it is part of the original payload.
| 91 92 93 | # File 'lib/deimos/producer.rb', line 91 def partition_key(_payload) nil end | 
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
| 156 157 158 | # File 'lib/deimos/producer.rb', line 156 def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end | 
.publish(payload, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
| 100 101 102 | # File 'lib/deimos/producer.rb', line 100 def publish(payload, topic: self.topic, headers: nil) publish_list([payload], topic: topic, headers: headers) end | 
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish a list of messages. whether to publish synchronously. and send immediately to Kafka.
| 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | # File 'lib/deimos/producer.rb', line 113 def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) return if Deimos.config.kafka.seed_brokers.blank? || Deimos.config.producers.disabled || Deimos.producers_disabled?(self) raise 'Topic not specified. Please specify the topic.' if topic.blank? backend_class = determine_backend_class(sync, force_send) Deimos.instrument( 'encode_messages', producer: self, topic: topic, payloads: payloads ) do = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } .each { |m| (m, topic) } .in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend_class, batch) end end end | 
.topic(topic = nil) ⇒ String
Set the topic.
| 78 79 80 81 82 83 84 85 | # File 'lib/deimos/producer.rb', line 78 def topic(topic=nil) if topic config[:topic] = topic return end # accessor "#{Deimos.config.producers.topic_prefix}#{config[:topic]}" end | 
.watched_attributes ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates
| 175 176 177 | # File 'lib/deimos/producer.rb', line 175 def watched_attributes self.encoder.schema_fields.map(&:name) end |