Class: Deimos::Producer

Inherits:
Object
  • Object
show all
Includes:
SharedConfig
Defined in:
lib/deimos/producer.rb

Overview

Producer to publish messages to a given kafka topic.

Direct Known Subclasses

ActiveRecordProducer

Constant Summary collapse

MAX_BATCH_SIZE =

Returns:

  • (Integer)
500

Class Method Summary collapse

Class Method Details

.determine_backend_class(sync = false, force_send = false) ⇒ Class<Deimos::Backends::Base>

Parameters:

  • sync (Boolean) (defaults to: false)
  • force_send (Boolean) (defaults to: false)

Returns:



135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/deimos/producer.rb', line 135

def determine_backend_class(sync=false, force_send=false)
  backend = if force_send
              :kafka
            else
              Deimos.config.producers.backend
            end
  if backend == :kafka_async && sync
    backend = :kafka
  elsif backend == :kafka && sync == false
    backend = :kafka_async
  end
  "Deimos::Backends::#{backend.to_s.classify}".constantize
end

.karafka_configObject



124
125
126
# File 'lib/deimos/producer.rb', line 124

def karafka_config
  Deimos.karafka_configs.find { |topic| topic.producer_class == self }
end

.partition_key(_payload) ⇒ String

Override the default partition key (which is the payload key). Will include ‘payload_key` if it is part of the original payload.

Parameters:

  • _payload (Hash)

    the payload being passed into the produce method.

Returns:

  • (String)


73
74
75
# File 'lib/deimos/producer.rb', line 73

def partition_key(_payload)
  nil
end

.produce(messages, backend: determine_backend_class) ⇒ Object

Produce a list of messages in WaterDrop message hash format.

Parameters:

  • messages (Array<Hash>)
  • backend (Class < Deimos::Backend]) (defaults to: determine_backend_class)

    ackend [Class < Deimos::Backend]



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/deimos/producer.rb', line 89

def produce(messages, backend: determine_backend_class)
  return if Deimos.producers_disabled?(self)

  messages.each do |m|
    m[:label] = m
    m[:partition_key] ||= self.partition_key(m[:payload])
  end
  messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
    self.produce_batch(backend, batch)
  end
end

.produce_batch(backend, batch) ⇒ void

This method returns an undefined value.

Send a batch to the backend.

Parameters:



153
154
155
# File 'lib/deimos/producer.rb', line 153

def produce_batch(backend, batch)
  backend.publish(producer_class: self, messages: batch)
end

.publish(payload, topic: self.topic, headers: nil) ⇒ void

This method returns an undefined value.

Publish the payload to the topic.

Parameters:

  • payload (Hash, SchemaClass::Record)

    with an optional payload_key hash key.

  • topic (String) (defaults to: self.topic)

    if specifying the topic

  • headers (Hash) (defaults to: nil)

    if specifying headers



82
83
84
# File 'lib/deimos/producer.rb', line 82

def publish(payload, topic: self.topic, headers: nil)
  produce([{payload: payload, topic: topic, headers: headers}])
end

.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void

This method returns an undefined value.

Publish a list of messages. whether to publish synchronously. and send immediately to Kafka.

Parameters:

  • payloads (Array<Hash, SchemaClass::Record>)

    with optional payload_key hash key.

  • sync (Boolean) (defaults to: nil)

    if given, override the default setting of

  • force_send (Boolean) (defaults to: false)

    if true, ignore the configured backend

  • topic (String) (defaults to: self.topic)

    if specifying the topic

  • headers (Hash) (defaults to: nil)

    if specifying headers



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/deimos/producer.rb', line 110

def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
  backend = determine_backend_class(sync, force_send)

  messages = Array(payloads).map do |p|
    {
      payload: p&.to_h,
      headers: headers,
      topic: topic,
      partition_key: self.partition_key(p)
    }
  end
  self.produce(messages, backend: backend)
end

.topicObject



128
129
130
# File 'lib/deimos/producer.rb', line 128

def topic
  karafka_config.name
end