Module: Karafka::Pro::Routing::Features::ConsumerGroups::Swarm::Topic

Defined in:
lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb

Overview

Topic swarm API extensions

Instance Method Summary

Instance Method Details

#active? ⇒ Boolean

Returns whether this topic should be active. Outside of swarm mode the parent implementation decides. In swarm mode the topic is additionally active only on the nodes that its swarm routing setup includes.

Returns:

  • (Boolean)

    whether this topic should be active. In swarm mode it is active only when the swarm routing setup includes the current node



# File 'lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb', line 85

def active?
  node = Karafka::App.config.swarm.node

  return super unless node

  super && swarm.nodes.include?(node.id)
end
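
The node check above can be sketched without Karafka. This is a minimal illustration, not the library's code: Node and active_on_node? are hypothetical stand-ins, and the node collections are taken from the swarm examples on this page. It shows that the same include? call works for an Array, a Range, and a Hash (where it tests the keys, that is, the node ids).

```ruby
# Hypothetical stand-in for the swarm node object; not a Karafka class.
Node = Struct.new(:id)

# Mirrors the shape of the check in #active?. A nil node means the process
# is not running as a swarm node, so only the base result applies.
def active_on_node?(base_active, nodes, node)
  return base_active unless node

  base_active && nodes.include?(node.id)
end

ARRAY_NODES = [1].freeze
RANGE_NODES = (1..3)
HASH_NODES  = { 0 => [2, 3], 1 => [0, 1] }.freeze

puts active_on_node?(true, ARRAY_NODES, nil)          # true (not in swarm mode)
puts active_on_node?(true, ARRAY_NODES, Node.new(0))  # false
puts active_on_node?(true, RANGE_NODES, Node.new(2))  # true
puts active_on_node?(true, HASH_NODES, Node.new(1))   # true (Hash#include? checks keys)
puts active_on_node?(false, RANGE_NODES, Node.new(2)) # false
```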

#initialize ⇒ Object

Sets the extra instance variable (@swarm) to nil before calling the parent class initializer. The explicit nil assignment is an optimization for Ruby’s object shapes system: every instance defines the same instance variables in the same order, which improves memory layout and access performance.



# File 'lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb', line 43

def initialize(...)
  @swarm = nil
  super
end
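
The pattern can be tried in isolation. The sketch below assumes the extension is mixed in with prepend; SwarmIvar and ShapedTopic are hypothetical names used only for illustration. Because @swarm is assigned before the parent initializer runs, every instance defines its instance variables in the same order.

```ruby
# Hypothetical extension module; mirrors the initializer shown above.
module SwarmIvar
  def initialize(...)
    @swarm = nil # defined eagerly so all instances share one ivar layout
    super        # bare super forwards the original arguments
  end
end

# Hypothetical base class standing in for the topic class.
class ShapedTopic
  prepend SwarmIvar

  attr_reader :name

  def initialize(name)
    @name = name
  end
end

topic = ShapedTopic.new("events")
p topic.name               # "events"
p topic.instance_variables # [:@swarm, :@name]
```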

#swarm(nodes: (0...Karafka::App.config.swarm.nodes)) ⇒ Object

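Defines the swarm routing settings for this topic. The settings are memoized on the first call, so later calls return the original configuration regardless of the nodes passed.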

Examples:

Assign given topic only to node 1

swarm(nodes: [1])

Assign given topic to nodes from 1 to 3

swarm(nodes: 1..3)

Assign partitions 2 and 3 to node 0 and partitions 0 and 1 to node 1

swarm(
  nodes: {
    0 => [2, 3],
    1 => [0, 1]
  }
)

Assign partitions in ranges to nodes

swarm(
  nodes: {
    0 => (0..2),
    1 => (3..5)
  }
)

Parameters:

  • nodes (Range, Array, Hash) (defaults to: (0...Karafka::App.config.swarm.nodes))

    range or array of node ids on which the given topic should run, or a hash mapping node ids to their expected partition assignments for the direct assignments API.



# File 'lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb', line 74

def swarm(nodes: (0...Karafka::App.config.swarm.nodes))
  @swarm ||= Config.new(active: true, nodes: nodes)
end
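
Two details of this method are easy to miss: the default is an exclusive range (0...n covers node ids 0 through n - 1), and the ||= memoization means the first call wins. The sketch below illustrates both under stated assumptions: SwarmSettings is a hypothetical stand-in for Config, and NODE_COUNT stands in for Karafka::App.config.swarm.nodes.

```ruby
# Hypothetical stand-in for the Config object; not the Karafka class.
SwarmSettings = Struct.new(:active, :nodes, keyword_init: true)

class MemoTopic
  NODE_COUNT = 3 # assumed value standing in for Karafka::App.config.swarm.nodes

  def initialize
    @swarm = nil
  end

  # Same memoizing shape as #swarm above.
  def swarm(nodes: (0...NODE_COUNT))
    @swarm ||= SwarmSettings.new(active: true, nodes: nodes)
  end
end

defaulted = MemoTopic.new
p defaulted.swarm.nodes.to_a # [0, 1, 2]

pinned = MemoTopic.new
pinned.swarm(nodes: [1])
p pinned.swarm(nodes: [2]).nodes # [1], the first call is kept
```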

#swarm? ⇒ true

Returns true, because the swarm setup is always active. It may not be in use, but it is active.

Returns:

  • (true)

    the swarm setup is always active. It may not be in use, but it is active



# File 'lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb', line 79

def swarm?
  swarm.active?
end

#to_h ⇒ Hash

Returns a frozen hash with the topic's native configuration options plus its swarm settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus its swarm settings



# File 'lib/karafka/pro/routing/features/consumer_groups/swarm/topic.rb', line 94

def to_h
  super.merge(
    swarm: swarm.to_h
  ).freeze
end
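
The merge-then-freeze shape of this method can be sketched as follows. PlainTopic, SwarmTopic, and SwarmSettingsSketch are hypothetical stand-ins, and the option values are made up for illustration. Note that Hash#freeze is shallow, so in this sketch only the outer hash is frozen.

```ruby
# Hypothetical stand-in for the swarm Config object.
SwarmSettingsSketch = Struct.new(:active, :nodes, keyword_init: true)

# Hypothetical parent exposing the "native" options.
class PlainTopic
  def to_h
    { name: "events", active: true }
  end
end

class SwarmTopic < PlainTopic
  def swarm
    @swarm ||= SwarmSettingsSketch.new(active: true, nodes: [0, 1])
  end

  # Same shape as #to_h above: parent options plus a :swarm key, frozen.
  def to_h
    super.merge(swarm: swarm.to_h).freeze
  end
end

EXPORTED = SwarmTopic.new.to_h
p EXPORTED[:swarm]         # {active: true, nodes: [0, 1]} (formatting varies by Ruby version)
p EXPORTED.frozen?         # true
p EXPORTED[:swarm].frozen? # false, freeze is shallow in this sketch
```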