Class: Deimos::Producer

Inherits:
Object
  • Object
show all
Includes:
SharedConfig
Defined in:
lib/deimos/producer.rb

Overview

Producer to publish messages to a given kafka topic.

Direct Known Subclasses

ActiveRecordProducer

Constant Summary collapse

MAX_BATCH_SIZE =
500

Class Method Summary collapse

Class Method Details

.config ⇒ Hash

Returns:

  • (Hash)


62
63
64
65
66
67
# File 'lib/deimos/producer.rb', line 62

# Producer configuration, built on first access and memoized in @config.
#
# The initial hash enables key encoding and takes its namespace from
# Deimos.config.producers.schema_namespace.
# @return [Hash]
def config
  return @config if @config

  @config = {
    encode_key: true,
    namespace: Deimos.config.producers.schema_namespace
  }
end

.determine_backend_class(sync, force_send) ⇒ Class < Deimos::Backend

Returns the backend class to publish with (Class < Deimos::Backend).

Parameters:

  • sync (Boolean)
  • force_send (Boolean)

Returns:

  • (Class < Deimos::Backend)

    the backend class to publish with



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/deimos/producer.rb', line 128

# Resolve the backend class used to publish messages.
#
# The starting backend is :kafka when force_send is truthy, otherwise
# Deimos.config.producers.backend. The sync flag can then switch between
# the two Kafka backends: a truthy sync turns :kafka_async into :kafka,
# and sync == false turns :kafka into :kafka_async. A nil sync never
# switches anything, and non-Kafka backends are left untouched.
# @param sync [Boolean, nil]
# @param force_send [Boolean]
# @return [Class] the constant Deimos::Backends::<classified backend name>
def determine_backend_class(sync, force_send)
  configured = force_send ? :kafka : Deimos.config.producers.backend
  resolved =
    if configured == :kafka_async && sync
      :kafka
    elsif configured == :kafka && sync == false
      # Compared against false on purpose: nil means "no override".
      :kafka_async
    else
      configured
    end
  "Deimos::Backends::#{resolved.to_s.classify}".constantize
end

.encoder ⇒ Deimos::SchemaBackends::Base



150
151
152
153
# File 'lib/deimos/producer.rb', line 150

# Schema backend for message payloads, created on first use and memoized
# in @encoder. It is built by Deimos.schema_backend from the :schema and
# :namespace entries of config.
# @return [Deimos::SchemaBackends::Base]
def encoder
  return @encoder if @encoder

  settings = config
  @encoder = Deimos.schema_backend(schema: settings[:schema],
                                   namespace: settings[:namespace])
end

.key_encoder ⇒ Deimos::SchemaBackends::Base



156
157
158
159
# File 'lib/deimos/producer.rb', line 156

# Schema backend for message keys, created on first use and memoized in
# @key_encoder. It is built by Deimos.schema_backend from the :key_schema
# and :namespace entries of config.
# @return [Deimos::SchemaBackends::Base]
def key_encoder
  return @key_encoder if @key_encoder

  settings = config
  @key_encoder = Deimos.schema_backend(schema: settings[:key_schema],
                                       namespace: settings[:namespace])
end

.partition_key(_payload) ⇒ String

Override the default partition key (which is the payload key). Will include `payload_key` if it is part of the original payload.

Parameters:

  • _payload (Hash)

    the payload being passed into the produce method.

Returns:

  • (String)


85
86
87
# File 'lib/deimos/producer.rb', line 85

# Hook for supplying a custom partition key for a payload.
#
# This base implementation ignores the payload and always returns nil;
# subclasses override it to return their own key.
# @param _payload [Hash] the payload being passed into the produce method.
# @return [String, nil]
def partition_key(_payload)
  # nil means "no custom partition key".
  nil
end

.produce_batch(backend, batch) ⇒ Object

Send a batch to the backend.

Parameters:

  • backend (Class < Deimos::Backend)

    the backend class that publishes the batch

  • batch (Array<Deimos::Message>)


145
146
147
# File 'lib/deimos/producer.rb', line 145

