Class: Karafka::Processing::ConsumerGroups::Coordinator
- Inherits:
- Object (hierarchy: Object > Karafka::Processing::ConsumerGroups::Coordinator)
- Extended by:
- Forwardable
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/processing/consumer_groups/coordinator.rb
Overview
Note: This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but we make all of them thread-safe by default so we don't have to worry about potential future mistakes.
Basic coordinator that provides coordination objects to consumers.
It is a wrapping layer that simplifies managing the work handled around consumption.
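To make the coordination role concrete, here is a minimal, self-contained sketch of the lifecycle the methods on this page implement: a batch starts, jobs are counted in and out, each consumer reports its result, and the batch counts as successful only when no consume job is running and every consumer succeeded. `MiniCoordinator` and `MiniResult` are hypothetical stand-ins written for illustration, not Karafka classes, and they omit pausing, seeking and revocation.

```ruby
# Hypothetical stand-in for Karafka::Processing::Result: tracks one consumer's outcome.
class MiniResult
  def initialize
    @success = false
  end

  def success!
    @success = true
  end

  def success?
    @success
  end
end

# Hypothetical, simplified coordinator illustrating the lifecycle only.
class MiniCoordinator
  def initialize
    @mutex = Mutex.new
    @running_jobs = Hash.new { |h, k| h[k] = 0 }
    @consumptions = {}
  end

  # Reset per-batch state before dispatching consume jobs.
  def start
    @running_jobs[:consume] = 0
    @consumptions.clear
  end

  def increment(job_type)
    @mutex.synchronize { @running_jobs[job_type] += 1 }
  end

  def decrement(job_type)
    @mutex.synchronize { @running_jobs[job_type] -= 1 }
  end

  # One result object per consumer, created on first access.
  def consumption(consumer)
    @consumptions[consumer] ||= MiniResult.new
  end

  def success!(consumer)
    @mutex.synchronize { consumption(consumer).success! }
  end

  # Successful only when no consume job is still running and every consumer succeeded.
  def success?
    @mutex.synchronize do
      @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
    end
  end
end

coordinator = MiniCoordinator.new
consumer = Object.new

coordinator.start
coordinator.increment(:consume)
coordinator.success!(consumer)
still_running = coordinator.success? # false: the job has not been decremented yet
coordinator.decrement(:consume)
finished = coordinator.success?      # true: no running jobs and the only consumer succeeded
```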
Direct Known Subclasses
Instance Attribute Summary
- #eofed ⇒ Object
  This can be set directly on the listener because it can be triggered on the first run without any messages.
- #last_polled_at ⇒ Object
  Last polled-at time, set based on the incoming last poll time.
- #partition ⇒ Object (readonly)
  Returns the value of attribute partition.
- #pause_tracker ⇒ Object (readonly)
  Returns the value of attribute pause_tracker.
- #seek_offset ⇒ Object
  Returns the value of attribute seek_offset.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #consumption(consumer) ⇒ Karafka::Processing::Result
  Result object we can use to indicate the consumption processing state.
- #decrement(job_type) ⇒ Object
  Decrements the number of jobs we are currently handling.
- #eofed? ⇒ Boolean
  Did we reach the end of the partition when polling data?
- #failure!(consumer, error) ⇒ Object
  Marks the given consumer's consumption as failed.
- #failure? ⇒ Boolean
  True if any of the work we were running failed.
- #increment(job_type) ⇒ Object
  Increases the number of jobs we handle with this coordinator.
- #initialize(topic, partition, pause_tracker) ⇒ Coordinator (constructor)
  Returns a new instance of Coordinator.
- #manual_pause ⇒ Object
  Stores in the coordinator that this pause was initiated manually by the end user and not by the system itself.
- #manual_pause? ⇒ Boolean
  Are we in a pause that was initiated by the user?
- #manual_seek ⇒ Object
  Marks the seek as manual for coordination purposes.
- #manual_seek? ⇒ Boolean
  Did the user invoke seek in the current operations scope?
- #marked? ⇒ Boolean
  Was the new seek offset assigned at least once?
- #revoke ⇒ Object
  Marks the given coordinator for the processing group as revoked.
- #revoked? ⇒ Boolean
  Is the partition we are processing revoked?
- #start(messages) ⇒ Object
  Starts the coordinator for the given consumption jobs.
- #success!(consumer) ⇒ Object
  Marks the given consumer's consumption as successful.
- #success? ⇒ Boolean
  Is all the consumption for this coordinator done and finished successfully? We do not report success until all work is done, because running work may still crash.
- #synchronize ⇒ Object
  Allows running synchronized (locked) code that can operate only from a given thread.
Constructor Details
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 33
def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @eofed = false
  @changed_at = monotonic_now
  @last_polled_at = @changed_at
end
```
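The constructor relies on two Ruby idioms worth spelling out. `Hash.new { |h, k| h[k] = 0 }` builds a counter that lazily creates a zero entry for any job type on first access, so incrementing a never-seen job type works without pre-declaring keys. The timestamps come from `monotonic_now` (from `Core::Helpers::Time`); the sketch substitutes `Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)` as an assumed equivalent, and the unit `monotonic_now` actually returns is not stated on this page. The hypothetical helper `monotonic_now_ms` exists only for illustration.

```ruby
# Counter hash: missing keys are created with a value of 0 on first read.
running_jobs = Hash.new { |hash, job_type| hash[job_type] = 0 }

running_jobs[:consume] += 1
running_jobs[:consume] += 1
running_jobs[:revoked] += 1

# Hypothetical stand-in for monotonic_now: a clock that never goes backwards,
# unlike wall-clock time, which can jump when the system clock is adjusted.
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

changed_at = monotonic_now_ms
last_polled_at = changed_at # both timestamps start out equal, as in #initialize
```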
Instance Attribute Details
#eofed ⇒ Object
This can be set directly on the listener because it can be triggered on the first run without any messages.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 23
def eofed
  @eofed
end
```
#last_polled_at ⇒ Object
Last polled-at time, set based on the incoming last poll time.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 26
def last_polled_at
  @last_polled_at
end
```
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19
def partition
  @partition
end
```
#pause_tracker ⇒ Object (readonly)
Returns the value of attribute pause_tracker.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19
def pause_tracker
  @pause_tracker
end
```
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19
def seek_offset
  @seek_offset
end
```
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19
def topic
  @topic
end
```
Instance Method Details
#consumption(consumer) ⇒ Karafka::Processing::Result
Returns the result object we can use to indicate the consumption processing state.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 198
def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end
```
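`#consumption` lazily creates one result object per consumer instance and then keeps returning that same object; in this class it is called from `#success!` and `#failure!` inside `#synchronize`. The sketch shows the `||=` memoization pattern with plain Ruby objects. `Result` and `consumption_for` are hypothetical placeholders, not `Karafka::Processing::Result`.

```ruby
# Hypothetical placeholder for Karafka::Processing::Result.
Result = Struct.new(:state)

consumptions = {}

# Returns the existing result for this consumer, or creates and stores a new one.
def consumption_for(consumptions, consumer)
  consumptions[consumer] ||= Result.new(:pending)
end

consumer_a = Object.new
consumer_b = Object.new

first = consumption_for(consumptions, consumer_a)
again = consumption_for(consumptions, consumer_a)
other = consumption_for(consumptions, consumer_b)

same_object = first.equal?(again)  # true: same object for the same consumer
shared_object = first.equal?(other) # false: each consumer gets its own result
```

Identity (`equal?`) is checked rather than `==`, because two structs with the same state compare equal even when they are different objects.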
#decrement(job_type) ⇒ Object
Decrements the number of jobs we are currently handling.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 97
def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, "Was zero before decrementation"
  end
end
```
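`#decrement` treats a negative counter as a broken invariant rather than clamping it to zero. A minimal sketch of the same guard, paired with the matching `#increment`, using a hypothetical `JobCounter` class and a hypothetical `InvalidStateError` standing in for `Karafka::Errors::InvalidCoordinatorStateError`:

```ruby
# Hypothetical stand-in for Karafka::Errors::InvalidCoordinatorStateError.
class InvalidStateError < StandardError; end

# Hypothetical per-job-type counter mirroring #increment / #decrement.
class JobCounter
  def initialize
    @mutex = Mutex.new
    @running = Hash.new { |h, k| h[k] = 0 }
  end

  def increment(job_type)
    @mutex.synchronize { @running[job_type] += 1 }
  end

  # Returns the new count, or raises if the count dropped below zero.
  # Mutex#synchronize releases the lock even when we return or raise from the block.
  def decrement(job_type)
    @mutex.synchronize do
      @running[job_type] -= 1
      return @running[job_type] unless @running[job_type].negative?

      raise InvalidStateError, 'Was zero before decrementation'
    end
  end
end

counter = JobCounter.new
counter.increment(:consume)
after_first = counter.decrement(:consume) # 0

error_message = nil
begin
  counter.decrement(:consume) # one decrement too many
rescue InvalidStateError => e
  error_message = e.message
end
```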
#eofed? ⇒ Boolean
Returns whether we reached the end of the partition when polling data.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 162
def eofed?
  @eofed
end
```
#failure!(consumer, error) ⇒ Object
Marks the given consumer's consumption as failed.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 131
def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end
```
#failure? ⇒ Boolean
Returns true if any of the work we were running failed.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 139
def failure?
  @failure
end
```
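`#failure!` records a failure in two places: a coordinator-wide `@failure` flag, which `#failure?` reads, and the per-consumer result. `#start` resets the flag to `false` for every new batch. The simplified sketch below uses a hypothetical `FailureTracker`, with the per-consumer result reduced to the stored error.

```ruby
# Hypothetical simplified tracker mirroring #failure! / #failure?.
class FailureTracker
  attr_reader :results

  def initialize
    @mutex = Mutex.new
    @failure = false
    @results = {}
  end

  # Flag the whole coordinator as failed and remember which consumer failed and why.
  def failure!(consumer, error)
    @mutex.synchronize do
      @failure = true
      @results[consumer] = error
    end
  end

  # A single failed consumer is enough to mark the whole unit of work as failed.
  def failure?
    @failure
  end
end

tracker = FailureTracker.new
consumer = Object.new
failed_before = tracker.failure? # false
tracker.failure!(consumer, StandardError.new('boom'))
failed_after = tracker.failure?  # true
```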
#increment(job_type) ⇒ Object
Increases the number of jobs we handle with this coordinator.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 88
def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end
```
#manual_pause ⇒ Object
Stores in the coordinator that this pause was initiated manually by the end user and not by the system itself.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 176
def manual_pause
  @manual_pause = true
end
```
#manual_pause? ⇒ Boolean
Returns whether we are in a pause that was initiated by the user.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 181
def manual_pause?
  paused? && @manual_pause
end
```
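`#manual_pause?` reports `true` only while the partition is actually paused; the `@manual_pause` flag alone is not enough. `paused?` is not among the methods listed here; given the `Forwardable` extension, it is presumably delegated to the pause tracker, which this sketch assumes. `FakePauseTracker` and `PauseAwareCoordinator` are hypothetical.

```ruby
require 'forwardable'

# Hypothetical stand-in for the real pause tracker.
class FakePauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Hypothetical coordinator fragment: paused? is delegated to the tracker (an assumption).
class PauseAwareCoordinator
  extend Forwardable

  def_delegators :@pause_tracker, :paused?

  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  # True only when the user requested the pause and the partition is still paused.
  def manual_pause?
    paused? && @manual_pause
  end
end

tracker = FakePauseTracker.new
coordinator = PauseAwareCoordinator.new(tracker)

coordinator.manual_pause
before_pause = coordinator.manual_pause? # false: flag set, but not paused yet
tracker.pause
while_paused = coordinator.manual_pause? # true
tracker.resume
after_resume = coordinator.manual_pause? # false: the flag alone does not count
```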
#manual_seek ⇒ Object
Marks the seek as manual for coordination purposes.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 186
def manual_seek
  @manual_seek = true
end
```
#manual_seek? ⇒ Boolean
Returns whether the user invoked seek in the current operations scope.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 191
def manual_seek?
  @manual_seek
end
```
#marked? ⇒ Boolean
Returns whether the new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the very first message, which is insufficient for DLQ when that first message is broken: we would never move past it and would end up in an endless loop.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 170
def marked?
  @marked
end
```
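The reasoning behind `#marked?` can be shown with a small simulation. Without a mark, the seek offset still points at the first offset ever seen, so rolling back to it after an error would replay the same broken message forever; `#marked?` presumably lets a strategy such as DLQ distinguish "progress was committed up to here" from "this is only the initial default". Where the real class sets `@marked` is not shown on this page; the hypothetical `SeekState#mark` assumes the flag flips when a new seek offset is assigned, using the "last marked message + 1" convention described in `#start`.

```ruby
# Hypothetical model of the seek offset and its "marked" flag.
class SeekState
  attr_reader :seek_offset

  def initialize
    @seek_offset = nil
    @marked = false
  end

  # Mirrors #start: only the very first batch initializes the seek offset.
  def start(first_offset)
    @seek_offset ||= first_offset
  end

  # Assumption: assigning a new seek offset is what sets the marked flag.
  def mark(next_offset)
    @seek_offset = next_offset
    @marked = true
  end

  def marked?
    @marked
  end
end

state = SeekState.new
state.start(100) # first batch begins at offset 100
state.start(150) # a later batch: the seek offset stays at 100

initial_offset = state.seek_offset # 100, although nothing has been marked yet
initially_marked = state.marked?   # false

state.mark(101)                    # message 100 processed, resume from 101
final_offset = state.seek_offset   # 101
finally_marked = state.marked?     # true
```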
#revoke ⇒ Object
Marks the given coordinator for the processing group as revoked.
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means the consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, because effectively nothing will be processed until the revocation jobs are done.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 152
def revoke
  synchronize { @revoked = true }
end
```
#revoked? ⇒ Boolean
Returns whether the partition we are processing is revoked.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 157
def revoked?
  @revoked
end
```
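Because `#revoke` can be called both from the listener loop and from a consumer whose checkpointing failed, the write must be safe to repeat and to race. Setting a boolean to `true` under the lock is idempotent, so it does not matter which caller runs first or whether both do. A minimal sketch with a hypothetical `RevocationFlag` and two threads standing in for the two callers:

```ruby
# Hypothetical flag mirroring #revoke / #revoked?.
class RevocationFlag
  def initialize
    @mutex = Mutex.new
    @revoked = false
  end

  # Idempotent: calling it once or many times leaves the same state.
  def revoke
    @mutex.synchronize { @revoked = true }
  end

  def revoked?
    @revoked
  end
end

flag = RevocationFlag.new

# One thread plays the listener loop, the other a consumer whose checkpointing failed.
listener = Thread.new { flag.revoke }
consumer = Thread.new { flag.revoke }
[listener, consumer].each(&:join)

revoked = flag.revoked? # true regardless of which thread ran first
```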
#start(messages) ⇒ Object
Starts the coordinator for the given consumption jobs.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 53
def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
```
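`#start` separates state that is reset for every batch from state that survives across batches. Based on the method body: `@failure`, the `:consume` job counter, the consumption results and the manual pause and seek flags are reset, while `@seek_offset` is set only the first time (`||=`) and `@revoked` is not touched. The hypothetical `BatchState` reproduces only that split, with messages modeled as a hypothetical `Message` struct carrying an `offset`.

```ruby
# Hypothetical message model: only the offset matters here.
Message = Struct.new(:offset)

# Hypothetical reduction of the coordinator state touched by #start.
class BatchState
  attr_reader :failure, :manual_pause, :manual_seek, :seek_offset, :revoked, :consumptions

  def initialize
    @failure = false
    @manual_pause = false
    @manual_seek = false
    @seek_offset = nil
    @revoked = false
    @consumptions = {}
    @running_jobs = Hash.new { |h, k| h[k] = 0 }
  end

  def start(messages)
    # Per-batch state: reset on every run.
    @failure = false
    @running_jobs[:consume] = 0
    @consumptions.clear
    @manual_pause = false
    @manual_seek = false
    # Persistent state: initialized once, later owned by the consumer logic.
    @seek_offset ||= messages.first.offset
  end

  # Simulates events that may happen while a batch is being processed.
  def simulate_batch_events
    @failure = true
    @manual_pause = true
    @manual_seek = true
    @revoked = true
    @consumptions[:some_consumer] = :result
  end
end

state = BatchState.new
state.start([Message.new(10), Message.new(11)])
state.simulate_batch_events
state.start([Message.new(12)]) # second batch: per-batch flags reset, seek offset kept
```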
#success!(consumer) ⇒ Object
Marks the given consumer's consumption as successful.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 122
def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end
```
#success? ⇒ Boolean
Note: This is only used for consume synchronization.
Is all the consumption for this coordinator done and finished successfully? We do not report success until all work is done, because running work may still crash.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 114
def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
```
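Two consequences of the `#success?` expression are easy to miss. First, one failed consumer keeps the coordinator unsuccessful even after all jobs finish. Second, `Enumerable#all?` returns `true` for an empty collection, so with no running consume jobs and no recorded consumptions the expression evaluates to success. The sketch evaluates the same expression on plain data; `Outcome` and `batch_success?` are hypothetical.

```ruby
# Hypothetical stand-in for the per-consumer result object.
Outcome = Struct.new(:ok) do
  def success?
    ok
  end
end

# The same boolean expression as #success?, on explicit inputs.
def batch_success?(running_consume_jobs, consumptions)
  running_consume_jobs.zero? && consumptions.values.all?(&:success?)
end

all_ok = batch_success?(0, { a: Outcome.new(true), b: Outcome.new(true) })      # true
one_failed = batch_success?(0, { a: Outcome.new(true), b: Outcome.new(false) }) # false
still_running = batch_success?(1, { a: Outcome.new(true) })                     # false
nothing_recorded = batch_success?(0, {})                                        # true: all? on empty
```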
#synchronize ⇒ Object
Note: We check whether the mutex is already owned by the current thread, so we don't end up with a deadlock when the user runs coordinated code from inside their own lock.
Note: This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their logic. It can and should, however, be used for multi-thread strategy applications and other internal operation locks.
Allows running synchronized (locked) code that can operate only from a given thread.
```ruby
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 211
def synchronize(&)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&)
  end
end
```
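Ruby's `Mutex` is not reentrant: locking it again from the thread that already holds it raises `ThreadError`. The `@mutex.owned?` check is what allows methods that call `#synchronize` to be invoked from code already running inside `#synchronize`. The sketch compares a plain mutex with the same guard. `ReentrantLock` is a hypothetical name, and it uses a named `&block` instead of the anonymous `&` (Ruby 3.1+) so it also runs on older Rubies.

```ruby
# Hypothetical wrapper reproducing the #synchronize guard.
class ReentrantLock
  def initialize
    @mutex = Mutex.new
  end

  # Yield directly when this thread already holds the lock, otherwise acquire it.
  def synchronize(&block)
    if @mutex.owned?
      yield
    else
      @mutex.synchronize(&block)
    end
  end
end

# A plain Mutex refuses to be locked twice by the same thread.
plain = Mutex.new
plain_error =
  begin
    plain.synchronize { plain.synchronize { :inner } }
  rescue ThreadError => e
    e.class
  end

# The guarded version runs the nested block instead of raising.
lock = ReentrantLock.new
nested_result = lock.synchronize { lock.synchronize { :inner } }
```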