Module: Deimos::ProducerMiddleware

Defined in:
lib/deimos/ext/producer_middleware.rb

Class Method Summary

Class Method Details

._encode_key(key, config) ⇒ String|Object

Parameters:

  • key (Object)
  • config (ProducerConfig)

Returns:

  • (String|Object)


71
72
73
74
75
76
77
78
79
80
81
# File 'lib/deimos/ext/producer_middleware.rb', line 71

# Encodes a message key using the deserializers configured for the topic.
#
# A nil key is returned as nil without consulting the config. Otherwise the
# key deserializer is used when it responds to +encode_key+; if it does not,
# a truthy key is encoded with the payload deserializer and a falsy
# (non-nil) key is returned untouched.
#
# @param key [Object] the raw key, may be nil
# @param config [ProducerConfig] must expose +deserializers+ with :key and :payload entries
# @return [String, Object, nil] the encoded key, or the key itself when nothing encodes it
def _encode_key(key, config)
  return nil if key.nil?

  key_coder = config.deserializers[:key]
  return key_coder.encode_key(key) if key_coder.respond_to?(:encode_key)
  return key unless key

  config.deserializers[:payload].encode(key)
end

._process_message(message, karafka_message, config) ⇒ Object

Parameters:

  • message (Deimos::Message)
  • karafka_message (Hash)
  • config (Deimos::ProducerConfig)


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/deimos/ext/producer_middleware.rb', line 46

# Fills in the key, topic, encoded key and encoded payload on +message+,
# using the payload and key deserializers taken from +config+.
# +message+ is mutated in place; the statements below are order-dependent.
#
# @param message [Deimos::Message] the message to populate (modified in place)
# @param karafka_message [Hash] the original message hash; only its :key entry is read here
# @param config [Deimos::ProducerConfig] supplies +deserializers+ and the topic +name+
# @return [Object] the value of the final assignment (the encoded payload, or nil)
def _process_message(message, karafka_message, config)
  encoder = config.deserializers[:payload].backend
  key_transcoder = config.deserializers[:key]
  # this violates the Law of Demeter but it has to happen in a very
  # specific order and requires a bunch of methods on the producer
  # to work correctly.
  message.add_fields(encoder.schema_fields.map(&:name))
  # An explicit :key on the original message hash takes precedence; only
  # when it is absent is the key looked up in the payload.
  message.key = karafka_message[:key] || _retrieve_key(message.payload, key_transcoder)
  # need to do this before _coerce_fields because that might result
  # in an empty payload which is an *error* whereas this is intended.
  message.payload = nil if message.payload.blank?
  message.coerce_fields(encoder)
  message.encoded_key = _encode_key(message.key, config)
  message.topic = config.name
  # A nil payload is left unencoded. Otherwise the encoder receives the
  # prefixed topic name with a "-value" suffix as its +topic:+ argument.
  message.encoded_payload = if message.payload.nil?
                              nil
                            else
                              encoder.encode(message.payload,
                                             topic: "#{Deimos.config.producers.topic_prefix}#{config.name}-value")
                            end
end

._retrieve_key(payload, key_transcoder) ⇒ String

Parameters:

  • payload
  • key_transcoder

Returns:

  • (String)


86
87
88
89
90
91
# File 'lib/deimos/ext/producer_middleware.rb', line 86

# Determines the message key from the payload.
#
# The :payload_key entry is always removed from +payload+. When it holds a
# truthy value, that value is the key. Otherwise, if the transcoder exposes a
# +key_field+, the key is read from that field of the payload.
#
# @param payload [#delete, #[]] the message payload; :payload_key is deleted from it
# @param key_transcoder [Object] consulted for +key_field+ only if it responds to it
# @return [Object, nil] the key, or nil when none can be determined
def _retrieve_key(payload, key_transcoder)
  explicit_key = payload.delete(:payload_key)
  return explicit_key if explicit_key
  return explicit_key unless key_transcoder.respond_to?(:key_field)
  return nil unless key_transcoder.key_field

  payload[key_transcoder.key_field]
end

.call(message) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/deimos/ext/producer_middleware.rb', line 6

# Encodes an outgoing message hash in place and returns it.
#
# The work runs inside a 'deimos.encode_message' instrumentation block.
# Messages are passed through untouched when no config exists for the topic,
# or when the payload is present but is neither a Hash nor a
# SchemaClass::Record. Otherwise :payload, :key, :partition_key and :topic
# are overwritten with their encoded / prefixed values.
#
# @param message [Hash] message hash; reads :topic, :payload, :headers, :partition_key, :key
# @return [Hash] the same +message+ hash on every path
# @raise [RuntimeError] from validate_key_config when a key is required but missing
def call(message)
  Karafka.monitor.instrument(
    'deimos.encode_message',
    producer: self,
    message: message
  ) do
    config = Deimos.karafka_config_for(topic: message[:topic])
    # No config for this topic: nothing to encode.
    return message if config.nil?

    payload = message[:payload]
    # Payloads of any other type are left alone. The message itself must be
    # returned here (previously a bare `return` yielded nil).
    # NOTE(review): presumably this runs as a WaterDrop middleware step, which
    # is expected to return the message hash - confirm.
    return message if payload && !payload.is_a?(Hash) && !payload.is_a?(SchemaClass::Record)

    m = Deimos::Message.new(payload.to_h,
                            headers: message[:headers],
                            partition_key: message[:partition_key])
    _process_message(m, message, config)
    message[:payload] = m.encoded_payload
    message[:key] = m.encoded_key
    # Prefer the explicit partition key, fall back to the unencoded key;
    # nil when neither is set.
    message[:partition_key] = if m.partition_key
                                m.partition_key.to_s
                              elsif m.key
                                m.key.to_s
                              end
    message[:topic] = "#{Deimos.config.producers.topic_prefix}#{config.name}"

    validate_key_config(config, message)

    message
  end
end

.validate_key_config(config, message) ⇒ Object



37
38
39
40
41
# File 'lib/deimos/ext/producer_middleware.rb', line 37

# Ensures a key is present whenever the topic's key deserializer is a
# Deimos::Transcoder.
#
# @param config [Object] must expose +deserializers+ with a :key entry
# @param message [Hash] message hash whose :key entry is checked
# @return [nil] when the message is acceptable
# @raise [RuntimeError] when :key is nil and the key deserializer is a Deimos::Transcoder
def validate_key_config(config, message)
  return unless message[:key].nil?
  return unless config.deserializers[:key].is_a?(Deimos::Transcoder)

  raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.'
end