# Hand one batch of messages to the backend.
#
# Calls backend.publish with this producer as producer_class and the batch
# as messages, and returns whatever the backend returns.
# @param backend [Class < Deimos::Backend]
# @param batch [Array<Deimos::Message>]
def produce_batch(backend, batch)
  backend.publish(
    producer_class: self,
    messages: batch
  )
end

.publish(payload, topic: self.topic) ⇒ Object

Publish the payload to the topic.

Parameters:

  • payload (Hash|SchemaClass::Record)

    with an optional payload_key hash key.

  • topic (String) (defaults to: self.topic)

    if specifying the topic



92
93
94
# File 'lib/deimos/producer.rb', line 92

# Publish a single payload by delegating to publish_list with a
# one-element list.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] topic to publish to; defaults to self.topic.
def publish(payload, topic: self.topic)
  single_payload_list = [payload]
  publish_list(single_payload_list, topic: topic)
end

.publish_list(payloads, sync: nil, force_send: false, topic: self.topic) ⇒ Object

Publish a list of messages. `sync` overrides the default setting of whether to publish synchronously; `force_send` ignores the configured backend and sends immediately to Kafka.

Parameters:

  • payloads (Array<Hash|SchemaClass::Record>)

    with optional payload_key hash key.

  • sync (Boolean) (defaults to: nil)

    if given, override the default setting of whether to publish synchronously.

  • force_send (Boolean) (defaults to: false)

    if true, ignore the configured backend and send immediately to Kafka.

  • topic (String) (defaults to: self.topic)

    if specifying the topic



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/deimos/producer.rb', line 103

# Publish a list of payloads to the topic.
#
# Does nothing (returns nil) when no seed brokers are configured, when
# producers are disabled in the Deimos config, or when
# Deimos.producers_disabled? is true for this producer. Otherwise, inside
# an 'encode_messages' instrumentation block, every payload is wrapped in
# a Deimos::Message, every message is passed through _process_message,
# and the messages are sent to the backend in batches of at most
# MAX_BATCH_SIZE.
# @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean, nil] passed on to determine_backend_class.
# @param force_send [Boolean] passed on to determine_backend_class.
# @param topic [String] topic to publish to; defaults to self.topic.
# @raise [RuntimeError] if the topic is blank.
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
  producing_disabled = Deimos.config.kafka.seed_brokers.blank? ||
                       Deimos.config.producers.disabled ||
                       Deimos.producers_disabled?(self)
  return if producing_disabled

  raise 'Topic not specified. Please specify the topic.' if topic.blank?

  backend_class = determine_backend_class(sync, force_send)
  Deimos.instrument('encode_messages', producer: self, topic: topic, payloads: payloads) do
    # Build every message before processing any of them.
    messages = Array(payloads).map { |payload| Deimos::Message.new(payload.to_h, self) }
    messages.each { |message| _process_message(message, topic) }
    messages.each_slice(MAX_BATCH_SIZE) { |batch| produce_batch(backend_class, batch) }
  end
end

.topic(topic = nil) ⇒ String

Set the topic, or return the current (prefixed) topic if no argument is given.

Parameters:

  • topic (String) (defaults to: nil)

Returns:

  • (String)

    the current topic if no argument given.



72
73
74
75
76
77
78
79
# File 'lib/deimos/producer.rb', line 72

# Combined reader/writer for the producer's topic.
#
# With a truthy argument, stores it under config[:topic] and returns nil.
# Without one, returns config[:topic] prefixed with
# Deimos.config.producers.topic_prefix.
# @param topic [String, nil]
# @return [String, nil] the prefixed topic when reading, nil when writing.
def topic(topic = nil)
  unless topic
    # Reader branch: prepend the configured prefix to the stored topic.
    prefix = Deimos.config.producers.topic_prefix
    return "#{prefix}#{config[:topic]}"
  end

  # Writer branch: store the topic; the return value is always nil.
  config[:topic] = topic
  nil
end

.watched_attributes ⇒ Array<String>

Override this in active record producers to add non-schema fields to check for updates.

Returns:

  • (Array<String>)

    fields to check for updates



164
165
166
# File 'lib/deimos/producer.rb', line 164

# Names of the fields reported by the encoder's schema_fields.
# @return [Array<String>] fields to check for updates
def watched_attributes
  schema_fields = encoder.schema_fields
  schema_fields.map { |field| field.name }
end