Class: Karafka::Persistence::Consumers
- Inherits:
-
Object
- Object
- Karafka::Persistence::Consumers
- Defined in:
- lib/karafka/persistence/consumers.rb
Overview
Module used to provide a persistent cache across batch requests for a given topic and partition to store some additional details when the persistent mode for a given topic is turned on
Class Method Summary collapse
-
.clear ⇒ Object
Removes all persisted instances of consumers from the consumer cache.
-
.current ⇒ Hash
Current thread's persistence scope hash with all the consumers.
-
.fetch(topic, partition) ⇒ Karafka::BaseConsumer
Used to build (if block given) and/or fetch a current consumer instance that will be used to process messages from a given topic and partition.
Class Method Details
.clear ⇒ Object
This is used to reload consumers instances when code reloading in development mode is present. This should not be used in production.
Removes all persisted instances of consumers from the consumer cache
36 37 38 39 40 41 |
# File 'lib/karafka/persistence/consumers.rb', line 36 def clear Thread .list .select { |thread| thread[PERSISTENCE_SCOPE] } .each { |thread| thread[PERSISTENCE_SCOPE].clear } end |
.current ⇒ Hash
Returns current thread's persistence scope hash with all the consumers.
18 19 20 21 22 |
# File 'lib/karafka/persistence/consumers.rb', line 18 def current Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key| hash[key] = Concurrent::Hash.new end end |
.fetch(topic, partition) ⇒ Karafka::BaseConsumer
Used to build (if block given) and/or fetch a current consumer instance that will be
used to process from a given topic and partition
29 30 31 |
# File 'lib/karafka/persistence/consumers.rb', line 29 def fetch(topic, partition) current[topic][partition] ||= topic.consumer.new(topic) end |