Class: Karafka::Processing::Coordinator
- Inherits: Object
- Extended by: Forwardable
- Includes: Core::Helpers::Time
- Defined in: lib/karafka/processing/coordinator.rb
Overview
Basic coordinator that allows us to provide coordination objects to consumers. It is a wrapping layer that simplifies managing the work handled around consumption.
Note: This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but we default to thread safety for all of them so that we do not have to worry about potential future mistakes.
Direct Known Subclasses
Instance Attribute Summary
- #partition ⇒ Object (readonly)
  Returns the value of attribute partition.
- #pause_tracker ⇒ Object (readonly)
  Returns the value of attribute pause_tracker.
- #seek_offset ⇒ Object
  Returns the value of attribute seek_offset.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #consumption(consumer) ⇒ Karafka::Processing::Result
  Result object that we can use to indicate the consumption processing state.
- #decrement(job_type) ⇒ Object
  Decrements the number of jobs we handle at the moment.
- #failure!(consumer, error) ⇒ Object
  Marks the given consumption on the consumer as failed.
- #failure? ⇒ Boolean
  True if any of the work we were running failed.
- #increment(job_type) ⇒ Object
  Increases the number of jobs that we handle with this coordinator.
- #initialize(topic, partition, pause_tracker) ⇒ Coordinator (constructor)
  A new instance of Coordinator.
- #manual_pause ⇒ Object
  Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.
- #manual_pause? ⇒ Boolean
  Are we in a pause that was initiated by the user?
- #manual_seek ⇒ Object
  Marks the seek as manual for coordination purposes.
- #manual_seek? ⇒ Boolean
  Did a user invoke seek in the current operations scope?
- #marked? ⇒ Boolean
  Was the new seek offset assigned at least once?
- #revoke ⇒ Object
  Marks the given coordinator for the processing group as revoked.
- #revoked? ⇒ Boolean
  Is the partition we are processing revoked?
- #start(messages) ⇒ Object
  Starts the coordinator for the given consumption jobs.
- #success!(consumer) ⇒ Object
  Marks the given consumption on the consumer as successful.
- #success? ⇒ Boolean
  Is all the consumption done and finished successfully for this coordinator? We do not report success until all work is done, because running work may still crash.
- #synchronize(&block) ⇒ Object
  Allows running synchronized (locked) code that can operate only from a given thread.
Constructor Details
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
    # File 'lib/karafka/processing/coordinator.rb', line 23
    def initialize(topic, partition, pause_tracker)
      @topic = topic
      @partition = partition
      @pause_tracker = pause_tracker
      @revoked = false
      @consumptions = {}
      @running_jobs = Hash.new { |h, k| h[k] = 0 }
      @manual_pause = false
      @manual_seek = false
      @mutex = Mutex.new
      @marked = false
      @failure = false
      @changed_at = monotonic_now
    end
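The `@running_jobs` counter set up in the constructor relies on a Hash default block, which stores 0 the first time an unknown job type is accessed. Here is a minimal standalone illustration in core Ruby. The job type symbols are arbitrary examples, not a list of the job types Karafka actually uses:

```ruby
# Same construction as @running_jobs in #initialize: the default block
# assigns 0 to any missing key before returning it.
running_jobs = Hash.new { |hash, key| hash[key] = 0 }

# No per-job-type setup is needed before incrementing.
running_jobs[:consume] += 1
running_jobs[:consume] += 1
running_jobs[:revoked] += 1

puts running_jobs[:consume] # => 2
puts running_jobs[:revoked] # => 1

# Merely reading an unknown key also stores it, because the block assigns.
puts running_jobs[:shutdown]       # => 0
puts running_jobs.key?(:shutdown)  # => true
```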
Instance Attribute Details
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
    # File 'lib/karafka/processing/coordinator.rb', line 16
    def partition
      @partition
    end
#pause_tracker ⇒ Object (readonly)
Returns the value of attribute pause_tracker.
    # File 'lib/karafka/processing/coordinator.rb', line 16
    def pause_tracker
      @pause_tracker
    end
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
    # File 'lib/karafka/processing/coordinator.rb', line 16
    def seek_offset
      @seek_offset
    end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
    # File 'lib/karafka/processing/coordinator.rb', line 16
    def topic
      @topic
    end
Instance Method Details
#consumption(consumer) ⇒ Karafka::Processing::Result
Returns a result object that we can use to indicate the consumption processing state.
    # File 'lib/karafka/processing/coordinator.rb', line 181
    def consumption(consumer)
      @consumptions[consumer] ||= Processing::Result.new
    end
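`#consumption` creates one result object per consumer instance on first access and returns that same object afterwards. The sketch below reproduces the `||=` memoization. `SketchResult` is a hypothetical stand-in for `Karafka::Processing::Result`. It mimics only the `success!`/`success?` calls that appear in this listing, and the real class may differ:

```ruby
# Hypothetical stand-in for Karafka::Processing::Result.
class SketchResult
  def initialize
    @success = false
  end

  def success!
    @success = true
  end

  def success?
    @success
  end
end

consumptions = {}

# Mirrors Coordinator#consumption: create on first access, reuse afterwards.
consumption = ->(consumer) { consumptions[consumer] ||= SketchResult.new }

consumer_a = Object.new
consumer_b = Object.new

consumption.call(consumer_a).success!

puts consumption.call(consumer_a).success? # => true  (same object reused)
puts consumption.call(consumer_b).success? # => false (separate result)
puts consumptions.size                     # => 2
```

Keying the hash by the consumer object is what makes `@consumptions.clear` in `#start` necessary. Otherwise results for consumer instances that are no longer in use would keep accumulating.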
#decrement(job_type) ⇒ Object
Decrements the number of jobs we handle at the moment.
    # File 'lib/karafka/processing/coordinator.rb', line 85
    def decrement(job_type)
      synchronize do
        @running_jobs[job_type] -= 1
        @changed_at = monotonic_now

        return @running_jobs[job_type] unless @running_jobs[job_type].negative?

        # This should never happen. If it does, something is heavily out of sync. Please reach
        # out to us if you encounter this
        raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
      end
    end
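You can exercise the decrement guard in isolation. This minimal sketch uses a hypothetical `JobCounter` class. It raises a generic `RuntimeError` in place of `Karafka::Errors::InvalidCoordinatorStateError` and leaves out `@changed_at` tracking. Like the original, it decrements first and raises only when the counter goes below zero:

```ruby
# Hypothetical, simplified counterpart of Coordinator#increment/#decrement.
class JobCounter
  def initialize
    @running_jobs = Hash.new { |h, k| h[k] = 0 }
    @mutex = Mutex.new
  end

  def increment(job_type)
    @mutex.synchronize { @running_jobs[job_type] += 1 }
  end

  def decrement(job_type)
    @mutex.synchronize do
      @running_jobs[job_type] -= 1
      # Return the remaining count as long as it is still valid.
      return @running_jobs[job_type] unless @running_jobs[job_type].negative?

      # Stand-in for Karafka::Errors::InvalidCoordinatorStateError.
      raise 'Was zero before decrementation'
    end
  end
end

counter = JobCounter.new
counter.increment(:consume)
counter.increment(:consume)
puts counter.decrement(:consume) # => 1
puts counter.decrement(:consume) # => 0

begin
  counter.decrement(:consume)
rescue RuntimeError => e
  puts e.message # => Was zero before decrementation
end
```

As in the original, the counter is left at -1 after the error, because the subtraction happens before the check. The `return` inside the block exits `decrement`, and `Mutex#synchronize` still releases the lock.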
#failure!(consumer, error) ⇒ Object
Marks the given consumption on the consumer as failed.
    # File 'lib/karafka/processing/coordinator.rb', line 119
    def failure!(consumer, error)
      synchronize do
        @failure = true
        consumption(consumer).failure!(error)
      end
    end
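`#failure!` also sets a coordinator-wide flag in addition to recording the error on the consumer's own result. A single failing consumer is therefore enough for `#failure?` to return true. In this listing, only `#start` and the constructor reset that flag. The sketch below is a hypothetical `FailureTracker` reduction, where worker threads stand in for concurrently running jobs and a plain hash replaces the per-consumer result objects:

```ruby
# Hypothetical reduction of Coordinator#failure! / #failure?.
class FailureTracker
  def initialize
    @failure = false
    @errors = {}
    @mutex = Mutex.new
  end

  def failure!(consumer, error)
    @mutex.synchronize do
      @failure = true
      @errors[consumer] = error
    end
  end

  def failure?
    @failure
  end

  # Per-run reset, analogous to `@failure = false` in Coordinator#start.
  def reset
    @mutex.synchronize do
      @failure = false
      @errors.clear
    end
  end
end

tracker = FailureTracker.new

workers = 3.times.map do |i|
  Thread.new do
    # Only the second hypothetical worker reports a failure.
    tracker.failure!("consumer-#{i}", StandardError.new('boom')) if i == 1
  end
end
workers.each(&:join)

puts tracker.failure? # => true
tracker.reset
puts tracker.failure? # => false
```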
#failure? ⇒ Boolean
Returns true if any of the work we were running failed.
    # File 'lib/karafka/processing/coordinator.rb', line 127
    def failure?
      @failure
    end
#increment(job_type) ⇒ Object
Increases the number of jobs that we handle with this coordinator.
    # File 'lib/karafka/processing/coordinator.rb', line 76
    def increment(job_type)
      synchronize do
        @running_jobs[job_type] += 1
        @changed_at = monotonic_now
      end
    end
#manual_pause ⇒ Object
Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.
    # File 'lib/karafka/processing/coordinator.rb', line 159
    def manual_pause
      @manual_pause = true
    end
#manual_pause? ⇒ Boolean
Returns whether we are in a pause that was initiated by the user.
    # File 'lib/karafka/processing/coordinator.rb', line 164
    def manual_pause?
      paused? && @manual_pause
    end
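`#manual_pause?` calls `paused?`, which this listing does not define. Because the class is extended by `Forwardable`, `paused?` is plausibly delegated to the pause tracker. The sketch below assumes exactly that. `StubPauseTracker` and `PauseAwareCoordinator` are hypothetical:

```ruby
require 'forwardable'

# Hypothetical pause tracker exposing only what this sketch needs.
class StubPauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

class PauseAwareCoordinator
  extend Forwardable

  # Assumption: `paused?` is forwarded to the pause tracker.
  def_delegators :@pause_tracker, :paused?

  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  # True only while a pause is active and it was flagged as manual.
  def manual_pause?
    paused? && @manual_pause
  end
end

tracker = StubPauseTracker.new
coordinator = PauseAwareCoordinator.new(tracker)

coordinator.manual_pause
puts coordinator.manual_pause? # => false (flag set, but not paused yet)

tracker.pause
puts coordinator.manual_pause? # => true

tracker.resume
puts coordinator.manual_pause? # => false (pause has ended)
```

The flag alone is not enough. Once the tracker is no longer paused, `manual_pause?` returns false even though `@manual_pause` is still true.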
#manual_seek ⇒ Object
Marks the seek as manual for coordination purposes.
    # File 'lib/karafka/processing/coordinator.rb', line 169
    def manual_seek
      @manual_seek = true
    end
#manual_seek? ⇒ Boolean
Returns whether a user invoked seek in the current operations scope.
    # File 'lib/karafka/processing/coordinator.rb', line 174
    def manual_seek?
      @manual_seek
    end
#marked? ⇒ Boolean
Returns whether the new seek offset was assigned at least once. This is needed because, by default, we assign the seek offset of the very first message. However, this is insufficient for DLQ in a scenario where the first message is broken: we would never move past it and would end up in an endless loop.
    # File 'lib/karafka/processing/coordinator.rb', line 153
    def marked?
      @marked
    end
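The interplay between the initial seek offset and `#marked?` can be modelled in a few lines. This listing does not show where `@marked` is set, so the `SeekState` class and its `mark(offset)` method are hypothetical. The sketch assumes that marking a message moves the seek offset to that offset + 1, in line with the "last message + 1 that was marked" wording in the `#start` comments:

```ruby
# Hypothetical model of seek offset tracking; not Karafka's API.
class SeekState
  attr_reader :seek_offset

  def initialize
    @seek_offset = nil
    @marked = false
  end

  # Like Coordinator#start: only the very first batch sets the offset.
  def start(first_offset)
    @seek_offset ||= first_offset
  end

  # Assumed marking behaviour: resume right after the marked message.
  def mark(offset)
    @seek_offset = offset + 1
    @marked = true
  end

  def marked?
    @marked
  end
end

state = SeekState.new
state.start(100)
state.start(250) # ignored, the seek offset is already set
puts state.seek_offset # => 100
puts state.marked?     # => false (100 was never confirmed as processed)

state.mark(100)
puts state.seek_offset # => 101
puts state.marked?     # => true
```

Without the `marked?` check, a strategy could not tell "offset 100 is where we started" apart from "offset 100 was processed". Only the second case justifies moving on.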
#revoke ⇒ Object
Marks the given coordinator for the processing group as revoked.
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means we can end up with the consumer being aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.
    # File 'lib/karafka/processing/coordinator.rb', line 140
    def revoke
      synchronize { @revoked = true }
    end
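`#revoke` can be reached from both the listener loop and a consumer whose checkpointing failed, so it must be safe to call concurrently and more than once. The following minimal sketch uses a hypothetical `RevocationFlag` class, with two threads standing in for those two call sites:

```ruby
# Hypothetical reduction of Coordinator#revoke / #revoked?.
class RevocationFlag
  def initialize
    @revoked = false
    @mutex = Mutex.new
  end

  # Idempotent: setting the flag twice has the same effect as once.
  def revoke
    @mutex.synchronize { @revoked = true }
  end

  def revoked?
    @revoked
  end
end

flag = RevocationFlag.new

listener = Thread.new { flag.revoke } # e.g. revoked partitions detected
consumer = Thread.new { flag.revoke } # e.g. a failed checkpoint
[listener, consumer].each(&:join)

puts flag.revoked? # => true
```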
#revoked? ⇒ Boolean
Returns whether the partition we are processing is revoked.
    # File 'lib/karafka/processing/coordinator.rb', line 145
    def revoked?
      @revoked
    end
#start(messages) ⇒ Object
Starts the coordinator for the given consumption jobs.
    # File 'lib/karafka/processing/coordinator.rb', line 41
    def start(messages)
      @failure = false
      @running_jobs[:consume] = 0
      # We need to clear the consumption results hash here, otherwise we could end up storing
      # consumption results of consumer instances we no longer control
      @consumptions.clear

      # When starting to run, no pause is expected and no manual pause as well
      @manual_pause = false

      # No user-invoked seeks on a new run
      @manual_seek = false

      # We set it on the first encounter and never again, because then the offset setting
      # should be up to the consumer's logic (ours or the end user's)
      # Seek offset needs to be always initialized as for case where manual offset management
      # is turned on, we need to have reference to the first offset even in case of running
      # multiple batches without marking any messages as consumed. Rollback needs to happen to
      # the last place we know of or the last message + 1 that was marked
      #
      # It is however worth keeping in mind, that this may need to be used with `#marked?` to
      # make sure that the first offset is an offset that has been marked.
      @seek_offset ||= messages.first.offset
    end
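`#start` resets all per-run state but deliberately keeps the seek offset once it has been set. The condensed `RunState` class below is hypothetical. It shows which fields survive between two runs, with messages represented by a hypothetical `Msg` struct that has only an `offset`:

```ruby
# Hypothetical message shape; only #offset is used, as in Coordinator#start.
Msg = Struct.new(:offset)

class RunState
  attr_reader :seek_offset

  def initialize
    @consumptions = {}
    @running_consume_jobs = 0
    @failure = false
    @manual_pause = false
    @manual_seek = false
    @seek_offset = nil
  end

  def start(messages)
    # Per-run state is cleared on every start...
    @failure = false
    @running_consume_jobs = 0
    @consumptions.clear
    @manual_pause = false
    @manual_seek = false
    # ...while the seek offset is only initialized once.
    @seek_offset ||= messages.first.offset
  end

  # Hypothetical helper simulating a run that failed after a manual seek.
  def simulate_failed_run
    @failure = true
    @manual_seek = true
    @consumptions[:some_consumer] = :failed
  end

  def failure?
    @failure
  end

  def manual_seek?
    @manual_seek
  end

  def consumptions_count
    @consumptions.size
  end
end

state = RunState.new
state.start([Msg.new(10), Msg.new(11)])
state.simulate_failed_run

state.start([Msg.new(12)])
puts state.failure?           # => false
puts state.manual_seek?       # => false
puts state.consumptions_count # => 0
puts state.seek_offset        # => 10
```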
#success!(consumer) ⇒ Object
Marks the given consumption on the consumer as successful.
    # File 'lib/karafka/processing/coordinator.rb', line 110
    def success!(consumer)
      synchronize do
        consumption(consumer).success!
      end
    end
#success? ⇒ Boolean
Note: This is only used for consume synchronization.
Returns whether all the consumption is done and finished successfully for this coordinator. We do not report success until all work is done, because running work may still crash.
    # File 'lib/karafka/processing/coordinator.rb', line 102
    def success?
      synchronize do
        @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
      end
    end
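`#success?` reports success only when no `consume` jobs are still running and every recorded consumption succeeded. The reduction below uses a hypothetical `ConsumeBarrier` class, with plain `true`/`false` values in place of `Karafka::Processing::Result` objects:

```ruby
# Hypothetical reduction of Coordinator#success?; not Karafka's API.
class ConsumeBarrier
  def initialize
    @running = 0
    @results = {}
    @mutex = Mutex.new
  end

  def job_started
    @mutex.synchronize { @running += 1 }
  end

  # Records the outcome for a consumer and marks its job as finished.
  def job_finished(consumer, ok)
    @mutex.synchronize do
      @results[consumer] = ok
      @running -= 1
    end
  end

  # Same condition as Coordinator#success?, with booleans as results.
  def success?
    @mutex.synchronize { @running.zero? && @results.values.all? }
  end
end

barrier = ConsumeBarrier.new
barrier.job_started
barrier.job_started

barrier.job_finished(:a, true)
puts barrier.success? # => false (one job is still running)

barrier.job_finished(:b, true)
puts barrier.success? # => true
```

There is one edge case. `all?` on an empty collection returns true, so with no running jobs and no recorded consumptions, this condition (and the original one) reads as successful.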
#synchronize(&block) ⇒ Object
Note: We check whether the mutex is already owned by the current thread so that we do not end up with a deadlock in case the user runs coordinated code from inside their own lock.
Note: This is internal and should not be used to synchronize user-facing code. Otherwise, the user could indirectly cause deadlocks or prolonged locks by running their logic. It can and should, however, be used for multi-threaded strategy applications and other internal operation locks.
Allows running synchronized (locked) code that can operate only from a given thread.
    # File 'lib/karafka/processing/coordinator.rb', line 196
    def synchronize(&block)
      if @mutex.owned?
        yield
      else
        @mutex.synchronize(&block)
      end
    end
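Ruby's `Mutex` is not reentrant. Locking it again from the thread that already holds it raises `ThreadError`, and the `owned?` check in `#synchronize` avoids that for nested calls. The following minimal sketch contrasts the two approaches, using a hypothetical `ReentrantGuard` class:

```ruby
# Hypothetical class showing the owned? check used by Coordinator#synchronize.
class ReentrantGuard
  def initialize
    @mutex = Mutex.new
  end

  # Same shape as Coordinator#synchronize.
  def synchronize(&block)
    if @mutex.owned?
      yield
    else
      @mutex.synchronize(&block)
    end
  end

  # Plain Mutex#synchronize without the ownership check, for comparison.
  def naive_synchronize(&block)
    @mutex.synchronize(&block)
  end
end

guard = ReentrantGuard.new

# Nested call on the same thread: the inner block runs directly.
nested = guard.synchronize { guard.synchronize { :inner_ran } }
puts nested # => inner_ran

begin
  guard.naive_synchronize { guard.naive_synchronize { :never } }
rescue ThreadError => e
  puts "naive nesting failed: #{e.class}"
end
```

The check only covers re-entry from the thread that already holds the lock. Other threads still block on the mutex as usual, which is what keeps the shared counters and flags consistent.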