Class: Deimos::Producer
- Inherits:
-
Object
- Object
- Deimos::Producer
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb
Overview
Producer to publish messages to a given kafka topic.
Direct Known Subclasses
Constant Summary collapse
- MAX_BATCH_SIZE =
500
Class Method Summary collapse
- .determine_backend_class(sync = false, force_send = false) ⇒ Class<Deimos::Backends::Base>
- .karafka_config ⇒ Object
-
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key).
-
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
-
.produce_batch(backend, batch) ⇒ void
Send a batch to the backend.
-
.publish(payload, topic: self.topic, headers: nil) ⇒ void
Publish the payload to the topic.
-
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
Publish a list of messages.
- .topic ⇒ Object
Class Method Details
.determine_backend_class(sync = false, force_send = false) ⇒ Class<Deimos::Backends::Base>
135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/deimos/producer.rb', line 135 def determine_backend_class(sync=false, force_send=false) backend = if force_send :kafka else Deimos.config.producers.backend end if backend == :kafka_async && sync backend = :kafka elsif backend == :kafka && sync == false backend = :kafka_async end "Deimos::Backends::#{backend.to_s.classify}".constantize end |
.karafka_config ⇒ Object
124 125 126 |
# File 'lib/deimos/producer.rb', line 124 def karafka_config Deimos.karafka_configs.find { |topic| topic.producer_class == self } end |
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). Will include ‘payload_key` if it is part of the original payload.
73 74 75 |
# File 'lib/deimos/producer.rb', line 73 def partition_key(_payload) nil end |
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/deimos/producer.rb', line 89 def produce(, backend: determine_backend_class) return if Deimos.producers_disabled?(self) .each do |m| m[:label] = m m[:partition_key] ||= self.partition_key(m[:payload]) end .in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend, batch) end end |
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
153 154 155 |
# File 'lib/deimos/producer.rb', line 153 def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end |
.publish(payload, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
82 83 84 |
# File 'lib/deimos/producer.rb', line 82 def publish(payload, topic: self.topic, headers: nil) produce([{payload: payload, topic: topic, headers: headers}]) end |
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish a list of messages. whether to publish synchronously. and send immediately to Kafka.
110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/deimos/producer.rb', line 110 def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) backend = determine_backend_class(sync, force_send) = Array(payloads).map do |p| { payload: p&.to_h, headers: headers, topic: topic, partition_key: self.partition_key(p) } end self.produce(, backend: backend) end |
.topic ⇒ Object
128 129 130 |
# File 'lib/deimos/producer.rb', line 128 def topic karafka_config.name end |