Module: Karafka::Pro::Processing::ConsumerGroups::Strategies::Vp::Default
- Includes:
- Default
- Included in:
- Aj::DlqMomVp, Aj::FtrLrjMomVp, Aj::LrjMomVp, Aj::MomVp, Dlq::FtrLrjMomVp, Dlq::FtrLrjVp, Dlq::FtrVp, Dlq::LrjVp, Dlq::Vp, Ftr::Vp, Lrj::FtrVp, Lrj::Vp, Mom::Vp
- Defined in:
- lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb
Overview
Strategy for the case where only Virtual Partitions are enabled.
Constant Summary
- FEATURES = %i[ virtual_partitions ].freeze
  Features for this strategy.
Instance Method Summary
- #collapse_until!(offset) ⇒ Object
- #collapsed? ⇒ Boolean
  Whether the virtual processing is collapsed in the context of the given consumer.
- #failing? ⇒ Boolean
  True if any of the virtual partitions we’re operating with in entangled mode has already failed and we know we are failing collectively.
- #mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
- #mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
- #mark_in_transaction(message, offset_metadata, async) ⇒ Object
  Stores the next offset for processing inside the transaction when collapsed; otherwise accumulates marking-as-consumed requests in a local buffer.
- #synchronize ⇒ Object
  Allows for locks across the consumers of different virtual partitions.
Methods included from Default
#handle_after_consume, #handle_before_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_in_memory, #mark_with_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::ConsumerGroups::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_after_consume, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap
Methods included from Karafka::Processing::ConsumerGroups::Strategies::Base
#handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#collapse_until!(offset) ⇒ Object
Keep in mind that if a batch contains this offset but also earlier messages that should be collapsed, all of them will continue to operate in collapsed mode until the first full batch that contains only messages that should not be collapsed.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 143
    def collapse_until!(offset)
      coordinator.collapse_until!(offset)
    end
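To make the collapsing rule above concrete, here is a minimal, self-contained Ruby sketch. `CollapseTracker`, `refresh!` and the once-per-batch evaluation are hypothetical names and simplifications chosen for illustration; they are assumptions, not Karafka's internal collapser API.

```ruby
# Hypothetical, simplified collapse tracker. It only illustrates the rule described
# above; it is not Karafka's internal implementation.
class CollapseTracker
  def initialize
    @until_offset = -1
    @collapsed = false
  end

  # Requests collapsed processing for everything before `offset`.
  # Later calls can only move the boundary forward, never back.
  def collapse_until!(offset)
    @until_offset = [@until_offset, offset].max
  end

  # Evaluated once per batch using the batch's first offset. The whole batch stays
  # collapsed if it starts before the boundary, even when it also contains
  # messages at or after the boundary.
  def refresh!(first_offset)
    @collapsed = first_offset < @until_offset
  end

  def collapsed?
    @collapsed
  end
end

tracker = CollapseTracker.new
tracker.collapse_until!(105)

tracker.refresh!(100) # batch 100..109 starts before the boundary
tracker.collapsed?    # => true

tracker.refresh!(110) # first batch with only offsets >= 105
tracker.collapsed?    # => false
```

Evaluating the flag per batch rather than per message is what makes a mixed batch stay collapsed as a whole.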
#collapsed? ⇒ Boolean
Returns whether the virtual processing is collapsed in the context of the given consumer.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 134
    def collapsed?
      coordinator.collapsed?
    end
#failing? ⇒ Boolean
We’ve named it `#failing?` instead of `#failure?` because it is meant to be used from within virtual partitions, where we want a notion of collective failure, not just one “local” to our own processing. We “are” failing when other virtual partitions raise an error, even though locally we are still processing.
Returns true if any of the virtual partitions we’re operating with in entangled mode has already failed and we know we are failing collectively. Useful for stopping early to minimize the number of messages processed twice.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 155
    def failing?
      coordinator.failure?
    end
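The following sketch illustrates why a collective `failing?` check lets virtual partitions stop early. `SharedCoordinator` and `VpWorker` are hypothetical stand-ins, and the sketch has the failing worker set the shared flag itself; it does not model how Karafka actually records failures.

```ruby
# Hypothetical stand-in for the coordinator shared by all virtual partitions of
# one topic partition. Only the failure flag is modeled.
class SharedCoordinator
  def initialize
    @mutex = Mutex.new
    @failed = false
  end

  def failure!
    @mutex.synchronize { @failed = true }
  end

  def failure?
    @mutex.synchronize { @failed }
  end
end

# Hypothetical virtual partition worker. `failing?` mirrors the strategy method:
# it reports the collective state, not just this worker's own errors.
class VpWorker
  attr_reader :processed

  def initialize(coordinator)
    @coordinator = coordinator
    @processed = []
  end

  def failing?
    @coordinator.failure?
  end

  def consume(offsets, fail_on: nil)
    offsets.each do |offset|
      # Stop early: the batch will be processed again anyway, so any further
      # work here would only be done twice.
      break if failing?

      if offset == fail_on
        # Assumption of this sketch: the worker flags the shared failure itself.
        @coordinator.failure!
        raise "processing failed at #{offset}"
      end

      @processed << offset
    end
  end
end

coordinator = SharedCoordinator.new
vp_a = VpWorker.new(coordinator)
vp_b = VpWorker.new(coordinator)

begin
  vp_a.consume([0, 2, 4], fail_on: 2)
rescue RuntimeError
  # Swallowed here only to keep the sketch running.
end

vp_b.consume([1, 3, 5])
vp_a.processed # => [0]
vp_b.processed # => []
```

The workers run sequentially only to keep the outcome deterministic; with real concurrency, `vp_b` would stop at whichever message it reaches after the flag is set.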
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Object
This virtual offset management uses the regular default marking API underneath. We do not alter the “real” marking API, because VPs are just one of many cases we want to support, and we do not want to impact the other cases with collective offset management.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 57
    def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
      if @_in_transaction && !collapsed?
        mark_in_transaction(message, offset_metadata, true)
      elsif collapsed?
        super
      else
        manager = coordinator.virtual_offset_manager

        coordinator.synchronize do
          manager.mark(message, offset_metadata)
          # If this is the last marking on a finished flow, we can use the original
          # last message. To do so, we need to mark all previous messages as consumed,
          # as otherwise the computed offset could be different.
          # We mark only until our offset in case of a DLQ flow or similar, where we
          # do not want to mark everything but only up to the expected location.
          manager.mark_until(message, offset_metadata) if coordinator.finished?

          return revoked? unless manager.markable?

          manager.markable? ? super(*manager.markable) : revoked?
        end
      end
    ensure
      @_current_offset_metadata = nil
    end
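Here is a minimal sketch of the virtual offset management idea, under simplifying assumptions. `VirtualOffsetTracker` is hypothetical, works on plain integer offsets instead of messages and metadata, and assumes each virtual partition processes its own offsets in order, so marking one offset implies that the earlier offsets of the same virtual partition are done. The markable offset is the highest offset below which nothing is still pending, which is why a slower virtual partition holds back the position that can be marked.

```ruby
# Hypothetical, simplified virtual offset tracker. It illustrates the idea behind
# `coordinator.virtual_offset_manager` but is not Karafka's implementation.
class VirtualOffsetTracker
  # groups: one array of offsets per virtual partition, each in ascending order
  def initialize(groups)
    @groups = groups
    @all = groups.flatten.sort
    @marked = {}
  end

  # Assumption of this sketch: a virtual partition processes its own messages in
  # order, so marking one offset also marks everything before it in that group.
  def mark(offset)
    group = @groups.find { |g| g.include?(offset) }
    group.each do |o|
      break if o > offset

      @marked[o] = true
    end
  end

  # Marks every registered offset up to and including `offset`, regardless of
  # which virtual partition it belongs to (the "finished flow" case).
  def mark_until(offset)
    @all.each { |o| @marked[o] = true if o <= offset }
  end

  # Highest offset such that it and every registered offset below it are marked,
  # or nil when even the lowest offset is still pending.
  def markable
    last = nil
    @all.each do |o|
      break unless @marked[o]

      last = o
    end
    last
  end

  def markable?
    !markable.nil?
  end
end

tracker = VirtualOffsetTracker.new([[10, 12, 14], [11, 13, 15]])
tracker.mark(14)       # VP A is done
tracker.markable       # => 10 (11 is still pending in VP B)
tracker.mark(13)       # VP B is done up to 13
tracker.markable       # => 14 (15 is still pending)
tracker.mark_until(15)
tracker.markable       # => 15
```

The `mark_until` call mirrors the finished-flow branch of the method above: it marks everything up to a given offset regardless of which virtual partition owns it.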
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Object
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 85
    def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
      if @_in_transaction && !collapsed?
        mark_in_transaction(message, offset_metadata, false)
      elsif collapsed?
        super
      else
        manager = coordinator.virtual_offset_manager

        coordinator.synchronize do
          manager.mark(message, offset_metadata)
          manager.mark_until(message, offset_metadata) if coordinator.finished?
          manager.markable? ? super(*manager.markable) : revoked?
        end
      end
    ensure
      @_current_offset_metadata = nil
    end
#mark_in_transaction(message, offset_metadata, async) ⇒ Object
Stores the next offset for processing inside the transaction when collapsed; otherwise accumulates marking-as-consumed requests in a local buffer.
Due to the nature of VPs, we cannot provide full EOS support, but we can simulate it by making sure that no offsets are stored unless the transaction has finished. We do this by accumulating the marking requests and, once the transaction has finished successfully, marking each of them as consumed. On errors, this effectively “rolls back” the state and prevents offset storage.
Since the EOS here is “weak”, we do not have to worry about race conditions and do not need any mutexes.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 119
    def mark_in_transaction(message, offset_metadata, async)
      raise Errors::TransactionRequiredError unless @_in_transaction
      # Prevent offset storage attempts when we no longer own the assignment
      raise Errors::AssignmentLostError if revoked?

      return super if collapsed?

      # If this is an internal post-execution transaction (one initiated by the system
      # rather than the user), delegate to the original implementation, which stores
      # the offset via the producer
      return super if @_transaction_internal

      @_transaction_marked << [message, offset_metadata, async]
    end
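The buffering behind this “weak” EOS can be sketched in plain Ruby. `TransactionalMarker`, `commit_mark` and `TransactionRequiredError` are hypothetical stand-ins (the strategy itself raises `Errors::TransactionRequiredError`), and storing a mark is reduced to appending to an array. The sketch only shows that buffered marks are applied after the block succeeds and dropped when it raises.

```ruby
# Hypothetical stand-in for Errors::TransactionRequiredError.
class TransactionRequiredError < StandardError; end

# Hypothetical sketch of "weak" EOS marking: requests are buffered during the
# transaction and applied only once the block finishes without raising.
class TransactionalMarker
  attr_reader :stored

  def initialize
    @stored = []
    @in_transaction = false
    @transaction_marked = []
  end

  def transaction
    @in_transaction = true
    @transaction_marked = []
    yield
    # Only reached when the block did not raise: replay the buffered requests.
    @transaction_marked.each do |offset, metadata, async|
      commit_mark(offset, metadata, async)
    end
  ensure
    # On error the buffer is simply dropped, which acts as the "rollback".
    @in_transaction = false
    @transaction_marked = []
  end

  def mark_in_transaction(offset, metadata, async)
    raise TransactionRequiredError unless @in_transaction

    @transaction_marked << [offset, metadata, async]
  end

  private

  # Hypothetical: stands in for actually storing the offset.
  def commit_mark(offset, metadata, async)
    @stored << [offset, metadata, async]
  end
end

marker = TransactionalMarker.new
marker.transaction do
  marker.mark_in_transaction(10, nil, true)
  marker.mark_in_transaction(11, "meta", false)
end
marker.stored # => [[10, nil, true], [11, "meta", false]]

begin
  marker.transaction do
    marker.mark_in_transaction(12, nil, true)
    raise "boom"
  end
rescue RuntimeError
  # The buffered marking of offset 12 was discarded.
end
marker.stored.size # => 2
```

Because each consumer only touches its own buffer during its own transaction, the sketch needs no mutex, in line with the note above.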
#synchronize ⇒ Object
Allows for locks across the consumers of different virtual partitions.
This is not needed in non-VP flows (except LRJ) because there is always only one consumer per partition at any given time, so end users do not need to coordinate directly. With LRJ it is needed and is provided in the `LRJ::Default` strategy, because lifecycle events on revocation can run in parallel to the LRJ job, as that job is non-blocking.
    # File 'lib/karafka/pro/processing/consumer_groups/strategies/vp/default.rb', line 166
    def synchronize(&)
      coordinator.shared_mutex.synchronize(&)
    end
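A minimal sketch of what `synchronize` provides: several virtual partition consumers share one mutex through their coordinator, so a read-modify-write on shared state is serialized across all of them. `FakeCoordinator` and `VpConsumer` are hypothetical; only `Mutex#synchronize` and `Mutex#owned?` are real Ruby API. The sketch uses a named `&block` parameter instead of the anonymous `&` (Ruby 3.1+) so it also runs on older Rubies.

```ruby
# Hypothetical coordinator: one instance per topic partition, shared by every
# virtual partition consumer of that partition.
class FakeCoordinator
  attr_reader :shared_mutex

  def initialize
    @shared_mutex = Mutex.new
  end
end

# Hypothetical VP consumer exposing the same `synchronize` shape as the strategy.
class VpConsumer
  def initialize(coordinator)
    @coordinator = coordinator
  end

  # Delegates the block to the mutex shared across all VP consumers.
  def synchronize(&block)
    @coordinator.shared_mutex.synchronize(&block)
  end
end

coordinator = FakeCoordinator.new
totals = Hash.new(0)

threads = Array.new(4) do
  consumer = VpConsumer.new(coordinator)
  Thread.new do
    1_000.times do
      # Read-modify-write on shared data, guarded across all VP consumers.
      consumer.synchronize { totals[:processed] += 1 }
    end
  end
end
threads.each(&:join)

totals[:processed] # => 4000
```

Sharing the mutex through the coordinator, rather than giving each consumer its own, is what makes the lock effective across virtual partitions.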