Class: Karafka::Processing::Coordinator

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/processing/coordinator.rb

Overview

Note:

This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but we make all of them thread-safe by default to guard against potential future mistakes.

Basic coordinator that allows us to provide coordination objects to consumers.

This is a wrapping layer to simplify management of work to be handled around consumption.

Direct Known Subclasses

Karafka::Pro::Processing::Coordinator

Constructor Details

#initialize(topic, partition, pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • topic

  • partition

  • pause_tracker


# File 'lib/karafka/processing/coordinator.rb', line 23

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @changed_at = monotonic_now
end

Instance Attribute Details

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



# File 'lib/karafka/processing/coordinator.rb', line 16

def partition
  @partition
end

#pause_tracker ⇒ Object (readonly)

Returns the value of attribute pause_tracker.



# File 'lib/karafka/processing/coordinator.rb', line 16

def pause_tracker
  @pause_tracker
end

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/karafka/processing/coordinator.rb', line 16

def topic
  @topic
end

Instance Method Details

#consumption(consumer) ⇒ Karafka::Processing::Result

Returns result object which we can use to indicate consumption processing state.

Parameters:

  • consumer (Object)

    karafka consumer (normal or pro)

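Returns:

  • (Karafka::Processing::Result)

    result object which we can use to indicate consumption processing state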


# File 'lib/karafka/processing/coordinator.rb', line 181

def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end

#decrement(job_type) ⇒ Object

Decrements number of jobs we handle at the moment

Parameters:

  • job_type (Symbol)

    type of job that we want to decrement



# File 'lib/karafka/processing/coordinator.rb', line 85

def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
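
The counting behavior of `#increment` and `#decrement` can be sketched outside Karafka. The example below is a minimal stand-in under stated assumptions: `JobCounter` and `CounterStateError` are hypothetical names used only for illustration, and the sketch leaves out the `@changed_at` bookkeeping.

```ruby
# Hypothetical stand-ins for illustration; these are not Karafka classes.
class CounterStateError < StandardError; end

class JobCounter
  def initialize
    # Unknown job types start at zero on first access
    @running_jobs = Hash.new { |hash, key| hash[key] = 0 }
    @mutex = Mutex.new
  end

  # Registers one more running job of the given type and returns the new count
  def increment(job_type)
    @mutex.synchronize { @running_jobs[job_type] += 1 }
  end

  # Registers completion of one job and returns the new count.
  # Raises when the counter drops below zero, which signals out-of-sync state.
  def decrement(job_type)
    @mutex.synchronize do
      @running_jobs[job_type] -= 1
      count = @running_jobs[job_type]
      raise CounterStateError, 'Was zero before decrementation' if count.negative?

      count
    end
  end
end

counter = JobCounter.new
counter.increment(:consume)
counter.increment(:consume)
counter.decrement(:consume) # one :consume job is still registered as running
```

As in the original method, the counter is decremented before the check, so the error is raised only after the value has already gone negative.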

#failure!(consumer, error) ⇒ Object

Mark given consumption on consumer as failed

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error that occurred



# File 'lib/karafka/processing/coordinator.rb', line 119

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end

#failure? ⇒ Boolean

Returns true if any of the work we were running failed.

Returns:

  • (Boolean)

    true if any of the work we were running failed



# File 'lib/karafka/processing/coordinator.rb', line 127

def failure?
  @failure
end

#increment(job_type) ⇒ Object

Increases number of jobs that we handle with this coordinator

Parameters:

  • job_type (Symbol)

    type of job that we want to increment



# File 'lib/karafka/processing/coordinator.rb', line 76

def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end

#manual_pause ⇒ Object

Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.



# File 'lib/karafka/processing/coordinator.rb', line 159

def manual_pause
  @manual_pause = true
end

#manual_pause? ⇒ Boolean

Returns whether we are in a pause that was initiated by the user.

Returns:

  • (Boolean)

    true if we are in a pause that was initiated by the user



# File 'lib/karafka/processing/coordinator.rb', line 164

def manual_pause?
  paused? && @manual_pause
end
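
The check combines two conditions: the partition must currently be paused, and the pause must have been flagged as manual. The sketch below illustrates this with hypothetical stand-ins (`FakePauseTracker` and `PauseState` are not Karafka classes). It assumes that `paused?` is answered by the pause tracker, which this page does not show explicitly.

```ruby
# Hypothetical stand-ins for illustration; these are not Karafka classes.
FakePauseTracker = Struct.new(:paused) do
  def paused?
    paused
  end
end

class PauseState
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  # Flags the current pause as requested by the user
  def manual_pause
    @manual_pause = true
  end

  # True only when a pause is active AND it was flagged as manual
  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = FakePauseTracker.new(true)
state = PauseState.new(tracker)
state.manual_pause? # false: paused, but not by the user
state.manual_pause
state.manual_pause? # true: paused and flagged as manual
tracker.paused = false
state.manual_pause? # false: the flag alone is not enough once the pause ends
```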

#manual_seek ⇒ Object

Marks seek as manual for coordination purposes



# File 'lib/karafka/processing/coordinator.rb', line 169

def manual_seek
  @manual_seek = true
end

#manual_seek? ⇒ Boolean

Returns whether the user invoked seek in the current operations scope.

Returns:

  • (Boolean)

    true if the user invoked seek in the current operations scope



# File 'lib/karafka/processing/coordinator.rb', line 174

def manual_seek?
  @manual_seek
end

#marked? ⇒ Boolean

Returns whether a new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the very first message; however, this is insufficient for DLQ in a scenario where the first message is broken. We would never move past it and would end up in an endless loop.

Returns:

  • (Boolean)

    true if a new seek offset was assigned at least once



# File 'lib/karafka/processing/coordinator.rb', line 153

def marked?
  @marked
end

#revoke ⇒ Object

Marks the given coordinator for the processing group as revoked.

This is invoked in two places:

- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails

This means a consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.



# File 'lib/karafka/processing/coordinator.rb', line 140

def revoke
  synchronize { @revoked = true }
end

#revoked? ⇒ Boolean

Returns whether the partition we are processing is revoked.

Returns:

  • (Boolean)

    true if the partition we are processing is revoked



# File 'lib/karafka/processing/coordinator.rb', line 145

def revoked?
  @revoked
end

#start(messages) ⇒ Object

Starts the coordinator for given consumption jobs

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for which we are going to coordinate work. Not used with the regular coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 41

def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
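
The `||=` assignment at the end of `#start` is what makes the seek offset "set on the first encounter and never again". A minimal sketch of that behavior follows, using hypothetical stand-ins (`FakeMessage` and `SeekOffsetHolder` are illustrative names, not Karafka classes):

```ruby
# Hypothetical stand-ins for illustration; these are not Karafka classes.
FakeMessage = Struct.new(:offset)

class SeekOffsetHolder
  attr_reader :seek_offset

  def start(messages)
    # Assigned only while @seek_offset is nil; later batches leave it untouched
    @seek_offset ||= messages.first.offset
  end
end

holder = SeekOffsetHolder.new
holder.start([FakeMessage.new(10), FakeMessage.new(11)])
holder.start([FakeMessage.new(12)])
holder.seek_offset # still 10, the first offset of the first batch
```

Because `0` is truthy in Ruby, a first offset of `0` is also kept across later batches rather than being overwritten.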

#success!(consumer) ⇒ Object

Mark given consumption on consumer as successful

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that succeeded


# File 'lib/karafka/processing/coordinator.rb', line 110

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end

#success? ⇒ Boolean

Note:

This is only used for consume synchronization.

Returns whether all the consumption is done and finished successfully for this coordinator. We do not report success until all work is done, because running work may still crash.

Returns:

  • (Boolean)


# File 'lib/karafka/processing/coordinator.rb', line 102

def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
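
The condition in `#success?` can be read as "no consume jobs are running and every recorded result is successful". The sketch below restates that rule as a standalone function; `FakeResult` and `all_done_and_successful?` are hypothetical names for illustration and do not model the real result class.

```ruby
# Hypothetical stand-ins for illustration; these are not Karafka classes.
FakeResult = Struct.new(:ok) do
  def success?
    ok
  end
end

# Mirrors the shape of the check: zero running jobs AND all results successful
def all_done_and_successful?(running_consume_jobs, results)
  running_consume_jobs.zero? && results.all?(&:success?)
end

all_done_and_successful?(1, [FakeResult.new(true)])  # false: a job is still running
all_done_and_successful?(0, [FakeResult.new(true), FakeResult.new(false)]) # false
all_done_and_successful?(0, [FakeResult.new(true)])  # true
all_done_and_successful?(0, [])                      # true: all? holds for an empty list
```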

#synchronize(&block) ⇒ Object

Note:

We check whether the mutex is already owned by the current thread so that we do not end up with a deadlock when a user runs coordinated code from inside their own lock.

Note:

This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their own logic. It can and should, however, be used for multi-thread strategy applications and other internal operation locks.

Runs synchronized (locked) code so that only one thread can execute it at a time.

Parameters:

  • block (Proc)

    code we want to run in the synchronized mode



# File 'lib/karafka/processing/coordinator.rb', line 196

def synchronize(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
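
The `owned?` check matters because Ruby's `Mutex` is not re-entrant: locking it again from the thread that already holds it raises `ThreadError`. The sketch below contrasts the two behaviors. `ReentrantGuard` is a hypothetical class that copies only the shape of `#synchronize`.

```ruby
# Hypothetical stand-in for illustration; this is not a Karafka class.
class ReentrantGuard
  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    if @mutex.owned?
      # The current thread already holds the lock, so run the block directly
      yield
    else
      @mutex.synchronize(&block)
    end
  end
end

guard = ReentrantGuard.new
# A nested call from the same thread completes instead of raising
value = guard.synchronize { guard.synchronize { :inner_ran } }

# The same nesting on a plain Mutex raises ThreadError (recursive locking)
plain = Mutex.new
recursive_error =
  begin
    plain.synchronize { plain.synchronize { :never } }
    nil
  rescue ThreadError => e
    e
  end
```

Here `value` ends up as `:inner_ran`, while `recursive_error` holds the `ThreadError` raised by the plain mutex.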