Module: Karafka::Processing::Strategies::Default
- Includes: Base
- Included in: Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
- Defined in: lib/karafka/processing/strategies/default.rb
Overview
No features enabled: no manual offset management, no long-running jobs, nothing. Just the standard, automatic flow.
Constant Summary
- FEATURES = %i[].freeze
  Apply strategy for a non-feature based flow
Instance Method Summary
- #commit_offsets(async: true) ⇒ Boolean
  Triggers an async offset commit.
- #commit_offsets! ⇒ Boolean
  Triggers a synchronous offset commit to Kafka.
- #handle_after_consume ⇒ Object
  Standard flow marks work as consumed and moves on if everything went ok.
- #handle_before_consume ⇒ Object
  Increment number of attempts.
- #handle_consume ⇒ Object
  Run the user consumption code.
- #handle_idle ⇒ Object
  Code that should run on idle runs, when no messages are available.
- #handle_revoked ⇒ Object
  We need to always un-pause the processing in case we have lost a given partition.
- #handle_shutdown ⇒ Object
  Runs the shutdown code.
- #mark_as_consumed(message) ⇒ Boolean
  Marks message as consumed in an async way.
- #mark_as_consumed!(message) ⇒ Boolean
  Marks message as consumed in a sync way.
Instance Method Details
#commit_offsets(async: true) ⇒ Boolean
Note: Due to its async nature, the result may not fully reflect the offset state in some edge cases (for example, when processing goes beyond max.poll.interval.ms).
Triggers an async offset commit.
# File 'lib/karafka/processing/strategies/default.rb', line 83
def commit_offsets(async: true)
  # Do not commit if we already lost the assignment
  return false if revoked?
  return true if client.commit_offsets(async: async)

  # This will once more check the librdkafka revocation status and will revoke the
  # coordinator in case it was not revoked
  revoked?
end
#commit_offsets! ⇒ Boolean
Note: This is fully synchronous, so its result can be used in DB transactions and similar flows as a way of making sure that we still own the partition.
Triggers a synchronous offset commit to Kafka.
# File 'lib/karafka/processing/strategies/default.rb', line 98
def commit_offsets!
  commit_offsets(async: false)
end
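The note about using the synchronous result inside a DB transaction can be sketched as follows. This is a minimal illustration, not Karafka code: FakeConsumer, FakeTransaction, and persist_batch are hypothetical stand-ins, and the only assumption taken from the documentation is that #commit_offsets! returns a Boolean telling whether the partition is still owned.

```ruby
# Hypothetical stand-in for a consumer exposing #commit_offsets!.
# `owned` simulates whether the partition is still assigned to this process.
class FakeConsumer
  def initialize(owned:)
    @owned = owned
  end

  # Assumed contract: true when the commit succeeded, false when the partition was lost
  def commit_offsets!
    @owned
  end
end

# Hypothetical stand-in for a database transaction that can be rolled back
class FakeTransaction
  class Rollback < StandardError; end

  attr_reader :rows

  def initialize
    @rows = []
  end

  # Stages writes in a temporary array and persists them only if the block finishes
  def call
    staged = []
    yield staged
    @rows.concat(staged)
    true
  rescue Rollback
    false
  end
end

# Writes the payloads and aborts the whole write when ownership was lost
def persist_batch(consumer, transaction, payloads)
  transaction.call do |staged|
    payloads.each { |payload| staged << payload }
    raise FakeTransaction::Rollback unless consumer.commit_offsets!
  end
end

owned_tx = FakeTransaction.new
lost_tx = FakeTransaction.new

persist_batch(FakeConsumer.new(owned: true), owned_tx, %w[a b])
persist_batch(FakeConsumer.new(owned: false), lost_tx, %w[a b])
```

Placing the commit as the last step inside the transaction means the database write is discarded whenever another process may already be handling the same partition.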
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok. If there was a processing error, we pause and then continue from the next message, that is, the one at offset +1 from the last message that was successfully marked as consumed.
# File 'lib/karafka/processing/strategies/default.rb', line 129
def handle_after_consume
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # We should not move the offset automatically when the partition was paused
    # If we would not do this upon a revocation during the pause time, a different process
    # would pick not from the place where we paused but from the offset that would be
    # automatically committed here
    return if coordinator.manual_pause?

    mark_as_consumed(messages.last)
  else
    retry_after_pause
  end
end
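The branching in #handle_after_consume can be summarized as a small decision function. This is a rough sketch for illustration only: ConsumeState and after_consume_action are hypothetical names, and the returned symbols merely label the outcomes described in the listing rather than call anything in Karafka.

```ruby
# Hypothetical snapshot of the conditions the method checks
ConsumeState = Struct.new(:revoked, :success, :manual_pause, keyword_init: true)

# Returns a symbol naming what the standard flow would do for the given state
def after_consume_action(state)
  # A lost partition means there is nothing left to mark or retry
  return :nothing if state.revoked
  # A processing error leads to a pause followed by a retry
  return :retry_after_pause unless state.success
  # A user-requested pause keeps the offset where it is
  return :keep_offset if state.manual_pause

  # Otherwise the last message of the batch gets marked as consumed
  :mark_last_message
end

happy_path = after_consume_action(
  ConsumeState.new(revoked: false, success: true, manual_pause: false)
)
failed = after_consume_action(
  ConsumeState.new(revoked: false, success: false, manual_pause: false)
)
```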
#handle_before_consume ⇒ Object
Increment number of attempts
# File 'lib/karafka/processing/strategies/default.rb', line 103
def handle_before_consume
  coordinator.pause_tracker.increment
end
#handle_consume ⇒ Object
Run the user consumption code
# File 'lib/karafka/processing/strategies/default.rb', line 108
def handle_consume
  Karafka.monitor.instrument('consumer.consume', caller: self)
  Karafka.monitor.instrument('consumer.consumed', caller: self) do
    consume
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has finished
  coordinator.decrement(:consume)
end
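The success, failure, and ensure handling in #handle_consume follows a common Ruby pattern, sketched below without any Karafka dependency. FakeCoordinator and run_job are hypothetical stand-ins with simplified signatures; the real coordinator takes additional arguments.

```ruby
# Hypothetical coordinator that records the outcome and counts running jobs
class FakeCoordinator
  attr_reader :running_jobs, :outcome, :error

  def initialize(running_jobs)
    @running_jobs = running_jobs
  end

  def success!
    @outcome = :success
  end

  def failure!(error)
    @outcome = :failure
    @error = error
  end

  def decrement
    @running_jobs -= 1
  end
end

# Runs the given block the way the listing runs the user code
def run_job(coordinator)
  yield
  coordinator.success!
rescue StandardError => e
  coordinator.failure!(e)
  # Re-raise so the caller can still report the error
  raise e
ensure
  # Runs on both paths, so the job counter never leaks
  coordinator.decrement
end

ok = FakeCoordinator.new(1)
run_job(ok) { :work_done }

failing = FakeCoordinator.new(1)
begin
  run_job(failing) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # The error reaches the caller after the coordinator has recorded it
end
```

Because the decrement sits in the ensure clause, the job counter reaches zero whether the user code returns or raises.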
#handle_idle ⇒ Object
Code that should run on idle runs, when no messages are available.
# File 'lib/karafka/processing/strategies/default.rb', line 148
def handle_idle
  nil
ensure
  coordinator.decrement(:idle)
end
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition. Otherwise, the underlying librdkafka would not know that we may want to continue processing, and the pause could in theory last forever.
# File 'lib/karafka/processing/strategies/default.rb', line 157
def handle_revoked
  resume

  coordinator.revoke

  Karafka.monitor.instrument('consumer.revoke', caller: self)
  Karafka.monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end
#handle_shutdown ⇒ Object
Runs the shutdown code
# File 'lib/karafka/processing/strategies/default.rb', line 171
def handle_shutdown
  Karafka.monitor.instrument('consumer.shutting_down', caller: self)
  Karafka.monitor.instrument('consumer.shutdown', caller: self) do
    shutdown
  end
ensure
  coordinator.decrement(:shutdown)
end
#mark_as_consumed(message) ⇒ Boolean
Note: We keep track of this offset in case we mark a message as consumed and then get an error when processing another message. In a case like this, we do not pause on the message we’ve already processed but rather at the next one. This applies to both the sync and async versions of this method.
Marks message as consumed in an async way.
# File 'lib/karafka/processing/strategies/default.rb', line 43
def mark_as_consumed(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed(message)

  coordinator.seek_offset = message.offset + 1

  true
end
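The seek offset bookkeeping described in the note can be illustrated with a simplified tracker. This is only a sketch: FakeMessage and OffsetTracker are hypothetical, the revocation state is passed in as a plain flag, and the call to the Kafka client is left out entirely.

```ruby
# Hypothetical message carrying only an offset
FakeMessage = Struct.new(:offset)

# Simplified model of how the seek offset moves when messages are marked
class OffsetTracker
  attr_accessor :seek_offset

  def initialize(seek_offset)
    @seek_offset = seek_offset
  end

  def mark(message, revoked: false)
    # A nil seek offset models an offset reset request, so marking is ignored
    return true if seek_offset.nil?
    # Offsets earlier than the tracked position are ignored
    return true if seek_offset > message.offset
    return false if revoked

    # After a later error, processing resumes from the message after this one
    self.seek_offset = message.offset + 1
    true
  end
end

tracker = OffsetTracker.new(0)
tracker.mark(FakeMessage.new(4))
after_first_mark = tracker.seek_offset

# An older offset leaves the tracked position untouched
tracker.mark(FakeMessage.new(2))
after_stale_mark = tracker.seek_offset
```

Storing the offset plus one is what lets a retry after a pause start at the first message that was not yet marked, instead of reprocessing the marked one.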
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
# File 'lib/karafka/processing/strategies/default.rb', line 62
def mark_as_consumed!(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?

  return revoked? unless client.mark_as_consumed!(message)

  coordinator.seek_offset = message.offset + 1

  true
end