Module: Deimos::ProducerMiddleware
- Defined in:
- lib/deimos/ext/producer_middleware.rb
Class Method Summary collapse
- ._encode_key(key, config) ⇒ String|Object
- ._process_message(message, karafka_message, config) ⇒ Object
- ._retrieve_key(payload, key_transcoder) ⇒ String
- .call(message) ⇒ Object
- .validate_key_config(config, message) ⇒ Object
Class Method Details
._encode_key(key, config) ⇒ String|Object
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/deimos/ext/producer_middleware.rb', line 71 def _encode_key(key, config) return nil if key.nil? if config.deserializers[:key].respond_to?(:encode_key) config.deserializers[:key].encode_key(key) elsif key config.deserializers[:payload].encode(key) else key end end |
._process_message(message, karafka_message, config) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/deimos/ext/producer_middleware.rb', line 46 def (, , config) encoder = config.deserializers[:payload].backend key_transcoder = config.deserializers[:key] # this violates the Law of Demeter but it has to happen in a very # specific order and requires a bunch of methods on the producer # to work correctly. .add_fields(encoder.schema_fields.map(&:name)) .key = [:key] || _retrieve_key(.payload, key_transcoder) # need to do this before _coerce_fields because that might result # in an empty payload which is an *error* whereas this is intended. .payload = nil if .payload.blank? .coerce_fields(encoder) .encoded_key = _encode_key(.key, config) .topic = config.name .encoded_payload = if .payload.nil? nil else encoder.encode(.payload, topic: "#{Deimos.config.producers.topic_prefix}#{config.name}-value") end end |
._retrieve_key(payload, key_transcoder) ⇒ String
86 87 88 89 90 91 |
# File 'lib/deimos/ext/producer_middleware.rb', line 86 def _retrieve_key(payload, key_transcoder) key = payload.delete(:payload_key) return key if key || !key_transcoder.respond_to?(:key_field) key_transcoder.key_field ? payload[key_transcoder.key_field] : nil end |
.call(message) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/deimos/ext/producer_middleware.rb', line 6 def call() Karafka.monitor.instrument( 'deimos.encode_message', producer: self, message: ) do config = Deimos.karafka_config_for(topic: [:topic]) return if config.nil? return if [:payload] && ![:payload].is_a?(Hash) && ![:payload].is_a?(SchemaClass::Record) m = Deimos::Message.new([:payload].to_h, headers: [:headers], partition_key: [:partition_key]) (m, , config) [:payload] = m.encoded_payload [:key] = m.encoded_key [:partition_key] = if m.partition_key m.partition_key.to_s elsif m.key m.key.to_s else nil end [:topic] = "#{Deimos.config.producers.topic_prefix}#{config.name}" validate_key_config(config, ) end end |
.validate_key_config(config, message) ⇒ Object
37 38 39 40 41 |
# File 'lib/deimos/ext/producer_middleware.rb', line 37 def validate_key_config(config, ) if [:key].nil? && config.deserializers[:key].is_a?(Deimos::Transcoder) raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.' end end |