Class: Karafka::Pro::Processing::Partitioner
- Inherits: Karafka::Processing::Partitioner
  - Object
    - Karafka::Processing::Partitioner
      - Karafka::Pro::Processing::Partitioner
- Defined in: lib/karafka/pro/processing/partitioner.rb
Overview
Pro partitioner that distributes a batch of messages across virtual partitions, based on the topic's virtual partitions settings.
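Those settings boil down to two callables: a partitioner that maps each message to a virtual key, and a reducer that folds that key into a bounded group id. The following is a hedged, self-contained plain-Ruby sketch of that two-step derivation, not Karafka's implementation. `VpSettings` and `virtual_group` are hypothetical names, messages are modeled as plain hashes, and the reducer (byte sum of the key modulo `max_partitions`) is an assumption chosen because it is deterministic and bounded, not necessarily Karafka's default.

```ruby
# Hypothetical stand-in for a topic's virtual partitions settings.
VpSettings = Struct.new(:max_partitions, :partitioner, :reducer)

max = 3

settings = VpSettings.new(
  max,
  # User-supplied partitioner: derives a virtual key from a message (here, its key).
  ->(msg) { msg[:key] },
  # Assumed reducer: String#sum adds up the key's bytes, so the result is the same
  # in every process (unlike String#hash, which is seeded per process) and always
  # falls in 0...max, which keeps the number of distinct group ids bounded.
  ->(virtual_key) { virtual_key.to_s.sum % max }
)

# Hypothetical helper: partition first, then reduce, as the two-step lookup does.
def virtual_group(settings, msg)
  settings.reducer.call(settings.partitioner.call(msg))
end

msgs = [{ key: "user-1" }, { key: "user-2" }, { key: "user-1" }, { key: "user-42" }]
groups = msgs.map { |m| virtual_group(settings, m) }
p groups # prints [1, 2, 1, 0]
```

Both `"user-1"` messages land in the same group, and no group id can exceed `max - 1`, no matter how many distinct keys users produce.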
Instance Method Summary
Methods inherited from Karafka::Processing::Partitioner
Constructor Details
This class inherits a constructor from Karafka::Processing::Partitioner
Instance Method Details
#call(topic, messages, coordinator) {|group, karafka| ... } ⇒ Object
```ruby
# File 'lib/karafka/pro/processing/partitioner.rb', line 25

def call(topic, messages, coordinator)
  ktopic = @subscription_group.topics.find(topic)

  vps = ktopic.virtual_partitions

  # We only partition work if we have:
  # - a virtual partitioner
  # - more than one thread to process the data
  # - collective is not collapsed via coordinator
  #
  # With one thread it is not worth partitioning the work as the work itself will be
  # assigned to one thread (pointless work)
  #
  # We collapse the partitioning on errors because we "regain" full ordering on a batch
  # that potentially contains the data that caused the error.
  #
  # This is great because it allows us to run things without the parallelization that adds
  # a bit of uncertainty and allows us to use DLQ and safely skip messages if needed.
  if vps.active? && vps.max_partitions > 1 && !coordinator.collapsed?
    groupings = messages.group_by do |msg|
      # We need to reduce it to the max concurrency, so the group_id is not a direct effect
      # of the end user action. Otherwise the persistence layer for consumers would cache
      # it forever and it would cause memory leaks
      #
      # This also needs to be consistent because the aggregation here needs to guarantee
      # that the same partitioned message will always be assigned to the same virtual
      # partition. Otherwise in case of a window aggregation with VP spanning across
      # several polls, the data could not be complete.
      vps.reducer.call(
        vps.partitioner.call(msg)
      )
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # When no virtual partitioner, works as regular one
    yield(0, messages)
  end
end
```
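The branching in `#call` can be exercised in isolation with a hedged plain-Ruby sketch. `VirtualPartitions`, `Coordinator`, and `partition_batch` are hypothetical stand-ins that model only the methods `#call` touches (`active?`, `max_partitions`, `partitioner`, `reducer`, `collapsed?`); the topic lookup through `@subscription_group` is omitted, messages are plain hashes, and the reducer is an assumption rather than Karafka's default.

```ruby
# Hypothetical stand-ins modeling only what #call reads from its collaborators.
VirtualPartitions = Struct.new(:active, :max_partitions, :partitioner, :reducer) do
  def active?
    active
  end
end

Coordinator = Struct.new(:collapsed) do
  def collapsed?
    collapsed
  end
end

# Mirrors the decision in #call: split only when VPs are active, more than one
# partition is allowed, and the coordinator has not collapsed the batch.
def partition_batch(vps, messages, coordinator)
  if vps.active? && vps.max_partitions > 1 && !coordinator.collapsed?
    messages
      .group_by { |msg| vps.reducer.call(vps.partitioner.call(msg)) }
      .each { |key, group| yield(key, group) }
  else
    # A single group 0 keeps the whole batch in its original order.
    yield(0, messages)
  end
end

max = 2
vps = VirtualPartitions.new(
  true,
  max,
  ->(msg) { msg[:key] },
  ->(virtual_key) { virtual_key.to_s.sum % max } # assumed reducer
)

messages = [
  { key: "a", offset: 0 },
  { key: "b", offset: 1 },
  { key: "a", offset: 2 }
]

# Record which offsets each virtual group receives.
parallel = {}
partition_batch(vps, messages, Coordinator.new(false)) do |key, group|
  parallel[key] = group.map { |m| m[:offset] }
end

collapsed = {}
partition_batch(vps, messages, Coordinator.new(true)) do |key, group|
  collapsed[key] = group.map { |m| m[:offset] }
end
```

With the coordinator not collapsed, offsets 0 and 2 (both keyed `"a"`) share one virtual group and offset 1 goes to another, each group keeping its relative order. Once collapsed, the whole batch is yielded as group 0 in its original order, which is what lets DLQ handling and skipping work safely after an error.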