Class: Karafka::Processing::ConsumerGroups::Coordinator

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/processing/consumer_groups/coordinator.rb

Overview

Note:

This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but we make all of them thread-safe by default so that we do not have to worry about potential future mistakes.

Basic coordinator that allows us to provide coordination objects into consumers.

This is a wrapping layer to simplify management of work to be handled around consumption.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, partition, pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 33

# Sets up a coordinator with a clean state: no running jobs, no stored consumption results
# and all state flags turned off.
#
# @param topic [Object] topic stored as the coordinator topic
# @param partition [Object] partition stored as the coordinator partition
# @param pause_tracker [Object] pause tracker stored for this coordinator
def initialize(topic, partition, pause_tracker)
  # What we coordinate
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker

  # Lock used by `#synchronize` plus the per-run bookkeeping structures
  @mutex = Mutex.new
  @consumptions = {}
  # Each job type counter lazily starts from zero on first access
  @running_jobs = Hash.new { |counters, job_type| counters[job_type] = 0 }

  # None of the state flags is set on a fresh coordinator
  @revoked = @manual_pause = @manual_seek = @marked = @failure = @eofed = false

  # Both timestamps start from the very same monotonic reading
  @changed_at = monotonic_now
  @last_polled_at = @changed_at
end

Instance Attribute Details

#eofed ⇒ Object

This can be set directly from the listener, because it can be triggered on the first run without any messages.



23
24
25
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 23

# @return [Object] current value of the end of partition flag (starts as `false`)
def eofed = @eofed

#last_polled_at ⇒ Object

Time of the last poll, set based on the incoming last poll time.



26
27
28
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 26

# @return [Object] stored last poll time
def last_polled_at = @last_polled_at

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



19
20
21
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19

# @return [Object] partition this coordinator holds
def partition = @partition

#pause_tracker ⇒ Object (readonly)

Returns the value of attribute pause_tracker.



19
20
21
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19

# @return [Object] pause tracker this coordinator holds
def pause_tracker = @pause_tracker

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



19
20
21
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19

# @return [Object, nil] stored seek offset or nil when none was assigned yet
def seek_offset = @seek_offset

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



19
20
21
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 19

# @return [Object] topic this coordinator holds
def topic = @topic

Instance Method Details

#consumption(consumer) ⇒ Karafka::Processing::Result

Returns result object which we can use to indicate consumption processing state.

Parameters:

  • consumer (Object)

    karafka consumer (normal or pro)

Returns:



198
199
200
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 198

# Fetches the result object tracked for a given consumer, building and storing a new one
# when this consumer has none yet.
#
# @param consumer [Object] karafka consumer (normal or pro)
# @return [Karafka::Processing::Result] result object which we can use to indicate
#   consumption processing state
def consumption(consumer)
  tracked = @consumptions[consumer]
  return tracked if tracked

  @consumptions[consumer] = Processing::Result.new
end

#decrement(job_type) ⇒ Object

Decrements number of jobs we handle at the moment

Parameters:

  • job_type (Symbol)

    type of job that we want to decrement



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 97

# Lowers by one the counter of jobs of a given type that are currently handled and
# refreshes the change timestamp.
#
# @param job_type [Symbol] type of job that we want to decrement
# @return [Integer] number of jobs of this type left after the decrementation
# @raise [Karafka::Errors::InvalidCoordinatorStateError] when the counter went below zero
def decrement(job_type)
  synchronize do
    remaining = (@running_jobs[job_type] -= 1)
    @changed_at = monotonic_now

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    if remaining.negative?
      raise Karafka::Errors::InvalidCoordinatorStateError, "Was zero before decrementation"
    end

    remaining
  end
end

#eofed? ⇒ Boolean

Returns whether we reached the end of the partition when polling data.

Returns:

  • (Boolean)

    whether we reached the end of the partition when polling data



162
163
164
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 162

# @return [Boolean] did we reach end of partition when polling data
def eofed? = @eofed

#failure!(consumer, error) ⇒ Object

Mark given consumption on consumer as failed

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error that occurred



131
132
133
134
135
136
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 131

# Flags the coordinator as failed and records the error on the result object of the
# given consumer. Both happen under the coordinator lock.
#
# @param consumer [Karafka::BaseConsumer] consumer that failed
# @param error [StandardError] error that occurred
def failure!(consumer, error)
  synchronize do
    # Coordinator level flag goes first, so it is set even if marking the result raises
    @failure = true

    result = consumption(consumer)
    result.failure!(error)
  end
end

#failure? ⇒ Boolean

Returns true if any of the work we were running failed.

Returns:

  • (Boolean)

    true if any of the work we were running failed



139
140
141
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 139

# @return [Boolean] true if any of work we were running failed
def failure? = @failure

#increment(job_type) ⇒ Object

Increases number of jobs that we handle with this coordinator

Parameters:

  • job_type (Symbol)

    type of job that we want to increment



88
89
90
91
92
93
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 88

# Raises by one the counter of jobs of a given type handled with this coordinator and
# refreshes the change timestamp.
#
# @param job_type [Symbol] type of job that we want to increment
def increment(job_type)
  synchronize do
    @running_jobs[job_type] = @running_jobs[job_type].succ
    @changed_at = monotonic_now
  end
