Class: Karafka::Routing::Builder
- Inherits:
- Concurrent::Array
- Object
- Concurrent::Array
- Karafka::Routing::Builder
- Defined in:
- lib/karafka/routing/builder.rb
Overview
Builder used as a DSL layer for defining consumers and declaring which topics they consume.
Instance Method Summary
-
#active ⇒ Array<Karafka::Routing::ConsumerGroup>
Only active consumer groups that we want to use.
-
#clear ⇒ Object
Clears the builder and the draws memory.
-
#draw(&block) { ... } ⇒ Object
Used to draw routes for Karafka.
-
#initialize ⇒ Builder
constructor
A new instance of Builder.
-
#reload ⇒ Object
Redraws all the routes for the in-process code reloading.
Constructor Details
#initialize ⇒ Builder
Returns a new instance of Builder.
# File 'lib/karafka/routing/builder.rb', line 18
def initialize
  super
  @draws = Concurrent::Array.new
end
Instance Method Details
#active ⇒ Array<Karafka::Routing::ConsumerGroup>
Returns only active consumer groups that we want to use. Since Karafka supports multi-process setup, we need to be able to pick only those consumer groups that should be active in our given process context.
# File 'lib/karafka/routing/builder.rb', line 52
def active
  select(&:active?)
end
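The filtering above can be illustrated with a minimal, self-contained sketch. It does not load Karafka: `SketchGroup` and `SketchBuilder` are hypothetical stand-ins, and a plain `Array` replaces `Concurrent::Array` so that no gem is required. Only the `select(&:active?)` call mirrors the listing.

```ruby
# Hypothetical stand-in for a consumer group; only #active? matters here.
SketchGroup = Struct.new(:name, :active) do
  def active?
    active
  end
end

# Hypothetical stand-in for the builder. The real class inherits from
# Concurrent::Array; a plain Array is assumed here to avoid dependencies.
class SketchBuilder < Array
  # Same filtering as in the listing: keep only groups reporting active?.
  def active
    select(&:active?)
  end
end

builder = SketchBuilder.new
builder << SketchGroup.new('payments', true)
builder << SketchGroup.new('reports', false)

puts builder.active.map(&:name).inspect
```

In a multi-process setup, each process would mark a different subset of groups as active, so the same routing file can be shared while every process consumes only its own groups.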
#clear ⇒ Object
Clears the builder and the memory of stored draws.
# File 'lib/karafka/routing/builder.rb', line 57
def clear
  @draws.clear
  super
end
#draw(&block) { ... } ⇒ Object
Used to draw routes for Karafka. After drawing, it stores and validates all the routes to make sure that they are correct and that there are no duplicated topics or groups, which are forbidden.
# File 'lib/karafka/routing/builder.rb', line 35
def draw(&block)
  @draws << block

  instance_eval(&block)

  each do |consumer_group|
    hashed_group = consumer_group.to_h
    validation_result = CONTRACT.call(hashed_group)
    next if validation_result.success?

    raise Errors::InvalidConfigurationError, validation_result.errors.to_h
  end
end
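The store, evaluate, then validate flow of `draw` can be sketched without Karafka. Everything named below is an assumption made for illustration: the `group` DSL method, the `validate` helper (a stand-in for `CONTRACT`), and `SketchInvalidConfigurationError` are hypothetical and are not part of Karafka's API. Only the overall shape follows the listing.

```ruby
# Hypothetical error class standing in for Errors::InvalidConfigurationError.
class SketchInvalidConfigurationError < StandardError; end

# Hypothetical builder; a plain Array is assumed instead of Concurrent::Array.
class SketchRoutes < Array
  def initialize
    super
    @draws = []
  end

  # Hypothetical DSL method, callable inside the block passed to #draw.
  def group(name, topics)
    self << { name: name, topics: topics }
  end

  def draw(&block)
    @draws << block        # remember the block so it can be replayed later
    instance_eval(&block)  # run the DSL with the builder as self

    each do |consumer_group|
      errors = validate(consumer_group)
      next if errors.empty?

      raise SketchInvalidConfigurationError, errors.inspect
    end
  end

  private

  # Stand-in for the contract: reports topics listed more than once.
  def validate(consumer_group)
    topics = consumer_group[:topics]
    duplicates = topics.select { |topic| topics.count(topic) > 1 }.uniq
    duplicates.empty? ? {} : { topics: "duplicated: #{duplicates.join(', ')}" }
  end
end

routes = SketchRoutes.new
routes.draw { group('events', %w[clicks views]) }
puts routes.size
```

Because the block runs through `instance_eval`, bare calls such as `group(...)` resolve against the builder itself, which is what makes the routing block read like a DSL.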
#reload ⇒ Object
Redraws all the routes for in-process code reloading. This won't allow registration of new topics without a process restart, but it will trigger cache invalidation so that all the classes, etc. are re-fetched after code reload.
# File 'lib/karafka/routing/builder.rb', line 65
def reload
  draws = @draws.dup
  clear
  draws.each { |block| draw(&block) }
end
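The reason `reload` copies `@draws` before calling `clear` is that `clear` also empties the stored blocks. A minimal sketch of that replay mechanism, assuming a hypothetical `SketchReloadableRoutes` class built on a plain `Array` and omitting validation for brevity:

```ruby
# Hypothetical stand-in for the builder; validation is left out on purpose.
class SketchReloadableRoutes < Array
  def initialize
    super
    @draws = []
  end

  def draw(&block)
    @draws << block
    instance_eval(&block)
  end

  # Empties both the stored blocks and the array contents.
  def clear
    @draws.clear
    super
  end

  def reload
    draws = @draws.dup  # copy first: #clear wipes @draws
    clear
    draws.each { |block| draw(&block) }  # replay each block, re-registering it
  end
end

routes = SketchReloadableRoutes.new
routes.draw { self << :orders }
routes.reload
puts routes.to_a.inspect
```

Replaying the same blocks re-evaluates whatever they reference, which fits the description above: the routes are rebuilt from the original blocks rather than from newly registered ones.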