Class: Karafka::Connection::Client
- Inherits: Object
- Extended by: Forwardable
- Defined in: lib/karafka/connection/client.rb
Overview
Class used as a wrapper around the Ruby-Kafka client. It simplifies additional features that we provide (or might provide in the future) and hides the internal implementation details.
Instance Method Summary
-
#fetch_loop {|kafka| ... } ⇒ Object
Opens connection, gets messages and calls a block for each of the incoming messages.
-
#initialize(consumer_group) ⇒ Karafka::Connection::Client
constructor
Creates a queue consumer client that will pull the data from Kafka.
-
#mark_as_consumed(params) ⇒ Object
Marks given message as consumed.
-
#mark_as_consumed!(params) ⇒ Object
Marks a given message as consumed and commits the offsets in a blocking way.
-
#pause(topic, partition) ⇒ Object
Pauses fetching and consumption of a given topic partition.
-
#stop ⇒ Object
Gracefully stops topic consumption.
Constructor Details
#initialize(consumer_group) ⇒ Karafka::Connection::Client
Creates a queue consumer client that will pull the data from Kafka
# File 'lib/karafka/connection/client.rb', line 23

def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end
Instance Method Details
#fetch_loop {|kafka| ... } ⇒ Object
This yields raw messages: no preprocessing or reformatting is applied.
Opens connection, gets messages and calls a block for each of the incoming messages
# File 'lib/karafka/connection/client.rb', line 32

def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)

  if consumer_group.batch_fetching
    kafka_consumer.each_batch(**settings) { |batch| yield(batch, :batch) }
  else
    kafka_consumer.each_message(**settings) { |message| yield(message, :message) }
  end
# @note We catch only the processing errors as any other are considered critical (exceptions)
#   and should require a client restart with a backoff
rescue Kafka::ProcessingError => e
  # If there was an error during consumption, we have to log it, pause current partition
  # and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: e.cause
  )
  pause(e.topic, e.partition)
  retry
end
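The yield contract above can be sketched without Kafka at all. The following plain-Ruby stand-in (`FetchLoopSketch` is a hypothetical illustration, not part of Karafka) shows the pattern: every yield carries the payload plus a type symbol (`:batch` or `:message`) that the caller can branch on.

```ruby
# Hypothetical stand-in for Client#fetch_loop, assuming no Kafka connection.
class FetchLoopSketch
  def initialize(batch_fetching, data)
    @batch_fetching = batch_fetching
    @data = data
  end

  # Mirrors the yield contract: the payload is tagged with a type symbol
  # so the consuming block knows whether it got a whole batch or one message.
  def fetch_loop
    if @batch_fetching
      yield(@data, :batch)
    else
      @data.each { |message| yield(message, :message) }
    end
  end
end

received = []
FetchLoopSketch.new(false, %w[a b]).fetch_loop do |payload, type|
  received << [payload, type]
end
# received now holds [["a", :message], ["b", :message]]
```

In batch mode the same block would be invoked once with `[%w[a b], :batch]`, which is why the real Karafka dispatcher can route both shapes through a single callback.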
#mark_as_consumed(params) ⇒ Object
This method won't trigger an automatic offset commit; instead it relies on ruby-kafka's time-interval based offset committing
Marks a given message as consumed
# File 'lib/karafka/connection/client.rb', line 75

def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
end
#mark_as_consumed!(params) ⇒ Object
This method commits the offset on each manual marking, to make sure the offset commit happens as soon as possible in case of a crash
Marks a given message as consumed and commits the offsets in a blocking way
# File 'lib/karafka/connection/client.rb', line 85

def mark_as_consumed!(params)
  mark_as_consumed(params)
  # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing
  # before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end
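The difference between the two marking methods can be sketched with plain-Ruby stand-ins (`FakeKafkaConsumer` and `OffsetSketchClient` are hypothetical, not part of Karafka or ruby-kafka): the non-bang version only records the offset and leaves committing to the periodic trigger, while the bang version also forces a blocking commit right away.

```ruby
# Hypothetical stand-in for the underlying ruby-kafka consumer,
# counting commits so the two marking styles can be compared.
class FakeKafkaConsumer
  attr_reader :marked, :commits

  def initialize
    @marked = []
    @commits = 0
  end

  def mark_message_as_processed(offset)
    @marked << offset
  end

  def commit_offsets
    @commits += 1
  end
end

# Hypothetical client mirroring the mark_as_consumed / mark_as_consumed! split.
class OffsetSketchClient
  def initialize(kafka_consumer)
    @kafka_consumer = kafka_consumer
  end

  def mark_as_consumed(offset)
    @kafka_consumer.mark_message_as_processed(offset)
  end

  def mark_as_consumed!(offset)
    mark_as_consumed(offset)
    # Blocking commit happens here, not on the periodic interval.
    @kafka_consumer.commit_offsets
  end
end

consumer = FakeKafkaConsumer.new
client = OffsetSketchClient.new(consumer)
client.mark_as_consumed(1)  # marked; commit deferred to the time interval
client.mark_as_consumed!(2) # marked and committed immediately
```

After this sequence both offsets are marked but only one commit has been issued, which is the trade-off the bang variant buys at the cost of a blocking call per message.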
#pause(topic, partition) ⇒ Object
Pauses fetching and consumption of a given topic partition
# File 'lib/karafka/connection/client.rb', line 66

def pause(topic, partition)
  args, kwargs = ApiAdapter.pause(topic, partition, consumer_group).values_at(:args, :kwargs)
  kafka_consumer.pause(*args, **kwargs)
end
#stop ⇒ Object
Stopping running consumers without a really important reason is not recommended, because until all the consumers are stopped, the server will keep running and serve only a part of the messages
Gracefully stops topic consumption
# File 'lib/karafka/connection/client.rb', line 58

def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end
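The safe navigation operator (`&.`) plus the `nil` assignment make `#stop` idempotent: a second call finds `@kafka_consumer` already `nil` and does nothing. A plain-Ruby sketch (hypothetical `FakeConsumer` and `StoppableSketchClient`, not part of Karafka) shows why shutdown code can call it defensively:

```ruby
# Hypothetical consumer that counts how many times it was stopped.
class FakeConsumer
  attr_reader :stops

  def initialize
    @stops = 0
  end

  def stop
    @stops += 1
  end
end

# Hypothetical client mirroring the #stop implementation above.
class StoppableSketchClient
  def initialize(kafka_consumer)
    @kafka_consumer = kafka_consumer
  end

  def stop
    @kafka_consumer&.stop   # no-op when @kafka_consumer is nil
    @kafka_consumer = nil
  end
end

consumer = FakeConsumer.new
client = StoppableSketchClient.new(consumer)
client.stop
client.stop # safe: @kafka_consumer is already nil, consumer not stopped twice
```

Under this sketch the underlying consumer receives exactly one `stop` call no matter how many times the client's `#stop` is invoked.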