Class: Deimos::Producer
- Inherits:
- Object
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb, sig/defs.rbs
Overview
Producer to publish messages to a given kafka topic.
Direct Known Subclasses
Constant Summary
- MAX_BATCH_SIZE = 500
Class Method Summary
- .config ⇒ ::Hash[untyped, untyped]
-
.determine_backend_class(sync = false, force_send = false) ⇒ singleton(Deimos::Backends::Base)
Resolve the backend class from the configured backend and the sync and force_send flags.
- .encoder ⇒ Deimos::SchemaBackends::Base
- .karafka_config ⇒ Object
- .key_encoder ⇒ Deimos::SchemaBackends::Base
-
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key).
-
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
-
.produce_batch(backend, batch) ⇒ void
Send a batch to the backend.
-
.publish(payload, topic: self.topic, headers: nil) ⇒ void
Publish the payload to the topic.
-
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ Object
Publish a list of messages.
-
.topic ⇒ String
Return the topic this producer publishes to.
-
.watched_attributes ⇒ ::Array[String]
Override this in active record producers to add non-schema fields to check for updates.
Class Method Details
.config ⇒ ::Hash[untyped, untyped]
# File 'sig/defs.rbs', line 170
def self.config: () -> ::Hash[untyped, untyped]
.determine_backend_class(sync = false, force_send = false) ⇒ singleton(Deimos::Backends::Base)
@param sync
@param force_send
# File 'lib/deimos/producer.rb', line 148
def determine_backend_class(sync=false, force_send=false)
  backend = if force_send
              :kafka
            else
              Deimos.config.producers.backend
            end
  if backend == :kafka_async && sync
    backend = :kafka
  elsif backend == :kafka && sync == false
    backend = :kafka_async
  end
  "Deimos::Backends::#{backend.to_s.classify}".constantize
end
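The selection rules in the method above can be traced in isolation. What follows is a minimal sketch, not Deimos code: `resolve_backend` is a hypothetical helper that takes the configured backend as an argument instead of reading `Deimos.config.producers.backend`, and returns the backend symbol rather than the constantized class. The `:test` value stands in for any configured backend other than `:kafka` or `:kafka_async`.

```ruby
# Hypothetical stand-in for Deimos::Producer.determine_backend_class.
# `configured` plays the role of Deimos.config.producers.backend.
def resolve_backend(configured, sync = false, force_send = false)
  # force_send ignores the configured backend and starts from :kafka.
  backend = force_send ? :kafka : configured

  if backend == :kafka_async && sync
    :kafka        # a truthy sync upgrades async to sync
  elsif backend == :kafka && sync == false
    :kafka_async  # only an explicit false downgrades; nil does not
  else
    backend
  end
end

resolve_backend(:kafka_async, true)  # => :kafka
resolve_backend(:kafka, false)       # => :kafka_async
resolve_backend(:kafka, nil)         # => :kafka
resolve_backend(:test, true)         # => :test
resolve_backend(:test, false, true)  # => :kafka_async
resolve_backend(:test, nil, true)    # => :kafka
```

One consequence worth noting: because the second branch compares `sync == false`, calling with `force_send` set and `sync` left at its positional default of `false` resolves to `:kafka_async`, while passing `sync` as `nil` (the default used by `publish_list`) resolves to `:kafka`.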
.encoder ⇒ Deimos::SchemaBackends::Base
# File 'sig/defs.rbs', line 222
def self.encoder: () -> Deimos::SchemaBackends::Base
.karafka_config ⇒ Object
# File 'lib/deimos/producer.rb', line 134
def karafka_config
  Deimos.karafka_configs.find do |t|
    t.producer_classes&.any? { |k| k&.name == self.name }
  end
end
.key_encoder ⇒ Deimos::SchemaBackends::Base
# File 'sig/defs.rbs', line 224
def self.key_encoder: () -> Deimos::SchemaBackends::Base
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key).
Will include payload_key if it is part of the original payload.
@param _payload — the payload being passed into the produce method.
# File 'lib/deimos/producer.rb', line 73
def partition_key(_payload)
  nil
end
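Since the default implementation returns nil, a subclass overrides this method to choose its own partition key. The sketch below shows the shape of such an override under stated assumptions: `SketchProducer` is a stand-in that only mirrors the default above (it is not the real `Deimos::Producer`), and `UserEventProducer` and the `:user_id` payload field are hypothetical names chosen for illustration.

```ruby
# Stand-in base class mirroring the default shown above.
# This is NOT Deimos::Producer; it exists so the sketch runs on its own.
class SketchProducer
  def self.partition_key(_payload)
    nil
  end
end

# Hypothetical subclass that partitions by a user_id field in the payload,
# so that all events for one user would share a partition key.
class UserEventProducer < SketchProducer
  def self.partition_key(payload)
    payload[:user_id].to_s
  end
end

SketchProducer.partition_key({ user_id: 42 })    # => nil
UserEventProducer.partition_key({ user_id: 42 }) # => "42"
```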
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
# File 'lib/deimos/producer.rb', line 89
def produce(messages, backend: determine_backend_class)
  return if Deimos.producers_disabled?(self)

  messages.each do |m|
    m[:label] = m
    m[:partition_key] ||= self.partition_key(m[:payload])
  end
  messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
    self.produce_batch(backend, batch)
  end
end
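The batching step above splits the message list into groups of at most MAX_BATCH_SIZE (500) before handing each group to the backend. As a rough illustration, the sketch below reproduces that split in plain Ruby, using `Enumerable#each_slice` in place of ActiveSupport's `in_groups_of(n, false)`. The `batch_sizes` helper and the sample messages are hypothetical.

```ruby
# Hypothetical helper: report how a message list would be divided into
# batches of at most max_batch_size. each_slice leaves the last batch
# short instead of padding it, matching in_groups_of(n, false).
def batch_sizes(messages, max_batch_size = 500)
  messages.each_slice(max_batch_size).map(&:size)
end

# 1,201 illustrative messages in WaterDrop-style hash form.
messages = Array.new(1_201) { |i| { payload: { 'id' => i }, topic: 'my-topic' } }

batch_sizes(messages) # => [500, 500, 201]
batch_sizes([])       # => []
```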
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
@param backend
@param batch
# File 'lib/deimos/producer.rb', line 167
def produce_batch(backend, batch)
  backend.publish(producer_class: self, messages: batch)
end
.publish(payload, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
@param payload — with an optional payload_key hash key.
@param topic — the topic to publish to, if overriding the producer's default topic
# File 'lib/deimos/producer.rb', line 82
def publish(payload, topic: self.topic, headers: nil)
  produce([{ payload: payload, topic: topic, headers: headers }])
end
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ Object
Publish a list of messages.
@param payloads — with optional payload_key hash key.
@param sync — if given, override the default setting of whether to publish synchronously.
@param force_send — if true, ignore the configured backend and send immediately to Kafka.
@param topic — the topic to publish to, if overriding the producer's default topic
# File 'lib/deimos/producer.rb', line 110
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
  backend = determine_backend_class(sync, force_send)

  messages = Array(payloads).map do |p|
    payload = p
    payload = payload.to_h if p.is_a?(AvroGen::SchemaClass::Record)
    m = {
      payload: payload,
      headers: headers,
      topic: topic,
      partition_key: self.partition_key(p)
    }
    if payload.is_a?(Hash) && payload.key?(:key) && payload.key?(:message)
      m[:key] = payload[:key]
      m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(AvroGen::SchemaClass::Record)
      m[:payload] = payload[:message]
      m[:payload] = m[:payload].to_h if m[:payload].nil? || m[:payload].is_a?(AvroGen::SchemaClass::Record)
    end
    m
  end
  self.produce(messages, backend: backend)
end
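The mapping above treats a hash containing both `:key` and `:message` as a wrapper: the `:key` value becomes the message key and the `:message` value becomes the payload. Any other payload is passed through unchanged. The sketch below isolates that step under simplifying assumptions: `build_message` is a hypothetical helper, the conversion of `AvroGen::SchemaClass::Record` instances is left out, and the partition key is passed in rather than computed by the producer.

```ruby
# Hypothetical helper mirroring the per-payload mapping in publish_list.
# Schema-class conversion is omitted; only plain hashes are handled.
def build_message(payload, topic:, headers: nil, partition_key: nil)
  m = { payload: payload, headers: headers, topic: topic, partition_key: partition_key }

  if payload.is_a?(Hash) && payload.key?(:key) && payload.key?(:message)
    m[:key] = payload[:key]
    m[:key] = m[:key].to_h if m[:key].nil?          # nil.to_h is {}
    m[:payload] = payload[:message]
    m[:payload] = m[:payload].to_h if m[:payload].nil?
  end
  m
end

# Wrapped form: key and message are split apart.
wrapped = build_message({ key: 'abc', message: { 'name' => 'x' } }, topic: 'my-topic')
wrapped[:key]     # => "abc"
wrapped[:payload] # => {"name"=>"x"}

# Plain form: the payload is used as given and no :key entry is added.
plain = build_message({ 'name' => 'x' }, topic: 'my-topic')
plain.key?(:key)  # => false
```

Note that, as in the original method, a nil `:key` or `:message` inside the wrapper is turned into an empty hash by `to_h`.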
.topic ⇒ String
Return the topic this producer publishes to, taken from the name of its Karafka config.
@return — the current topic.
# File 'sig/defs.rbs', line 177
def topic
  karafka_config&.name
end
.watched_attributes ⇒ ::Array[String]
Override this in active record producers to add non-schema fields to check for updates.
@return — fields to check for updates
# File 'sig/defs.rbs', line 230
def self.watched_attributes: () -> ::Array[String]