Module: Karafka::Pro::Processing::Piping::Consumer

Defined in:
lib/karafka/pro/processing/piping/consumer.rb

Overview

Consumer piping functionality

It provides a way to pipe data consistently, adding extra traceability headers similar to those in the enhanced DLQ.

Instance Method Summary

Instance Method Details

#pipe_async(topic:, message:) ⇒ Object

Note:

It will NOT deserialize the payload, which keeps piping fast.

Note:

We assume that the target topic can have a different number of partitions than the source topic. This is why we use a `key` based on the source topic key and not the partition id. This will not utilize partitions beyond the number of partitions of the source topic, but it will accommodate topics with fewer partitions.
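The key-based routing described in this note can be illustrated with a small sketch. It assumes a plain hash-modulo partitioner: `partition_for` is a hypothetical helper and CRC32 is used only for illustration. The partitioner actually used by the producer may hash differently, so the concrete partition numbers are not meaningful. The sketch only shows that a key always resolves to a valid partition, whatever the size of the target topic.

```ruby
require 'zlib'

# Hypothetical stand-in for a key-based partitioner: hash the key and take
# it modulo the partition count of the target topic.
def partition_for(key, partition_count)
  Zlib.crc32(key) % partition_count
end

source_key = 'user-42'

# The same key is routed consistently within a topic of a given size and
# always lands inside the partition range of that topic.
[3, 12].each do |partition_count|
  partition = partition_for(source_key, partition_count)
  puts "#{partition_count} partitions: key #{source_key} goes to partition #{partition}"
end
```

A fixed partition id, by contrast, could point at a partition that does not exist when the target topic is smaller than the source topic.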

Pipes the given message to the provided topic with the expected details. Useful for pass-through operations where deserialization is not needed. The piped message includes all of the source headers plus meta headers describing the source of the message.

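Parameters:

  • topic (String, Symbol)

    where we want to send the message

  • message (Karafka::Messages::Message)

    source message to pipe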



# File 'lib/karafka/pro/processing/piping/consumer.rb', line 59

def pipe_async(topic:, message:)
  produce_async(
    build_pipe_message(topic: topic, message: message)
  )
end
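The flow above can be sketched without Karafka to show roughly what ends up being dispatched. Everything in this sketch is a stand-in: `SourceMessage`, `PipingSketch` and `RecordingConsumer` are hypothetical, the body of `build_pipe_message` is not shown on this page, and the meta header names (`source_topic`, `source_partition`, `source_offset`) are assumptions chosen for illustration.

```ruby
# Stand-in for Karafka::Messages::Message with only the fields used here
SourceMessage = Struct.new(
  :raw_payload, :raw_key, :raw_headers, :topic, :partition, :offset,
  keyword_init: true
)

# Hypothetical reimplementation of the piping flow
module PipingSketch
  def pipe_async(topic:, message:)
    produce_async(build_pipe_message(topic: topic, message: message))
  end

  private

  # Assumed shape of the piped message: raw payload and key are passed
  # through untouched, source headers are kept, meta headers are added.
  def build_pipe_message(topic:, message:)
    {
      topic: topic,
      key: message.raw_key,
      payload: message.raw_payload,
      headers: message.raw_headers.merge(
        'source_topic' => message.topic,
        'source_partition' => message.partition.to_s,
        'source_offset' => message.offset.to_s
      )
    }
  end
end

# Records dispatched messages instead of sending them to Kafka
class RecordingConsumer
  include PipingSketch

  attr_reader :dispatched

  def initialize
    @dispatched = []
  end

  def produce_async(message)
    @dispatched << message
  end
end

source = SourceMessage.new(
  raw_payload: '{"id":1}',
  raw_key: 'user-42',
  raw_headers: { 'trace_id' => 'abc' },
  topic: 'events',
  partition: 2,
  offset: 10
)

consumer = RecordingConsumer.new
consumer.pipe_async(topic: :events_copy, message: source)
puts consumer.dispatched.first.inspect
```

Because only the raw payload is forwarded, the sketch never parses the JSON string, which mirrors the note about skipping deserialization.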

#pipe_many_async(topic:, messages:) ⇒ Object

Note:

If a transactional producer is in use and the dispatch is not already wrapped in a transaction, the dispatch will be wrapped in a transaction automatically.

Async multi-message pipe

Parameters:

  • topic (String, Symbol)

    where we want to send the message

  • messages (Array<Karafka::Messages::Message>)

    source messages to pipe



# File 'lib/karafka/pro/processing/piping/consumer.rb', line 83

def pipe_many_async(topic:, messages:)
  messages = messages.map do |message|
    build_pipe_message(topic: topic, message: message)
  end

  produce_many_async(messages)
end
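The automatic transaction wrapping mentioned in the note can be pictured with a minimal sketch. `TransactionSketchProducer` is a hypothetical stand-in that only logs what would happen. It is not the real producer and makes no claim about its internals. It assumes that a nested call reuses the transaction that is already open.

```ruby
# Hypothetical producer that logs transaction boundaries and dispatches
class TransactionSketchProducer
  attr_reader :log

  def initialize(transactional:)
    @transactional = transactional
    @in_transaction = false
    @log = []
  end

  # Opens a transaction unless one is already open (nested calls reuse it)
  def transaction
    return yield if @in_transaction

    @in_transaction = true
    @log << :begin
    begin
      result = yield
      @log << :commit
      result
    rescue StandardError
      @log << :abort
      raise
    ensure
      @in_transaction = false
    end
  end

  # Wraps the batch in a transaction only when the producer is
  # transactional and no transaction is currently open
  def produce_many_async(messages)
    if @transactional && !@in_transaction
      transaction { enqueue(messages) }
    else
      enqueue(messages)
    end
  end

  private

  def enqueue(messages)
    messages.each { |message| @log << [:enqueue, message[:topic]] }
  end
end

producer = TransactionSketchProducer.new(transactional: true)
producer.produce_many_async([{ topic: 'a' }, { topic: 'b' }])
puts producer.log.inspect
```

Under these assumptions a whole batch is dispatched inside a single transaction, and wrapping several pipe calls in an explicit transaction block does not open additional ones.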

#pipe_many_sync(topic:, messages:) ⇒ Object

Note:

If a transactional producer is in use and the dispatch is not already wrapped in a transaction, the dispatch will be wrapped in a transaction automatically.

Sync multi-message pipe

Parameters:

  • topic (String, Symbol)

    where we want to send the message

  • messages (Array<Karafka::Messages::Message>)

    source messages to pipe



# File 'lib/karafka/pro/processing/piping/consumer.rb', line 98

def pipe_many_sync(topic:, messages:)
  messages = messages.map do |message|
    build_pipe_message(topic: topic, message: message)
  end

  produce_many_sync(messages)
end

#pipe_sync(topic:, message:) ⇒ Object

Sync version of pipe for one message

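Parameters:

  • topic (String, Symbol)

    where we want to send the message

  • message (Karafka::Messages::Message)

    source message to pipe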

See Also:

  • Karafka::Pro::Processing::Piping::Consumer#pipe_async


# File 'lib/karafka/pro/processing/piping/consumer.rb', line 70

def pipe_sync(topic:, message:)
  produce_sync(
    build_pipe_message(topic: topic, message: message)
  )
end
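The practical difference between the sync and async variants can be sketched with a simulated delivery handle. `SketchHandle` and `HandleSketchProducer` are hypothetical stand-ins, and the report hash is an invented shape. The sketch assumes that an async dispatch returns a handle immediately, while a sync dispatch blocks until the delivery is confirmed.

```ruby
# Hypothetical delivery handle: the delivery resolves on a background thread
class SketchHandle
  def initialize(&delivery)
    @thread = Thread.new(&delivery)
  end

  # Blocks until the simulated delivery finishes and returns its report
  def wait
    @thread.value
  end
end

# Hypothetical producer exposing async and sync dispatch of one message
class HandleSketchProducer
  def initialize
    @next_offset = 0
  end

  # Returns right away with a handle; the delivery completes later
  def produce_async(message)
    offset = @next_offset
    @next_offset += 1

    SketchHandle.new do
      sleep(0.01) # simulated broker round trip
      { topic: message[:topic], offset: offset }
    end
  end

  # Same dispatch, but waits for the delivery report before returning
  def produce_sync(message)
    produce_async(message).wait
  end
end

producer = HandleSketchProducer.new

handle = producer.produce_async({ topic: 'events_copy' })
puts handle.wait.inspect

puts producer.produce_sync({ topic: 'events_copy' }).inspect
```

Under these assumptions the sync variant trades throughput for the certainty that the message was delivered before processing continues.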