Class: Karafka::Pro::Routing::Features::Multiplexing
- Inherits:
- Base
  - Object
  - Routing::Features::Base
  - Base
  - Karafka::Pro::Routing::Features::Multiplexing
- Defined in:
- lib/karafka/pro/routing/features/multiplexing.rb,
lib/karafka/pro/routing/features/multiplexing/proxy.rb,
lib/karafka/pro/routing/features/multiplexing/config.rb,
lib/karafka/pro/routing/features/multiplexing/contracts/topic.rb,
lib/karafka/pro/routing/features/multiplexing/contracts/routing.rb,
lib/karafka/pro/routing/features/multiplexing/subscription_group.rb,
lib/karafka/pro/routing/features/multiplexing/subscription_groups_builder.rb,
lib/karafka/pro/routing/features/multiplexing/patches/contracts/consumer_group.rb
Overview
Multiplexing allows creating multiple subscription groups for the same topic inside the same subscription group, which gives better parallelism with a limited number of processes.
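To make the idea concrete, here is a toy model in plain Ruby. It is only a sketch: `SubscriptionGroupSketch`, `expand_for_multiplexing`, and the `max:` argument are hypothetical names chosen for illustration and are not taken from Karafka's code. It shows one logical group being expanded into several copies that all subscribe to the same topics.

```ruby
# Toy model only: none of these names are Karafka APIs.
SubscriptionGroupSketch = Struct.new(:name, :topics, :multiplex_index)

# Expands one logical group into `max` copies that share the same topics.
# In this model each copy stands for a separate connection in the same process.
def expand_for_multiplexing(name, topics, max:)
  Array.new(max) { |index| SubscriptionGroupSketch.new(name, topics, index) }
end

groups = expand_for_multiplexing("events", %w[orders payments], max: 3)

groups.each do |group|
  puts "#{group.name}##{group.multiplex_index} -> #{group.topics.join(', ')}"
end
```

In this model a single process ends up with three group copies for the same topics, which is the source of the extra parallelism the overview describes.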
Defined Under Namespace
Modules: Contracts, Patches, Proxy, SubscriptionGroup, SubscriptionGroupsBuilder
Classes: Config
Class Method Summary
- .post_setup(config) ⇒ Object
  Installs the needed listener and initializes the tracker, if needed.
- .pre_setup(_config) ⇒ Object
Methods inherited from Routing::Features::Base
activate, load_all, post_setup_all, pre_setup_all
Class Method Details
.post_setup(config) ⇒ Object
Installs the needed listener and initializes the tracker, if needed.
# File 'lib/karafka/pro/routing/features/multiplexing.rb', line 53

def post_setup(config)
  config.monitor.subscribe("app.before_warmup") do
    Contracts::Routing.new.validate!(
      config.internal.routing.builder,
      scope: %w[multiplexing]
    )
  end

  Karafka::App.monitor.subscribe("app.running") do
    # Do not install the manager and listener to control multiplexing unless there is
    # multiplexing enabled and it is dynamic.
    # We only need to control multiplexing when it is in a dynamic state
    next unless Karafka::App
                .subscription_groups
                .values
                .flat_map(&:itself)
                .any? { |sg| sg.multiplexing? && sg.multiplexing.dynamic? }

    # Subscribe for events and possibility to manage via the Pro connection manager
    # that supports multiplexing
    Karafka.monitor.subscribe(
      Karafka::Pro::Connection::Multiplexing::Listener.new
    )
  end
end
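The guard inside the "app.running" block can be exercised in isolation. The sketch below assumes stand-in objects: `MultiplexingStub`, `SubscriptionGroupStub`, and `needs_listener?` are hypothetical names, and only the `multiplexing?`, `multiplexing`, and `dynamic?` calls mirror what the method above uses. The hash shape (consumer group name mapped to an array of subscription groups) is likewise an assumption inferred from the `.values.flat_map(&:itself)` chain.

```ruby
# Stand-ins for illustration only; the real objects come from Karafka routing.
MultiplexingStub = Struct.new(:dynamic) do
  def dynamic?
    dynamic
  end
end

SubscriptionGroupStub = Struct.new(:name, :multiplexed, :multiplexing) do
  def multiplexing?
    multiplexed
  end
end

# Same predicate chain as in post_setup: flatten the per-consumer-group arrays
# and check whether any group is multiplexed in dynamic mode.
def needs_listener?(subscription_groups)
  subscription_groups
    .values
    .flat_map(&:itself)
    .any? { |sg| sg.multiplexing? && sg.multiplexing.dynamic? }
end

plain_sg   = SubscriptionGroupStub.new("plain", false, MultiplexingStub.new(false))
static_sg  = SubscriptionGroupStub.new("static", true, MultiplexingStub.new(false))
dynamic_sg = SubscriptionGroupStub.new("dynamic", true, MultiplexingStub.new(true))

only_static  = { "cg_a" => [plain_sg], "cg_b" => [static_sg] }
with_dynamic = { "cg_a" => [plain_sg], "cg_b" => [static_sg, dynamic_sg] }

puts needs_listener?(only_static)
puts needs_listener?(with_dynamic)
```

Under these assumptions the listener would be skipped for `only_static` and installed for `with_dynamic`, since only the latter contains a group that is both multiplexed and dynamic.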
.pre_setup(_config) ⇒ Object
# File 'lib/karafka/pro/routing/features/multiplexing.rb', line 43

def pre_setup(_config)
  # Make sure we use proper unique validator for topics definitions
  Karafka::Routing::Contracts::ConsumerGroup.singleton_class.prepend(
    Patches::Contracts::ConsumerGroup
  )
end
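The `singleton_class.prepend` call is a standard Ruby way to override class-level methods while keeping `super` available. The sketch below shows the mechanism with hypothetical names (`ConsumerGroupContractSketch`, `UniqueTopicsPatchSketch`, and the `validate` method are invented for illustration). What the real `Patches::Contracts::ConsumerGroup` module changes is not shown on this page, so the uniqueness check here is only an assumed example of a patched behavior.

```ruby
# Hypothetical contract with a class-level method, standing in for the real one.
class ConsumerGroupContractSketch
  def self.validate(topics)
    "base validation of #{topics.size} topics"
  end
end

# Hypothetical patch: adds a check, then defers to the original class method.
module UniqueTopicsPatchSketch
  def validate(topics)
    raise ArgumentError, "duplicate topics" unless topics.uniq.size == topics.size

    super
  end
end

# Prepending to the singleton class places the patch in front of the class
# methods, so the patched `validate` runs first.
ConsumerGroupContractSketch.singleton_class.prepend(UniqueTopicsPatchSketch)

puts ConsumerGroupContractSketch.validate(%w[orders payments])
puts ConsumerGroupContractSketch.singleton_class.ancestors.first
```

Because the module is prepended rather than included, it sits first in the singleton class's ancestor chain, and the original class method stays reachable through `super`.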