end

#manual_pause ⇒ Object

Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.



176
177
178
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 176

# Records that the pause was requested manually by the end user and not by the system.
#
# @return [true]
def manual_pause = (@manual_pause = true)

#manual_pause? ⇒ Boolean

Returns whether we are in a pause that was initiated by the user.

Returns:

  • (Boolean)

    whether we are in a pause that was initiated by the user



181
182
183
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 181

# The manual flag alone is not enough, we also need to be paused at the moment.
#
# @return [Boolean] are we in a pause that was initiated by the user
def manual_pause? = paused? && @manual_pause

#manual_seek ⇒ Object

Marks seek as manual for coordination purposes



186
187
188
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 186

# Records that a seek was requested manually, for coordination purposes.
#
# @return [true]
def manual_seek = (@manual_seek = true)

#manual_seek? ⇒ Boolean

Returns whether a user invoked seek in the current operations scope.

Returns:

  • (Boolean)

    whether a user invoked seek in the current operations scope



191
192
193
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 191

# @return [Boolean] did a user invoke seek in the current operations scope
def manual_seek? = @manual_seek

#marked? ⇒ Boolean

Returns whether the new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the first message ever; however, this is insufficient for DLQ in a scenario where the first message would be broken. We would never move past it and would end up in an endless loop.

Returns:

  • (Boolean)

    whether the new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the first message ever; however, this is insufficient for DLQ in a scenario where the first message would be broken. We would never move past it and would end up in an endless loop.



170
171
172
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 170

# @return [Boolean] was the new seek offset assigned at least once
def marked? = @marked

#revoke ⇒ Object

Marks given coordinator for processing group as revoked

This is invoked in two places:

- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails

This means that a consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.



152
153
154
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 152

# Flips the revocation flag of this coordinator under the coordinator lock.
#
# @return [true]
def revoke
  synchronize do
    @revoked = true
  end
end

#revoked? ⇒ Boolean

Returns whether the partition we are processing is revoked.

Returns:

  • (Boolean)

    whether the partition we are processing is revoked



157
158
159
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 157

# @return [Boolean] is the partition we are processing revoked or not
def revoked? = @revoked

#start(messages) ⇒ Object

Starts the coordinator for given consumption jobs

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for which we are going to coordinate work. Not used with regular coordinator.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 53

# Resets the per-run state of the coordinator before a new batch of consumption jobs and
# makes sure the seek offset is initialized.
#
# @param messages [Array<Karafka::Messages::Message>] batch of messages for which we are
#   going to coordinate work. Only the offset of the first one is read here.
# @return [Object] seek offset in effect after the start
def start(messages)
  # Results collected so far have to go, otherwise we could end up storing consumption
  # results of consumer instances we no longer control
  @consumptions.clear
  @running_jobs[:consume] = 0

  # A new run begins with no failure, no manual pause and no user invoked seek
  @failure = @manual_pause = @manual_seek = false

  # The seek offset is assigned only on the first encounter. Later on, moving it is up to
  # the consumers logic (our or the end user).
  #
  # It always needs to be initialized, because with manual offset management we need a
  # reference to the first offset even when running multiple batches without marking any
  # messages as consumed. Rollback needs to happen to the last place we know of or the
  # last message + 1 that was marked.
  #
  # Keep in mind, that this may need to be used with `#marked?` to make sure that the
  # first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end

#success!(consumer) ⇒ Object

Mark given consumption on consumer as successful

Parameters:



122
123
124
125
126
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 122

# Marks the result object of the given consumer as successful under the coordinator lock.
#
# @param consumer [Object] consumer whose consumption succeeded
def success!(consumer)
  synchronize { consumption(consumer).success! }
end

#success? ⇒ Boolean

Note:

This is only used for consume synchronization

Is all the consumption done and finished successfully for this coordinator? We do not say we are successful until all work is done, because running work may still crash.

Returns:

  • (Boolean)


114
115
116
117
118
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 114

# Tells whether no consume jobs are running anymore and every tracked consumption result
# reports success. Running work may still crash, hence both conditions are required.
#
# @note This is only used for consume synchronization
# @return [Boolean]
def success?
  synchronize do
    # As long as any consume job is still running, we cannot claim success
    next false unless @running_jobs[:consume].zero?

    @consumptions.each_value.all?(&:success?)
  end
end

#synchronize ⇒ Object

Note:

We check whether the mutex is already owned by the current thread, so we will not end up with a deadlock in case the user runs coordinated code from inside their own lock.

Note:

This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their logic. It can and should, however, be used for multi-thread strategy applications and other internal operation locks.

Allows running synchronized (locked) code that can operate only from a given thread.



211
212
213
214
215
216
217
# File 'lib/karafka/processing/consumer_groups/coordinator.rb', line 211

# Runs the given block under the coordinator mutex. When the current thread already owns
# the mutex, the block is run directly, so nested calls do not try to lock again.
#
# @param block [Proc] code we want to run in a synchronized manner
# @return [Object] whatever the block returns
def synchronize(&block)
  # Already inside of the lock in this thread, no need (and no way) to acquire it again
  return yield if @mutex.owned?

  @mutex.synchronize(&block)
end