Module: Karafka::Pro::Processing::Piping::Consumer
- Defined in:
- lib/karafka/pro/processing/piping/consumer.rb
Overview
Consumer piping functionality
It provides a consistent way to pipe data, adding extra traceability headers similar to those used by the enhanced DLQ.
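As a rough illustration of the pass-through use case, the sketch below forwards every consumed message to another topic. It does not load Karafka: `SketchBaseConsumer` and `PassThroughConsumer` are hypothetical stand-ins, the `pipe_async` stub only records calls, and the topic name `events_copy` is made up. In a real application the consumer would be a Karafka Pro consumer with the piping methods available, which is assumed here.

```ruby
# Stand-in for a Karafka Pro consumer (hypothetical). It only records what
# would be piped instead of dispatching anything to Kafka.
class SketchBaseConsumer
  attr_reader :messages, :piped

  def initialize(messages)
    @messages = messages
    @piped = []
  end

  # Stub that mirrors the documented signature of #pipe_async
  def pipe_async(topic:, message:)
    @piped << { topic: topic, message: message }
  end
end

# Hypothetical pass-through consumer: forwards each message untouched
class PassThroughConsumer < SketchBaseConsumer
  def consume
    messages.each do |message|
      pipe_async(topic: 'events_copy', message: message)
    end
  end
end

consumer = PassThroughConsumer.new(%w[m1 m2])
consumer.consume
```

The consumer never touches the payload, which is the case the piping API appears to be designed for.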
Instance Method Summary
- #pipe_async(topic:, message:) ⇒ Object
  Pipes given message to the provided topic with expected details.
- #pipe_many_async(topic:, messages:) ⇒ Object
  Async multi-message pipe.
- #pipe_many_sync(topic:, messages:) ⇒ Object
  Sync multi-message pipe.
- #pipe_sync(topic:, message:) ⇒ Object
  Sync version of pipe for one message.
Instance Method Details
#pipe_async(topic:, message:) ⇒ Object
Pipes the given message to the provided topic with the expected details. Useful for pass-through operations where deserialization is not needed. The piped message includes all of the source headers plus meta headers describing the source of the message.
Note: It will NOT deserialize the payload, so it is fast.
Note: We assume that the target topic can have a different number of partitions than the source topic, which is why we use a `key` based on the source topic key rather than the partition id. This will not utilize partitions beyond the number of partitions of the source topic, but it will accommodate topics with fewer partitions.
# File 'lib/karafka/pro/processing/piping/consumer.rb', line 59
def pipe_async(topic:, message:)
  produce_async(
    # build_pipe_message: private helper, name assumed (not shown on this page)
    build_pipe_message(topic: topic, message: message)
  )
end
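The notes above can be sketched as follows. This is a minimal approximation, not the library's implementation: `SketchMessage`, `sketch_pipe_message`, and `sketch_partition` are hypothetical names, the `source_*` header names are assumptions, the fallback of the key to the source partition id is an assumption, and the CRC32-modulo partitioner only stands in for whatever partitioner the producer is configured with.

```ruby
require 'zlib'

# Minimal stand-in for a consumed message (hypothetical; not Karafka's class)
SketchMessage = Struct.new(
  :raw_payload, :raw_key, :raw_headers, :topic, :partition, :offset,
  keyword_init: true
)

# Builds the hash that would be handed to the producer.
# The payload is copied in its raw form, so nothing is deserialized.
def sketch_pipe_message(topic:, message:)
  {
    topic: topic,
    payload: message.raw_payload,
    # Source key when present, otherwise the source partition id as a string
    key: message.raw_key || message.partition.to_s,
    # Source headers plus traceability headers (header names are assumptions)
    headers: message.raw_headers.merge(
      'source_topic' => message.topic,
      'source_partition' => message.partition.to_s,
      'source_offset' => message.offset.to_s
    )
  }
end

# Illustrative key-based partitioner (CRC32 modulo; the real one may differ)
def sketch_partition(key, partition_count)
  Zlib.crc32(key) % partition_count
end

source = SketchMessage.new(
  raw_payload: '{"id":1}', raw_key: nil, raw_headers: { 'trace' => 'abc' },
  topic: 'events', partition: 2, offset: 42
)
piped = sketch_pipe_message(topic: 'events_copy', message: source)

# Three source partitions yield at most three distinct keys, so at most
# three of the ten target partitions can ever be selected.
used = %w[0 1 2].map { |key| sketch_partition(key, 10) }.uniq
```

Because the target partition is derived from the key and not copied from the source, the same sketch also works when the target topic has fewer partitions: `sketch_partition(key, 2)` always lands on partition 0 or 1.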
#pipe_many_async(topic:, messages:) ⇒ Object
Async multi-message pipe.
Note: If a transactional producer is in use and the dispatch is not already wrapped in a transaction, the dispatch will automatically be wrapped in one.
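# File 'lib/karafka/pro/processing/piping/consumer.rb', line 83
def pipe_many_async(topic:, messages:)
  # build_pipe_message: private helper, name assumed (not shown on this page)
  messages = messages.map do |message|
    build_pipe_message(topic: topic, message: message)
  end

  produce_many_async(messages)
end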
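The automatic transaction wrapping described in the note can be pictured with a small in-memory model. `SketchProducer` is entirely hypothetical and is not the real producer; it only shows the idea of opening a transaction when none is active and reusing the current one otherwise.

```ruby
# Hypothetical in-memory producer; not the real producer implementation
class SketchProducer
  attr_reader :transactions_started, :dispatched

  def initialize(transactional:)
    @transactional = transactional
    @in_transaction = false
    @transactions_started = 0
    @dispatched = []
  end

  # Opens a transaction unless one is already open
  def transaction
    return yield if @in_transaction

    @in_transaction = true
    @transactions_started += 1
    begin
      yield
    ensure
      @in_transaction = false
    end
  end

  # Wraps the batch in a transaction only when the producer is transactional
  def produce_many_async(messages)
    return @dispatched.concat(messages) unless @transactional

    transaction { @dispatched.concat(messages) }
  end
end

batch = [
  { topic: 'events_copy', payload: 'a' },
  { topic: 'events_copy', payload: 'b' }
]

# Dispatch outside of a transaction: one is opened automatically
auto = SketchProducer.new(transactional: true)
auto.produce_many_async(batch)

# Dispatch inside an explicit transaction: no additional one is opened
explicit = SketchProducer.new(transactional: true)
explicit.transaction { explicit.produce_many_async(batch) }
```

In both cases the whole batch is dispatched within exactly one transaction, so the piped messages are committed together.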
#pipe_many_sync(topic:, messages:) ⇒ Object
Sync multi-message pipe.
Note: If a transactional producer is in use and the dispatch is not already wrapped in a transaction, the dispatch will automatically be wrapped in one.
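# File 'lib/karafka/pro/processing/piping/consumer.rb', line 98
def pipe_many_sync(topic:, messages:)
  # build_pipe_message: private helper, name assumed (not shown on this page)
  messages = messages.map do |message|
    build_pipe_message(topic: topic, message: message)
  end

  produce_many_sync(messages)
end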
#pipe_sync(topic:, message:) ⇒ Object
Sync version of pipe for one message.
# File 'lib/karafka/pro/processing/piping/consumer.rb', line 70
def pipe_sync(topic:, message:)
  produce_sync(
    # build_pipe_message: private helper, name assumed (not shown on this page)
    build_pipe_message(topic: topic, message: message)
  )
end