Class: Karafka::Processing::ConsumerGroups::Executor
- Inherits:
-
Object
- Object
- Karafka::Processing::ConsumerGroups::Executor
- Extended by:
- Forwardable
- Defined in:
- lib/karafka/processing/consumer_groups/executor.rb
Overview
Executors are not removed after partition is revoked. They are not that big and will be re-used in case of a re-claim
Since given consumer can run various operations, executor manages that and its lifecycle. There are following types of operations with appropriate before/after, etc:
-
consume - primary operation related to running user consumption code
-
idle - cleanup job that runs on idle runs where no messages would be passed to the end user. This is used for complex flows with filters, etc
-
revoked - runs after the partition was revoked
-
shutdown - runs when process is going to shutdown
Executors:
-
run consumers code (for ‘#call`) or run given preparation / teardown operations when needed from separate threads.
-
they re-create consumer instances in case of partitions that were revoked and assigned back.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#coordinator ⇒ Karafka::Processing::ConsumerGroups::Coordinator
readonly
Coordinator for this executor.
-
#group_id ⇒ String
readonly
Subscription group id to which a given executor belongs.
-
#id ⇒ String
readonly
Unique id that we use to ensure, that we use for state tracking.
-
#messages ⇒ Karafka::Messages::Messages
readonly
Messages batch.
Instance Method Summary collapse
-
#after_consume ⇒ Object
Runs consumer after consumption code.
-
#before_consume ⇒ Object
Runs setup and warm-up code in the worker prior to running the consumption.
-
#before_schedule_consume(messages) ⇒ Object
Allows us to prepare the consumer in the listener thread prior to the job being send to be scheduled.
-
#before_schedule_eofed ⇒ Object
Runs the code needed before eofed work is scheduled.
-
#before_schedule_idle ⇒ Object
Runs the code needed before idle work is scheduled.
-
#before_schedule_revoked ⇒ Object
Runs code needed before revoked job is scheduled.
-
#before_schedule_shutdown ⇒ Object
Runs code needed before shutdown job is scheduled.
-
#consume ⇒ Object
Runs consumer data processing against given batch and handles failures and errors.
-
#eofed ⇒ Object
Runs consumed eofed operation.
-
#idle ⇒ Object
Runs consumer idle operations This may include house-keeping or other state management changes that can occur but that not mean there are any new messages available for the end user to process.
-
#initialize(group_id, client, coordinator) ⇒ Executor
constructor
A new instance of Executor.
-
#revoked ⇒ Object
Runs the controller ‘#revoked` method that should be triggered when a given consumer is no longer needed due to partitions reassignment.
-
#shutdown ⇒ Object
Runs the controller ‘#shutdown` method that should be triggered when a given consumer is no longer needed as we’re closing the process.
-
#wrap(action) ⇒ Object
Runs the wrap/around execution context appropriate for a given action.
Constructor Details
#initialize(group_id, client, coordinator) ⇒ Executor
Returns a new instance of Executor.
50 51 52 53 54 55 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 50 def initialize(group_id, client, coordinator) @id = SecureRandom.hex(6) @group_id = group_id @client = client @coordinator = coordinator end |
Instance Attribute Details
#coordinator ⇒ Karafka::Processing::ConsumerGroups::Coordinator (readonly)
Returns coordinator for this executor.
45 46 47 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 45 def coordinator @coordinator end |
#group_id ⇒ String (readonly)
Returns subscription group id to which a given executor belongs.
39 40 41 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 39 def group_id @group_id end |
#id ⇒ String (readonly)
Returns unique id that we use to ensure, that we use for state tracking.
36 37 38 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 36 def id @id end |
#messages ⇒ Karafka::Messages::Messages (readonly)
Returns messages batch.
42 43 44 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 42 def @messages end |
Instance Method Details
#after_consume ⇒ Object
Runs consumer after consumption code
100 101 102 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 100 def after_consume consumer.on_after_consume end |
#before_consume ⇒ Object
Runs setup and warm-up code in the worker prior to running the consumption
83 84 85 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 83 def before_consume consumer.on_before_consume end |
#before_schedule_consume(messages) ⇒ Object
Allows us to prepare the consumer in the listener thread prior to the job being send to be scheduled. It also allows to run some code that is time sensitive and cannot wait in the queue as it could cause starvation.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 62 def before_schedule_consume() # Recreate consumer with each batch if persistence is not enabled # We reload the consumers with each batch instead of relying on some external signals # when needed for consistency. That way devs may have it on or off and not in this # middle state, where re-creation of a consumer instance would occur only sometimes @consumer = nil unless topic.consumer_persistence # First we build messages batch... consumer. = Messages::Builders::Messages.call( , topic, partition, # the moment we've received the batch or actually the moment we've enqueued it, # but good enough Time.now ) consumer.on_before_schedule_consume end |
#before_schedule_eofed ⇒ Object
Runs the code needed before eofed work is scheduled
117 118 119 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 117 def before_schedule_eofed consumer.on_before_schedule_eofed end |
#before_schedule_idle ⇒ Object
Runs the code needed before idle work is scheduled
105 106 107 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 105 def before_schedule_idle consumer.on_before_schedule_idle end |
#before_schedule_revoked ⇒ Object
Runs code needed before revoked job is scheduled
129 130 131 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 129 def before_schedule_revoked consumer.on_before_schedule_revoked if @consumer end |
#before_schedule_shutdown ⇒ Object
Runs code needed before shutdown job is scheduled
150 151 152 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 150 def before_schedule_shutdown consumer.on_before_schedule_shutdown if @consumer end |
#consume ⇒ Object
Runs consumer data processing against given batch and handles failures and errors.
94 95 96 97 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 94 def consume # We run the consumer client logic... consumer.on_consume end |
#eofed ⇒ Object
Runs consumed eofed operation. This may run even when there were no messages received prior. This will however not run when eof is received together with messages as in such case ‘#consume` will run
124 125 126 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 124 def eofed consumer.on_eofed end |
#idle ⇒ Object
Runs consumer idle operations This may include house-keeping or other state management changes that can occur but that not mean there are any new messages available for the end user to process
112 113 114 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 112 def idle consumer.on_idle end |
#revoked ⇒ Object
Clearing the consumer will ensure, that if we get the partition back, it will be handled with a consumer with a clean state.
We run it only when consumer was present, because presence indicates, that at least a single message has been consumed.
We do not reset the consumer but we indicate need for recreation instead, because after the revocation, there still may be ‘#after_consume` running that needs a given consumer instance.
Runs the controller ‘#revoked` method that should be triggered when a given consumer is no longer needed due to partitions reassignment.
145 146 147 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 145 def revoked consumer.on_revoked if @consumer end |
#shutdown ⇒ Object
While we do not need to clear the consumer here, it’s a good habit to clean after work is done.
Runs the controller ‘#shutdown` method that should be triggered when a given consumer is no longer needed as we’re closing the process.
159 160 161 162 163 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 159 def shutdown # There is a case, where the consumer no longer exists because it was revoked, in case like # that we do not build a new instance and shutdown should not be triggered. consumer.on_shutdown if @consumer end |
#wrap(action) ⇒ Object
Runs the wrap/around execution context appropriate for a given action
89 90 91 |
# File 'lib/karafka/processing/consumer_groups/executor.rb', line 89 def wrap(action, &) consumer.on_wrap(action, &) end |