Class: Deimos::Producer
- Inherits:
- Object
  - Object
    - Deimos::Producer
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb
Overview
Producer to publish messages to a given Kafka topic.
Direct Known Subclasses
Constant Summary
- MAX_BATCH_SIZE = 500
Class Method Summary
- .config ⇒ Hash
- .determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
- .encoder ⇒ Deimos::SchemaBackends::Base
- .key_encoder ⇒ Deimos::SchemaBackends::Base
- .partition_key(_payload) ⇒ String
    Override the default partition key (which is the payload key).
- .produce_batch(backend, batch) ⇒ void
    Send a batch to the backend.
- .publish(payload, topic: self.topic) ⇒ void
    Publish the payload to the topic.
- .publish_list(payloads, sync: nil, force_send: false, topic: self.topic) ⇒ void
    Publish a list of messages.
- .topic(topic = nil) ⇒ String
    Set the topic.
- .watched_attributes ⇒ Array<String>
    Override this in active record producers to add non-schema fields to check for updates.
Class Method Details
.config ⇒ Hash
    # File 'lib/deimos/producer.rb', line 65
    def config
      @config ||= {
        encode_key: true,
        namespace: Deimos.config.producers.schema_namespace
      }
    end
.determine_backend_class(sync, force_send) ⇒ Class<Deimos::Backends::Base>
    # File 'lib/deimos/producer.rb', line 133
    def determine_backend_class(sync, force_send)
      backend = if force_send
                  :kafka
                else
                  Deimos.config.producers.backend
                end
      if backend == :kafka_async && sync
        backend = :kafka
      elsif backend == :kafka && sync == false
        backend = :kafka_async
      end
      "Deimos::Backends::#{backend.to_s.classify}".constantize
    end
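The selection rules above can be traced without a configured Deimos application. The following is a minimal sketch that restates them as a standalone function: `resolve_backend` and its `configured` parameter are hypothetical stand-ins for `determine_backend_class` and `Deimos.config.producers.backend`, and the function returns the backend symbol instead of constantizing a class. The `:test` symbol is used only as an example of a non-Kafka backend.

```ruby
# Hypothetical standalone restatement of the backend selection rules.
# `configured` stands in for Deimos.config.producers.backend.
def resolve_backend(configured, sync, force_send)
  backend = force_send ? :kafka : configured
  if backend == :kafka_async && sync
    :kafka        # async backend, but the caller asked for a synchronous send
  elsif backend == :kafka && sync == false
    :kafka_async  # sync backend, but the caller explicitly passed sync: false
  else
    backend       # no override applies
  end
end

p resolve_backend(:kafka_async, true, false)
p resolve_backend(:kafka, false, false)
p resolve_backend(:test, nil, true)
p resolve_backend(:test, true, false)
```

Note that `sync: nil` (the default in `publish_list`) never switches between the two Kafka backends; only an explicit `true` or `false` does.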
.encoder ⇒ Deimos::SchemaBackends::Base
    # File 'lib/deimos/producer.rb', line 156
    def encoder
      @encoder ||= Deimos.schema_backend(schema: config[:schema],
                                         namespace: config[:namespace])
    end
.key_encoder ⇒ Deimos::SchemaBackends::Base
    # File 'lib/deimos/producer.rb', line 162
    def key_encoder
      @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema],
                                             namespace: config[:namespace])
    end
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). The `_payload` argument will include `payload_key` if it is part of the original payload. The default implementation returns `nil`.
    # File 'lib/deimos/producer.rb', line 88
    def partition_key(_payload)
      nil
    end
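As an illustration of the override, here is a minimal sketch that runs without the gem. `ProducerStandIn` is a hypothetical stand-in for `Deimos::Producer`, and `OrderProducer` and the `'user_id'` payload field are invented for the example; a real producer would subclass `Deimos::Producer` instead.

```ruby
# Stand-in for Deimos::Producer so the sketch runs without the gem.
class ProducerStandIn
  # Mirrors the default shown above: no custom partition key.
  def self.partition_key(_payload)
    nil
  end
end

# Hypothetical producer that partitions by a 'user_id' payload field,
# so that all messages for one user share a partition key.
class OrderProducer < ProducerStandIn
  def self.partition_key(payload)
    payload['user_id'].to_s
  end
end

p ProducerStandIn.partition_key({ 'user_id' => 42 })
p OrderProducer.partition_key({ 'user_id' => 42 })
```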
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
    # File 'lib/deimos/producer.rb', line 151
    def produce_batch(backend, batch)
      backend.publish(producer_class: self, messages: batch)
    end
.publish(payload, topic: self.topic) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
    # File 'lib/deimos/producer.rb', line 96
    def publish(payload, topic: self.topic)
      publish_list([payload], topic: topic)
    end
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic) ⇒ void
This method returns an undefined value.
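Publish a list of messages. If `sync` is given, it overrides the default setting of whether to publish synchronously. If `force_send` is true, the configured backend is ignored and the messages are sent immediately to Kafka.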
    # File 'lib/deimos/producer.rb', line 108
    def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
      return if Deimos.config.kafka.seed_brokers.blank? ||
                Deimos.config.producers.disabled ||
                Deimos.producers_disabled?(self)

      raise 'Topic not specified. Please specify the topic.' if topic.blank?

      backend_class = determine_backend_class(sync, force_send)
      Deimos.instrument(
        'encode_messages',
        producer: self,
        topic: topic,
        payloads: payloads
      ) do
        messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self) }
        messages.each { |m| _process_message(m, topic) }
        messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
          self.produce_batch(backend_class, batch)
        end
      end
    end
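The batching step relies on `in_groups_of(MAX_BATCH_SIZE, false)`, an ActiveSupport extension to `Array`; passing `false` as the second argument leaves the final group unpadded. The sketch below shows the same split with plain Ruby's `each_slice`, so it runs without ActiveSupport. `batches_for` is a hypothetical helper, and the hashes are placeholders for `Deimos::Message` objects.

```ruby
# Hypothetical helper: split messages into batches of at most `max` items,
# mirroring in_groups_of(max, false) with plain Ruby's each_slice.
def batches_for(messages, max = 500)
  messages.each_slice(max).to_a
end

messages = Array.new(1200) { |i| { 'id' => i } } # placeholder payloads
p batches_for(messages).map(&:size) # => [500, 500, 200]
```

With 1,200 messages and a batch size of 500, `produce_batch` would therefore be called three times, the last time with 200 messages.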
.topic(topic = nil) ⇒ String
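Set the topic. When called without an argument, it acts as an accessor and returns the configured topic prefixed with `Deimos.config.producers.topic_prefix`.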
    # File 'lib/deimos/producer.rb', line 75
    def topic(topic=nil)
      if topic
        config[:topic] = topic
        return
      end
      # accessor
      "#{Deimos.config.producers.topic_prefix}#{config[:topic]}"
    end
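The combined setter and accessor can be tried in isolation. This is a minimal sketch under stated assumptions: `TopicStandIn` is a hypothetical class that copies the method body above, and the `TOPIC_PREFIX` constant with its `'staging.'` value is an invented replacement for `Deimos.config.producers.topic_prefix`.

```ruby
# Stand-in mirroring the combined setter/getter shown above.
class TopicStandIn
  # Hypothetical prefix; the real value comes from
  # Deimos.config.producers.topic_prefix.
  TOPIC_PREFIX = 'staging.'

  def self.config
    @config ||= {}
  end

  def self.topic(topic = nil)
    if topic
      config[:topic] = topic # setter form: store the bare topic name
      return
    end
    "#{TOPIC_PREFIX}#{config[:topic]}" # accessor form: prefix is applied on read
  end
end

TopicStandIn.topic('orders') # setter form, returns nil
p TopicStandIn.topic         # => "staging.orders"
```

Because the prefix is applied when the topic is read rather than when it is set, the stored value stays unprefixed.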
.watched_attributes ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates. By default, it returns the names of the encoder's schema fields.
    # File 'lib/deimos/producer.rb', line 170
    def watched_attributes
      self.encoder.schema_fields.map(&:name)
    end
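One way to add a non-schema field is to extend the default list with `super`. The sketch below assumes that approach and runs without the gem: `WatchedStandIn` is a hypothetical stand-in whose hard-coded list replaces `encoder.schema_fields.map(&:name)`, and `WidgetProducer` and the `'updated_by'` column are invented for the example.

```ruby
# Stand-in for the base producer; the hard-coded list replaces the
# schema field names that the real encoder would return.
class WatchedStandIn
  def self.watched_attributes
    %w(id name)
  end
end

# Hypothetical producer that also watches a non-schema column.
class WidgetProducer < WatchedStandIn
  def self.watched_attributes
    super + ['updated_by']
  end
end

p WidgetProducer.watched_attributes # => ["id", "name", "updated_by"]
```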