Class: Karafka::Pro::Processing::Coordinator
- Inherits: Karafka::Processing::Coordinator
  - Object
  - Karafka::Processing::Coordinator
  - Karafka::Pro::Processing::Coordinator
- Extended by:
- Forwardable
- Defined in:
- lib/karafka/pro/processing/coordinator.rb
Overview
Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition
Instance Attribute Summary
-
#errors_tracker ⇒ Object
readonly
Returns the value of attribute errors_tracker.
-
#filter ⇒ Object
readonly
Returns the value of attribute filter.
-
#shared_mutex ⇒ Object
readonly
Returns the value of attribute shared_mutex.
-
#virtual_offset_manager ⇒ Object
readonly
Returns the value of attribute virtual_offset_manager.
Attributes inherited from Karafka::Processing::Coordinator
#eofed, #last_polled_at, #partition, #pause_tracker, #seek_offset, #topic
Instance Method Summary
-
#active_within?(interval) ⇒ Boolean
Was this partition active within the last `interval` milliseconds?
-
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts the collapse, which lasts until the offset following the last message of the failed batch.
-
#filtered? ⇒ Boolean
Did any of the filters apply any logic that would cause us to run the filtering flow?
-
#finished? ⇒ Boolean
Is the coordinated work finished or not.
-
#initialize(*args) ⇒ Coordinator
constructor
A new instance of Coordinator.
-
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
-
#on_finished ⇒ Object
Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any kind of post-coordination processing.
-
#on_revoked ⇒ Object
Runs once after a partition is revoked.
-
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, upon starting the first of them.
-
#start(messages) ⇒ Object
Starts the coordination process.
Methods inherited from Karafka::Processing::Coordinator
#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize
Constructor Details
#initialize(*args) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/pro/processing/coordinator.rb', line 47

def initialize(*args)
  super

  @executed = []
  @errors_tracker = errors_tracker_class.new(topic, partition)
  @flow_mutex = Mutex.new

  # Lock for user code synchronization
  # We do not want to mix coordinator lock with the user lock not to create cases where
  # user imposed lock would lock the internal operations of Karafka
  # This shared lock can be used by the end user as it is not used internally by the
  # framework and can be used for user-facing locking
  @shared_mutex = Mutex.new

  @collapser = Collapser.new
  @filter = Coordinators::FiltersApplier.new(self)

  return unless topic.virtual_partitions?

  @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
    topic.name,
    partition,
    topic.virtual_partitions.
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end
Instance Attribute Details
#errors_tracker ⇒ Object (readonly)
Returns the value of attribute errors_tracker.
# File 'lib/karafka/pro/processing/coordinator.rb', line 44

def errors_tracker
  @errors_tracker
end
#filter ⇒ Object (readonly)
Returns the value of attribute filter.
# File 'lib/karafka/pro/processing/coordinator.rb', line 44

def filter
  @filter
end
#shared_mutex ⇒ Object (readonly)
Returns the value of attribute shared_mutex.
# File 'lib/karafka/pro/processing/coordinator.rb', line 44

def shared_mutex
  @shared_mutex
end
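Because `shared_mutex` is guaranteed not to be used internally by the framework, it is safe for guarding user state touched by multiple virtual partition consumers of the same partition. A minimal sketch of that pattern (the `SharedState` class and its names are illustrative, not part of Karafka):

```ruby
require 'set'

# Illustrative consumer-side usage: virtual partition consumers of the same
# partition share one coordinator, so its shared_mutex can serialize access
# to state they all touch.
class SharedState
  def initialize(mutex)
    @mutex = mutex
    @seen_offsets = Set.new
  end

  # Safe to call concurrently from multiple virtual partition workers
  def record(offset)
    @mutex.synchronize { @seen_offsets << offset }
  end

  def count
    @mutex.synchronize { @seen_offsets.size }
  end
end

state = SharedState.new(Mutex.new)
threads = 4.times.map do |i|
  Thread.new { 25.times { |j| state.record(i * 25 + j) } }
end
threads.each(&:join)
state.count # => 100
```

In a real consumer, `coordinator.shared_mutex` would be passed in place of the freshly built `Mutex.new` above.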
#virtual_offset_manager ⇒ Object (readonly)
Returns the value of attribute virtual_offset_manager.
# File 'lib/karafka/pro/processing/coordinator.rb', line 44

def virtual_offset_manager
  @virtual_offset_manager
end
Instance Method Details
#active_within?(interval) ⇒ Boolean
Will also return true if currently active.
Returns whether this partition was active within the last `interval` milliseconds.
# File 'lib/karafka/pro/processing/coordinator.rb', line 170

def active_within?(interval)
  # It's always active if there's any job related to this coordinator that is still
  # enqueued or running
  return true if @running_jobs.values.any?(&:positive?)

  # Otherwise we check the last time any job of this coordinator was active
  @changed_at + interval > monotonic_now
end
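The check combines two signals: any still-running job keeps the coordinator active; otherwise the last change timestamp is compared against a monotonic clock. The same logic in isolation (a standalone sketch with explicit arguments, not Karafka's internal method):

```ruby
# Illustrative re-creation of the activity-window check: active if any job is
# still running, or if the last state change happened within `interval_ms`.
def active_within?(running_jobs, changed_at_ms, interval_ms, now_ms)
  # Any enqueued or running job keeps the coordinator active
  return true if running_jobs.values.any?(&:positive?)

  # Otherwise, compare the last activity timestamp against the clock
  changed_at_ms + interval_ms > now_ms
end

now = 10_000.0
active_within?({ consume: 1 }, 0, 100, now)     # => true (job still running)
active_within?({ consume: 0 }, 9_950, 100, now) # => true (within the window)
active_within?({ consume: 0 }, 9_800, 100, now) # => false (too long ago)
```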
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts the collapse, which lasts until the offset following the last message of the failed batch.
# File 'lib/karafka/pro/processing/coordinator.rb', line 109

def failure!(consumer, error)
  super
  @errors_tracker << error
  collapse_until!(@last_message.offset + 1)
end
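On failure the coordinator both records the error and "collapses" virtual partitions back to sequential processing until the offset right after the failed batch's last message. A minimal sketch of that collapse bookkeeping (the `MiniCollapser` class below is a hypothetical simplification, not Karafka's `Collapser`):

```ruby
# Minimal sketch of the collapse mechanism: after a failure, processing is
# considered collapsed (sequential) until the stream moves past the
# remembered offset.
class MiniCollapser
  def initialize
    @collapse_until = -1
    @collapsed = false
  end

  # Remember the offset up to (excluding) which we should stay collapsed
  def collapse_until!(offset)
    @collapse_until = offset if offset > @collapse_until
  end

  # Refresh against the first offset of a newly received batch
  def refresh!(first_offset)
    @collapsed = first_offset < @collapse_until
  end

  def collapsed?
    @collapsed
  end
end

collapser = MiniCollapser.new
collapser.collapse_until!(101) # failure at offset 100 => collapse until 101
collapser.refresh!(95)
collapser.collapsed?           # => true (still inside the failed range)
collapser.refresh!(101)
collapser.collapsed?           # => false (past the collapse point)
```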
#filtered? ⇒ Boolean
Returns whether any of the filters applied logic that would cause us to run the filtering flow.
# File 'lib/karafka/pro/processing/coordinator.rb', line 117

def filtered?
  @filter.applied?
end
#finished? ⇒ Boolean
Used only in the consume operation context
Returns whether the coordinated work is finished.
# File 'lib/karafka/pro/processing/coordinator.rb', line 123

def finished?
  @running_jobs[:consume].zero?
end
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued
# File 'lib/karafka/pro/processing/coordinator.rb', line 129

def on_enqueued
  @flow_mutex.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end
#on_finished ⇒ Object
Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any kind of post-coordination processing.
# File 'lib/karafka/pro/processing/coordinator.rb', line 149

def on_finished
  @flow_mutex.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
#on_revoked ⇒ Object
Runs once after a partition is revoked
# File 'lib/karafka/pro/processing/coordinator.rb', line 159

def on_revoked
  @flow_mutex.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, upon starting the first of them.
# File 'lib/karafka/pro/processing/coordinator.rb', line 138

def on_started
  @flow_mutex.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end
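All four `on_*` hooks share the same pattern: a mutex-guarded check that marks the hook as executed, so the block runs at most once per batch of coordinated jobs. A simplified version of that "run once" guard (the `OnceFlow` class and its `executable?` helper are stand-ins for the coordinator's private machinery):

```ruby
# Simplified once-per-collective guard: each named hook yields at most once
# until the executed list is cleared for the next batch.
class OnceFlow
  def initialize
    @mutex = Mutex.new
    @executed = []
  end

  # Yields only the first time `name` is seen since the last reset
  def once(name)
    @mutex.synchronize do
      next unless executable?(name)

      yield
    end
  end

  # Clears state when a new batch of jobs starts
  def reset!
    @mutex.synchronize { @executed.clear }
  end

  private

  # True only on the first call for a given hook name
  def executable?(name)
    return false if @executed.include?(name)

    @executed << name
    true
  end
end

flow = OnceFlow.new
runs = 0
3.times { flow.once(:on_started) { runs += 1 } }
runs # => 1 (subsequent calls are no-ops)
flow.reset!
flow.once(:on_started) { runs += 1 }
runs # => 2 (runs again after the reset)
```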
#start(messages) ⇒ Object
Starts the coordination process
# File 'lib/karafka/pro/processing/coordinator.rb', line 81

def start(messages)
  super

  @collapser.refresh!(messages.first.offset)

  @filter.apply!(messages)

  # Do not clear coordinator errors storage when we are retrying, so we can reference the
  # errors that have happened during recovery. This can be useful for implementing custom
  # flows. There can be more errors than one when running with virtual partitions so we
  # need to make sure we collect them all. Under collapse when we reference a given
  # consumer we should be able to get all the errors and not just first/last.
  #
  # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
  @errors_tracker.clear if attempt.zero?

  @executed.clear

  # We keep the old processed offsets until the collapsing is done and regular processing
  # with virtualization is restored
  @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

  @last_message = messages.last
end
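The retry-aware error retention above means the tracker is cleared only on a fresh (attempt-zero) start, so errors collected during recovery attempts remain visible until the work finally succeeds. That behavior in miniature (the `RetryErrors` class is a hypothetical sketch, not a Karafka API):

```ruby
# Sketch of retry-aware error retention: the error list is cleared only on a
# fresh (non-retry) start, so errors from recovery attempts accumulate and
# stay referenceable until a success resets the attempt counter.
class RetryErrors
  attr_reader :errors

  def initialize
    @errors = []
    @attempt = 0
  end

  # Mirrors the attempt-zero clearing in Coordinator#start
  def start!
    @errors.clear if @attempt.zero?
    @attempt += 1
  end

  def failure!(error)
    @errors << error
  end

  def success!
    @attempt = 0
  end
end

tracker = RetryErrors.new
tracker.start!          # attempt 0 => storage cleared
tracker.failure!(:boom)
tracker.start!          # retry => errors retained
tracker.errors          # => [:boom]
tracker.success!
tracker.start!          # fresh start => cleared again
tracker.errors          # => []
```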