Class: Karafka::Pro::Processing::ConsumerGroups::Coordinator

Inherits:
Karafka::Processing::ConsumerGroups::Coordinator show all
Extended by:
Forwardable
Defined in:
lib/karafka/pro/processing/consumer_groups/coordinator.rb

Overview

Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition

Instance Attribute Summary collapse

Attributes inherited from Karafka::Processing::ConsumerGroups::Coordinator

#eofed, #last_polled_at, #partition, #pause_tracker, #seek_offset, #topic

Instance Method Summary collapse

Methods inherited from Karafka::Processing::ConsumerGroups::Coordinator

#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize

Constructor Details

#initialize(*args) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • args (Object)

    anything the base coordinator accepts



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 51

# Builds the Pro coordinator state on top of whatever the base coordinator sets up.
#
# @param args [Object] anything the base coordinator accepts
def initialize(*args)
  super

  @executed = []
  @errors_tracker = errors_tracker_class.new(topic, partition)
  @flow_mutex = Mutex.new
  # Lock for user code synchronization
  # We do not want to mix coordinator lock with the user lock not to create cases where
  # user imposed lock would lock the internal operations of Karafka
  # This shared lock can be used by the end user as it is not used internally by the
  # framework and can be used for user-facing locking
  @shared_mutex = Mutex.new
  @collapser = Collapser.new
  @filter = Coordinators::FiltersApplier.new(self)

  # Everything below is only relevant when the topic uses virtual partitions
  return unless topic.virtual_partitions?

  @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
    topic.name,
    partition,
    topic.virtual_partitions.offset_metadata_strategy
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end

Instance Attribute Details

#errors_trackerObject (readonly)

Returns the value of attribute errors_tracker.



48
49
50
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 48

# Read-only access to the errors tracker held in `@errors_tracker`.
#
# @return [Object] the tracker instance stored on this coordinator
def errors_tracker
  @errors_tracker
end

#filterObject (readonly)

Returns the value of attribute filter.



48
49
50
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 48

# Read-only access to the filters applier held in `@filter`.
#
# @return [Object] the filters applier stored on this coordinator
def filter
  @filter
end

#shared_mutexObject (readonly)

Returns the value of attribute shared_mutex.



48
49
50
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 48

# Read-only access to the mutex held in `@shared_mutex`.
#
# @return [Mutex] the mutex stored on this coordinator
def shared_mutex
  @shared_mutex
end

#virtual_offset_managerObject (readonly)

Returns the value of attribute virtual_offset_manager.



48
49
50
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 48

# Read-only access to the virtual offset manager held in `@virtual_offset_manager`.
#
# @return [Object, nil] the stored manager, or nil when the instance variable was never
#   assigned
def virtual_offset_manager
  @virtual_offset_manager
end

Instance Method Details

#active_within?(interval) ⇒ Boolean

Note:

Will return true also if currently active

Returns whether this partition was active within the last `interval` milliseconds.

Parameters:

  • interval (Integer)

    milliseconds of activity

Returns:

  • (Boolean)

    whether this partition was active within the last `interval` milliseconds



174
175
176
177
178
179
180
181
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 174

# Tells whether this coordinator has been active recently.
#
# @param interval [Integer] milliseconds of activity
# @return [Boolean] true when any job counter in `@running_jobs` is positive, or when
#   `@changed_at + interval` is still ahead of `monotonic_now`
# @note Will return true also if currently active
def active_within?(interval)
  # It is always active if there is any job related to this coordinator that is still
  # enqueued or running.
  # The block form is required here: `any?(:positive?)` would treat the symbol as a pattern
  # and test `:positive? === count`, which is never true for a numeric counter.
  return true if @running_jobs.values.any?(&:positive?)

  # Otherwise we check last time any job of this coordinator was active
  @changed_at + interval > monotonic_now
end

#failure!(consumer, error) ⇒ Object

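Sets the consumer failure status, records the error in the errors tracker and additionally starts the collapse until the offset that follows the last message of the current batch.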

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error from the failure



113
114
115
116
117
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 113

# Marks the failure via the parent implementation, stores the error in the errors tracker
# and requests a collapse up to the offset right after the last known message.
#
# @param consumer [Karafka::BaseConsumer] consumer that failed
# @param error [StandardError] error from the failure
def failure!(consumer, error)
  super

  @errors_tracker << error

  # The collapse boundary is the offset following the last message we have seen
  collapse_boundary = @last_message.offset + 1
  collapse_until!(collapse_boundary)
end

#filtered?Boolean

Returns whether any of the filters applied any logic that would cause us to run the filtering flow.

Returns:

  • (Boolean)

    whether any of the filters applied any logic that would cause us to run the filtering flow



121
122
123
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 121

# Asks the filters applier stored in `@filter` whether it has been applied.
#
# @return [Boolean] whatever `@filter.applied?` reports
def filtered?
  @filter.applied?
end

#finished?Boolean

Note:

Used only in the consume operation context

Returns is the coordinated work finished or not.

Returns:

  • (Boolean)

    is the coordinated work finished or not



127
128
129
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 127

# Checks whether there are no consume jobs left for this coordinator.
#
# @return [Boolean] true when the `:consume` counter in `@running_jobs` is zero
# @note Used only in the consume operation context
def finished?
  consume_jobs_count = @running_jobs[:consume]
  consume_jobs_count.zero?
end

#on_enqueuedObject

Runs synchronized code once for a collective of virtual partitions prior to work being enqueued



133
134
135
136
137
138
139
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 133

# Yields the last known message while holding the flow mutex, but only when
# `executable?(:on_enqueued)` permits it. Intended to run once for a collective of
# virtual partitions prior to work being enqueued.
#
# @yieldparam [Object] the value of `@last_message`
# @return [Object, nil] the block result, or nil when the block was not executed
def on_enqueued
  @flow_mutex.synchronize do
    yield(@last_message) if executable?(:on_enqueued)
  end
end

#on_finishedObject

Runs given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used to run any type of post-jobs coordination processing.



153
154
155
156
157
158
159
160
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 153

# Yields the last known message while holding the flow mutex, but only when the
# coordinated work is finished and `executable?(:on_finished)` permits it. Intended for
# post-processing that should happen once per all the coordinated jobs.
#
# @yieldparam [Object] the value of `@last_message`
# @return [Object, nil] the block result, or nil when the block was not executed
def on_finished
  @flow_mutex.synchronize do
    # `finished?` is checked first so the executable check is not consulted while jobs are
    # still running
    yield(@last_message) if finished? && executable?(:on_finished)
  end
end

#on_revokedObject

Runs once after a partition is revoked



163
164
165
166
167
168
169
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 163

# Yields the last known message while holding the flow mutex, but only when
# `executable?(:on_revoked)` permits it. Intended to run once after a partition is revoked.
#
# @yieldparam [Object] the value of `@last_message`
# @return [Object, nil] the block result, or nil when the block was not executed
def on_revoked
  @flow_mutex.synchronize do
    yield(@last_message) if executable?(:on_revoked)
  end
end

#on_startedObject

Runs given code only once per all the coordinated jobs upon starting first of them



142
143
144
145
146
147
148
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 142

# Yields the last known message while holding the flow mutex, but only when
# `executable?(:on_started)` permits it. Intended to run once per all the coordinated jobs
# upon starting the first of them.
#
# @yieldparam [Object] the value of `@last_message`
# @return [Object, nil] the block result, or nil when the block was not executed
def on_started
  @flow_mutex.synchronize do
    yield(@last_message) if executable?(:on_started)
  end
end

#start(messages) ⇒ Object

Starts the coordination process

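Parameters:

  • messages (Array<Karafka::Messages::Message>)

    messages for which the processing is about to be coordinated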



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/karafka/pro/processing/consumer_groups/coordinator.rb', line 85

# Starts the coordination process for a batch: refreshes the collapser with the first
# offset, applies the filters, resets the per-run state and remembers the last message.
#
# @param messages [#first, #last] batch about to be processed; the first element has to
#   respond to `#offset`
# @return [Object] the last message of the batch (also stored in `@last_message`)
def start(messages)
  super

  first_offset = messages.first.offset
  @collapser.refresh!(first_offset)

  @filter.apply!(messages)

  # Errors are kept across retries so that they can be referenced during recovery, which is
  # useful for custom flows. With virtual partitions there may be several of them and all
  # should remain accessible (not only the first or the last one) once collapsed.
  #
  # @note Zero is used as the marker because at this point we are not "yet" in attempt 1
  @errors_tracker.clear if attempt.zero?
  @executed.clear

  # Previously processed virtual offsets are retained until the collapsing is over and
  # regular virtualized processing is restored
  if topic.virtual_partitions?
    @virtual_offset_manager.clear unless collapsed?
  end

  @last_message = messages.last
end