Class: Karafka::BaseConsumer
- Inherits:
- Object
  - Object
    - Karafka::BaseConsumer
- Extended by:
- Forwardable
- Includes:
- Core::Taggable
- Defined in:
- lib/karafka/base_consumer.rb
Overview
Base consumer from which all Karafka consumers should inherit
Direct Known Subclasses
Instance Attribute Summary
-
#client ⇒ Karafka::Connection::Client
Kafka connection client.
-
#coordinator ⇒ Karafka::Processing::Coordinator
Coordinator.
-
#id ⇒ String
readonly
Id of the current consumer.
-
#messages ⇒ Karafka::Messages::Messages
Current messages batch.
-
#producer ⇒ WaterDrop::Producer
Producer instance.
Instance Method Summary
-
#initialize ⇒ BaseConsumer
constructor
Creates new consumer and assigns it an id.
-
#on_after_consume ⇒ Object
-
#on_before_consume ⇒ Object
Can be used to run preparation code in the worker.
-
#on_before_schedule_consume ⇒ Object
Can be used to run preparation code prior to the job being enqueued.
-
#on_before_schedule_idle ⇒ Object
Can be used to run code prior to scheduling of idle execution.
-
#on_before_schedule_revoked ⇒ Object
Can be used to run code prior to scheduling of revoked execution.
-
#on_before_schedule_shutdown ⇒ Object
Can be used to run code prior to scheduling of shutdown execution.
-
#on_consume ⇒ Boolean
Executes the default consumer flow.
-
#on_idle ⇒ Object
Trigger method for running on idle runs without messages.
-
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
-
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
Constructor Details
#initialize ⇒ BaseConsumer
Creates new consumer and assigns it an id
# File 'lib/karafka/base_consumer.rb', line 29
def initialize
  @id = SecureRandom.hex(6)
  @used = false
end
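To illustrate the id format only: `SecureRandom.hex(6)` from Ruby's standard library generates 6 random bytes and returns them as a 12-character hexadecimal string. The snippet below exercises that stdlib call on its own and does not instantiate a Karafka consumer.

```ruby
require 'securerandom'

# Same stdlib call as in the constructor: 6 random bytes, hex-encoded.
id = SecureRandom.hex(6)

# Two hex digits per byte, so the id is always 12 characters long.
puts id.length
puts id.match?(/\A\h{12}\z/)
```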
Instance Attribute Details
#client ⇒ Karafka::Connection::Client
Returns kafka connection client.
# File 'lib/karafka/base_consumer.rb', line 22
def client
  @client
end
#coordinator ⇒ Karafka::Processing::Coordinator
Returns coordinator.
# File 'lib/karafka/base_consumer.rb', line 24
def coordinator
  @coordinator
end
#id ⇒ String (readonly)
Returns id of the current consumer.
# File 'lib/karafka/base_consumer.rb', line 18
def id
  @id
end
#messages ⇒ Karafka::Messages::Messages
Returns current messages batch.
# File 'lib/karafka/base_consumer.rb', line 20
def messages
  @messages
end
#producer ⇒ WaterDrop::Producer
Returns producer instance.
# File 'lib/karafka/base_consumer.rb', line 26
def producer
  @producer
end
Instance Method Details
#on_after_consume ⇒ Object
This method is part of the consumer lifecycle, not the public API, and should not be called by end users.
Errors are handled and reported here because some of the flows that run at this stage can fail. For example, a DLQ flow fails if it cannot dispatch the DLQ message. Other “non-user” flows do not interact with external systems, so their errors are expected to bubble up.
# File 'lib/karafka/base_consumer.rb', line 89
def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end
#on_before_consume ⇒ Object
This method is part of the consumer lifecycle, not the public API, and should not be called by end users. It can act as a hook when building non-blocking consumers and for other advanced use cases.
Can be used to run preparation code in the worker
# File 'lib/karafka/base_consumer.rb', line 51
def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end
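The set-then-freeze step can be sketched in plain Ruby. The `Metadata` struct below is a hypothetical stand-in, not Karafka's real metadata class; it only shows that once an object is frozen, later writes raise `FrozenError`, so code running after this hook cannot alter the recorded processing time.

```ruby
# Hypothetical stand-in for the batch metadata; not Karafka's real class.
Metadata = Struct.new(:processed_at)

metadata = Metadata.new
metadata.processed_at = Time.now # last write before processing starts
metadata.freeze                  # any further write raises FrozenError

rejected = false

begin
  metadata.processed_at = Time.now
rescue FrozenError
  # The frozen struct refused the second write.
  rejected = true
end

puts rejected
```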
#on_before_schedule_consume ⇒ Object
This method is part of the consumer lifecycle, not the public API, and should not be called by end users. It must not perform any expensive operations, because it blocks and runs in the listener thread.
Can be used to run preparation code prior to the job being enqueued
# File 'lib/karafka/base_consumer.rb', line 40
def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end
#on_before_schedule_idle ⇒ Object
Can be used to run code prior to scheduling of idle execution
# File 'lib/karafka/base_consumer.rb', line 106
def on_before_schedule_idle
  handle_before_schedule_idle
end
#on_before_schedule_revoked ⇒ Object
Can be used to run code prior to scheduling of revoked execution
# File 'lib/karafka/base_consumer.rb', line 120
def on_before_schedule_revoked
  handle_before_schedule_revoked
end
#on_before_schedule_shutdown ⇒ Object
Can be used to run code prior to scheduling of shutdown execution
# File 'lib/karafka/base_consumer.rb', line 141
def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end
#on_consume ⇒ Boolean
The seek offset is tracked and used to compensate for asynchronous offset flushing that may not have run yet when an error occurs. That way, the pause always happens on the last processed message.
Executes the default consumer flow.
# File 'lib/karafka/base_consumer.rb', line 69
def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end
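The rescue-and-instrument shape of this method can be tried outside Karafka with a minimal sketch. `RecordingMonitor` and `SketchConsumer` are hypothetical stand-ins written for this example; Karafka's real monitor and consumer do considerably more. The point is only that an error raised by the handler is reported as an `error.occurred` event instead of propagating to the caller.

```ruby
# Hypothetical monitor that only records events; not Karafka's monitor.
class RecordingMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

# Hypothetical consumer-like class mirroring the shape of #on_consume.
class SketchConsumer
  def initialize(monitor)
    @monitor = monitor
  end

  def on_consume
    handle_consume
  rescue StandardError => e
    # Report the failure instead of letting it propagate to the caller.
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.consume.error'
    )
  end

  private

  # Simulates user code that fails while processing a batch.
  def handle_consume
    raise ArgumentError, 'bad payload'
  end
end

monitor = RecordingMonitor.new
SketchConsumer.new(monitor).on_consume # does not raise

name, payload = monitor.events.first
puts name
puts payload[:type]
```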
#on_idle ⇒ Object
Trigger method for running on idle runs without messages
# File 'lib/karafka/base_consumer.rb', line 113
def on_idle
  handle_idle
end
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
# File 'lib/karafka/base_consumer.rb', line 127
def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
# File 'lib/karafka/base_consumer.rb', line 148
def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end
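The methods on this page report failures under four distinct `type` values. As a rough sketch, code that receives an `error.occurred` payload could use that value to tell the lifecycle stages apart. `LIFECYCLE_STAGES` and `describe_error` are hypothetical names introduced for this example, and wiring the helper to the Karafka monitor is left out.

```ruby
# Hypothetical lookup built from the :type values used by the methods on this page.
LIFECYCLE_STAGES = {
  'consumer.consume.error' => 'consume',
  'consumer.after_consume.error' => 'after consume',
  'consumer.revoked.error' => 'revocation',
  'consumer.shutdown.error' => 'shutdown'
}.freeze

# Hypothetical helper: turns an 'error.occurred' payload into a log line.
def describe_error(payload)
  stage = LIFECYCLE_STAGES.fetch(payload[:type], 'unknown stage')
  error = payload[:error]

  "#{stage}: #{error.class}: #{error.message}"
end

line = describe_error(
  type: 'consumer.revoked.error',
  error: RuntimeError.new('lost partition')
)

puts line
```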