Class: Karafka::Pro::Connection::Manager

Inherits:
Connection::Manager
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/connection/manager.rb

Overview

Note:

Manager operations relate to consumer groups, not subscription groups. Since cluster operations can have consumer-group-wide effects, we apply only one change at a time to a consumer group.

Manager that can handle working with multiplexed connections.

This manager takes into consideration the number of partitions assigned to the topics and does its best to balance connections against them. Additional connections may not always be utilized because other processes running alongside them may “hijack” the assignment. In such cases those extra empty connections are turned off after a while.

Instance Method Summary

Methods inherited from Connection::Manager

#done?

Constructor Details

#initialize ⇒ Manager

Creates a new manager instance.



# File 'lib/karafka/pro/connection/manager.rb', line 48

def initialize
  super
  @mutex = Mutex.new
  @changes = Hash.new do |h, k|
    h[k] = {
      state: "",
      join_state: "",
      state_age: 0,
      changed_at: monotonic_now
    }
  end
end
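Because `@changes` is built with a default block, the first read of an unknown subscription group id creates and stores its tracking entry. The following is a minimal standalone sketch of that behavior. The `monotonic_now` defined here is a stand-in that assumes a monotonic clock reading in milliseconds, not the helper from Core::Helpers::Time, and `"sg_a"` is a hypothetical id.

```ruby
# Stand-in for the helper mixed in from Core::Helpers::Time.
# Assumption: it returns a monotonic clock reading in milliseconds.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

changes = Hash.new do |h, k|
  h[k] = { state: "", join_state: "", state_age: 0, changed_at: monotonic_now }
end

# Reading a missing key runs the block and stores the result under that key.
entry = changes["sg_a"] # "sg_a" is a hypothetical subscription group id

# A second read returns the stored entry instead of building a new one.
same_entry = changes["sg_a"]
```

This is why `#register` can preload every key simply by reading it once.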

Instance Method Details

#control ⇒ Object

Shuts down all the listeners when it is time (including moving to quiet) or rescales when it is needed



# File 'lib/karafka/pro/connection/manager.rb', line 114

def control
  Karafka::App.done? ? shutdown : rescale
end

#notice(subscription_group_id, statistics) ⇒ Object

Note:

Although statistics are collected here per subscription group, they are used collectively for the whole consumer group. This reduces friction.

Collects data from the statistics emitted for a given subscription group. This is used to ensure that we do not rescale shortly after rebalances, deployments, etc.

Parameters:

  • subscription_group_id (String)

    id of the subscription group for which statistics were emitted

  • statistics (Hash)

    emitted statistics



# File 'lib/karafka/pro/connection/manager.rb', line 93

def notice(subscription_group_id, statistics)
  times = []
  # stateage is in microseconds
  # We monitor broker changes to make sure we do not introduce extra friction
  times << (statistics["brokers"].values.map { |stats| stats["stateage"] }.min / 1_000)
  times << statistics["cgrp"]["rebalance_age"]
  times << statistics["cgrp"]["stateage"]

  # Keep the previous change age for changes that were triggered by us
  previous_changed_at = @changes[subscription_group_id][:changed_at]

  @changes[subscription_group_id].merge!(
    state_age: times.min,
    changed_at: previous_changed_at,
    join_state: statistics["cgrp"]["join_state"],
    state: statistics["cgrp"]["state"]
  )
end
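The state age computation in `#notice` can be sketched on its own as follows. The statistics hash is a hypothetical payload that contains only the keys the method reads, and `youngest_state_age` is an illustrative helper name, not part of Karafka. The sketch assumes, as the code comment indicates, that broker `stateage` is in microseconds, and that the `cgrp` values are already in milliseconds.

```ruby
# Hypothetical statistics payload limited to the keys read by #notice.
statistics = {
  "brokers" => {
    "broker-1" => { "stateage" => 9_000_000 }, # microseconds
    "broker-2" => { "stateage" => 4_000_000 }  # microseconds
  },
  "cgrp" => {
    "rebalance_age" => 12_000, # assumed milliseconds
    "stateage" => 7_000,       # assumed milliseconds
    "state" => "up",
    "join_state" => "steady"
  }
}

# Illustrative helper: returns the age of the most recent change
# across brokers and the consumer group, in milliseconds.
def youngest_state_age(statistics)
  broker_age = statistics["brokers"].values.map { |stats| stats["stateage"] }.min / 1_000

  [
    broker_age,
    statistics["cgrp"]["rebalance_age"],
    statistics["cgrp"]["stateage"]
  ].min
end

age = youngest_state_age(statistics)
```

Taking the minimum means that any recent broker or group change resets the age, so rescaling is postponed until everything has been stable for a while.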

#register(listeners) ⇒ Object

Registers listeners and starts the scaling procedures

When dynamic multiplexing is used, only the configured number of boot connections is started for each subscription group. Otherwise, all listeners are started.

Parameters:

  • listeners

    listeners to register; those selected for boot are started immediately



# File 'lib/karafka/pro/connection/manager.rb', line 67

def register(listeners)
  @listeners = listeners

  # Preload all the keys into the hash so we never add keys to changes but just change them
  listeners.each { |listener| @changes[listener.subscription_group.id] }

  in_sg_families do |first_subscription_group, sg_listeners|
    multiplexing = first_subscription_group.multiplexing

    if multiplexing.active? && multiplexing.dynamic?
      # Start as many boot listeners as user wants. If not configured, starts half of max.
      sg_listeners.first(multiplexing.boot).each(&:start!)
    else
      sg_listeners.each(&:start!)
    end
  end
end
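The boot selection in `#register` can be illustrated without Karafka using simple stand-ins. In the sketch below, the listener and multiplexing structs and the `boot_listeners` helper are hypothetical and only mimic the methods the code above calls (`start!`, `active?`, `dynamic?`, `boot`).

```ruby
# Hypothetical stand-in for a listener: it only records that it was started.
listener_class = Struct.new(:id, :started) do
  def start!
    self.started = true
  end
end

# Hypothetical stand-in for the multiplexing settings of a subscription group.
multiplexing_class = Struct.new(:active, :dynamic, :boot) do
  def active?
    active
  end

  def dynamic?
    dynamic
  end
end

# Illustrative helper mirroring the branch in #register.
def boot_listeners(sg_listeners, multiplexing)
  to_start =
    if multiplexing.active? && multiplexing.dynamic?
      # Dynamic multiplexing: start only the boot number of listeners
      sg_listeners.first(multiplexing.boot)
    else
      # Static setup: start everything
      sg_listeners
    end

  to_start.each(&:start!)
end

listeners = Array.new(4) { |i| listener_class.new(i, false) }
boot_listeners(listeners, multiplexing_class.new(true, true, 2))

started_ids = listeners.select(&:started).map(&:id)
```

With four listeners and a boot value of 2, only the first two are started; the remaining ones stay idle until the manager decides to scale up.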