Class: Karafka::App

Inherits:
Object
Extended by:
Setup::Dsl
Defined in:
lib/karafka/app.rb

Overview

App class

Class Method Summary

Methods included from Setup::Dsl

config, setup

Class Method Details

.assignments ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}

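Returns the current assignments of this process, covering both topics and partitions.

Returns:

  • (Hash{Karafka::Routing::Topic => Array<Integer>})

    current assignments of this process, both topics and partitions
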
# File 'lib/karafka/app.rb', line 67

def assignments
  Instrumentation::AssignmentsTracker.instance.current
end
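
The returned hash maps each assigned topic to the partition numbers this process currently owns. Below is a minimal sketch of walking a structure of that shape. `AssignedTopic` and `assignment_labels` are hypothetical names used only for illustration; the struct stands in for `Karafka::Routing::Topic` so the snippet runs without a Kafka cluster.

```ruby
# Hypothetical stand-in for Karafka::Routing::Topic (only #name is used here).
AssignedTopic = Struct.new(:name)

# Flattens { topic => [partition, ...] } into "topic/partition" labels,
# for example to log what this process is currently working on.
def assignment_labels(assignments)
  assignments.flat_map do |topic, partitions|
    partitions.map { |partition| "#{topic.name}/#{partition}" }
  end
end

# Same shape as the documented return value. In a running process this hash
# would come from Karafka::App.assignments instead of being built by hand.
sample = {
  AssignedTopic.new("orders") => [0, 2],
  AssignedTopic.new("payments") => [1]
}

puts assignment_labels(sample).join(", ")
```

Because the value reflects the current state of the process, it can change after every rebalance and is best read on demand rather than cached.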

.consumer_groups ⇒ Karafka::Routing::Builder

Also known as: routes, groups

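Returns the consumer groups routing builder instance, as a shortcut for config.internal.routing.builder.

Returns:

  • (Karafka::Routing::Builder)

    consumers builder instance alias
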
# File 'lib/karafka/app.rb', line 24

def consumer_groups
  config
    .internal
    .routing
    .builder
end

.debug!(contexts = "all") ⇒ Object

Forces the debug setup onto Karafka and the default WaterDrop producer. This needs to run before any operation that would cache state, such as consuming or producing messages.

Parameters:

  • contexts (String) (defaults to: "all")

    librdkafka low-level debug contexts for granular debugging



# File 'lib/karafka/app.rb', line 115

def debug!(contexts = "all")
  logger.level = Logger::DEBUG
  producer.config.logger.level = Logger::DEBUG

  config.kafka[:debug] = contexts
  producer.config.kafka[:debug] = contexts

  routes.map(&:topics).flat_map(&:to_a).each do |topic|
    topic.kafka[:debug] = contexts
  end
end
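
The last step of the method writes the debug contexts into every routed topic, presumably because each topic carries its own copy of the kafka settings once routes are drawn. A minimal sketch of that propagation follows. `DebugTopic`, `DebugGroup` and `apply_debug` are hypothetical stand-ins rather than Karafka classes, and `"broker,cgrp"` is just one example of a comma-separated librdkafka context list.

```ruby
# Hypothetical stand-ins: a topic with its own kafka settings hash and a
# consumer group exposing its topics.
DebugTopic = Struct.new(:name, :kafka)
DebugGroup = Struct.new(:topics)

# Applies the same debug contexts to every topic of every group, mirroring
# the final loop of debug!.
def apply_debug(groups, contexts = "all")
  groups.map(&:topics).flat_map(&:to_a).each do |topic|
    topic.kafka[:debug] = contexts
  end
end

groups = [
  DebugGroup.new([DebugTopic.new("orders", {}), DebugTopic.new("payments", {})]),
  DebugGroup.new([DebugTopic.new("audit", {})])
]

# Narrow the output to broker and consumer group protocol events only.
apply_debug(groups, "broker,cgrp")

groups.flat_map(&:topics).each do |topic|
  puts "#{topic.name}: #{topic.kafka[:debug]}"
end
```

Passing a narrower list than the default `"all"` keeps the log volume manageable when only one area of the client is under investigation.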

.declaratives ⇒ Karafka::Declaratives::Builder

Returns the declaratives builder instance.

Returns:

  • (Karafka::Declaratives::Builder)

    declaratives builder instance

# File 'lib/karafka/app.rb', line 51

def declaratives
  config
    .internal
    .declaratives
    .builder
end

.done? ⇒ Boolean

Note:

It is a meta status from the status object

Returns true if we should be done in general with processing anything.

Returns:

  • (Boolean)

    true if we should be done in general with processing anything



# File 'lib/karafka/app.rb', line 90

def done?
  App.config.internal.status.done?
end

.subscription_groups ⇒ Hash

Returns active subscription groups grouped based on consumer group in a hash.

Returns:

  • (Hash)

    active subscription groups grouped based on consumer group in a hash



# File 'lib/karafka/app.rb', line 32

def subscription_groups
  # We first build all the subscription groups, so they all get the same position, despite
  # later narrowing that. It allows us to maintain same position number for static members
  # even when we want to run subset of consumer groups or subscription groups
  #
  # We then narrow this to active consumer groups from which we select active subscription
  # groups.
  consumer_groups
    .map { |group| [group, group.subscription_groups] }
    .select { |group, _| group.active? }
    .select { |_, sgs| sgs.delete_if { |sg| !sg.active? } }
    .delete_if { |_, sgs| sgs.empty? }
    .each { |_, sgs| sgs.each { |sg| sg.topics.delete_if { |topic| !topic.active? } } }
    .each { |_, sgs| sgs.delete_if { |sg| sg.topics.empty? } }
    .reject { |group, _| group.subscription_groups.empty? }
    .to_h
end
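
The narrowing described in the comments above can be sketched on plain structs. Everything named `Sketch*` below, as well as `active_subscription_groups`, is a hypothetical stand-in that models only `#active?`, `#subscription_groups` and `#topics`. It is a simplified rendition of the same filtering order, not the library's implementation.

```ruby
# Hypothetical stand-ins for the routing objects used by the pipeline.
SketchTopic = Struct.new(:name, :active) do
  def active?
    active
  end
end

SketchSubscriptionGroup = Struct.new(:name, :active, :topics) do
  def active?
    active
  end
end

SketchConsumerGroup = Struct.new(:name, :active, :subscription_groups) do
  def active?
    active
  end
end

# Keeps active consumer groups, then their active subscription groups, then
# drops inactive topics and anything that ends up empty. Topic lists are
# filtered in place; the subscription group lists themselves are not mutated.
def active_subscription_groups(consumer_groups)
  consumer_groups
    .select(&:active?)
    .map do |group|
      sgs = group.subscription_groups.select(&:active?)
      sgs.each { |sg| sg.topics.select!(&:active?) }
      [group, sgs.reject { |sg| sg.topics.empty? }]
    end
    .reject { |_, sgs| sgs.empty? }
    .to_h
end

groups = [
  SketchConsumerGroup.new("app", true, [
    SketchSubscriptionGroup.new("sg_a", true, [SketchTopic.new("orders", true), SketchTopic.new("audit", false)]),
    SketchSubscriptionGroup.new("sg_b", false, [SketchTopic.new("payments", true)]),
    SketchSubscriptionGroup.new("sg_c", true, [SketchTopic.new("legacy", false)])
  ]),
  SketchConsumerGroup.new("reports", false, [
    SketchSubscriptionGroup.new("sg_d", true, [SketchTopic.new("metrics", true)])
  ])
]

active_subscription_groups(groups).each do |group, sgs|
  sgs.each { |sg| puts "#{group.name}: #{sg.name} -> #{sg.topics.map(&:name).join(', ')}" }
end
```

In this sample only `sg_a` of the `app` group survives, with `orders` as its single topic: `sg_b` is inactive, `sg_c` has no active topics left, and the `reports` group is inactive as a whole.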

.warmup ⇒ Object

Notifies the Ruby virtual machine that the boot sequence is finished, and that now is a good time to optimize the application. In case of older Ruby versions, runs compacting, which is part of the full warmup introduced in Ruby 3.3.



# File 'lib/karafka/app.rb', line 12

def warmup
  # Per recommendation, this should not run in children nodes
  return if Karafka::App.config.swarm.node

  monitor.instrument("app.before_warmup", caller: self)

  return GC.compact unless ::Process.respond_to?(:warmup)

  ::Process.warmup
end