Class: Karafka::App
- Inherits:
-
Object
- Object
- Karafka::App
- Extended by:
- Setup::Dsl
- Defined in:
- lib/karafka/app.rb
Overview
App class
Class Method Summary collapse
-
.assignments ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Returns current assignments of this process.
-
.consumer_groups ⇒ Karafka::Routing::Builder
(also: routes, groups)
Consumers builder instance alias.
-
.debug!(contexts = "all") ⇒ Object
Forces the debug setup onto Karafka and default WaterDrop producer.
-
.declaratives ⇒ Karafka::Declaratives::Builder
Declaratives builder instance.
-
.done? ⇒ Boolean
True if we should be done in general with processing anything.
-
.subscription_groups ⇒ Hash
Active subscription groups grouped based on consumer group in a hash.
-
.warmup ⇒ Object
Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application.
Methods included from Setup::Dsl
Class Method Details
.assignments ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Returns current assignments of this process. Both topics and partitions
67 68 69 |
# File 'lib/karafka/app.rb', line 67 def assignments Instrumentation::AssignmentsTracker.instance.current end |
.consumer_groups ⇒ Karafka::Routing::Builder Also known as: routes, groups
Returns consumers builder instance alias.
24 25 26 27 28 29 |
# File 'lib/karafka/app.rb', line 24 def consumer_groups config .internal .routing .builder end |
.debug!(contexts = "all") ⇒ Object
Forces the debug setup onto Karafka and default WaterDrop producer. This needs to run prior to any operations that would cache state, like consuming or producing messages.
115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/karafka/app.rb', line 115 def debug!(contexts = "all") logger.level = Logger::DEBUG producer.config.logger.level = Logger::DEBUG config.kafka[:debug] = contexts producer.config.kafka[:debug] = contexts routes.map(&:topics).flat_map(&:to_a).each do |topic| topic.kafka[:debug] = contexts end end |
.declaratives ⇒ Karafka::Declaratives::Builder
Returns declaratives builder instance.
51 52 53 54 55 56 |
# File 'lib/karafka/app.rb', line 51 def declaratives config .internal .declaratives .builder end |
.done? ⇒ Boolean
It is a meta status from the status object
Returns true if we should be done in general with processing anything.
90 91 92 |
# File 'lib/karafka/app.rb', line 90 def done? App.config.internal.status.done? end |
.subscription_groups ⇒ Hash
Returns active subscription groups grouped based on consumer group in a hash.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/karafka/app.rb', line 32 def subscription_groups # We first build all the subscription groups, so they all get the same position, despite # later narrowing that. It allows us to maintain same position number for static members # even when we want to run subset of consumer groups or subscription groups # # We then narrow this to active consumer groups from which we select active subscription # groups. consumer_groups .map { |group| [group, group.subscription_groups] } .select { |group, _| group.active? } .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } } .delete_if { |_, sgs| sgs.empty? } .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |topic| !topic.active? } } } .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } } .reject { |group, _| group.subscription_groups.empty? } .to_h end |
.warmup ⇒ Object
Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application. In case of older Ruby versions, runs compacting, which is part of the full warmup introduced in Ruby 3.3.
12 13 14 15 16 17 18 19 20 21 |
# File 'lib/karafka/app.rb', line 12 def warmup # Per recommendation, this should not run in children nodes return if Karafka::App.config.swarm.node monitor.instrument("app.before_warmup", caller: self) return GC.compact unless ::Process.respond_to?(:warmup) ::Process.warmup end |