Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb

Overview

Tracker that records incoming partition-related operational requests until they become executable or turn out to be invalid. Requests are stored as they arrive so that they can be executed before polling.

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 47

def initialize
  @mutex = Mutex.new
  @requests = Hash.new { |h, k| h[k] = [] }
  # Index tracking which partitions have pending commands per consumer_group:topic
  @partition_index = Hash.new { |h, k| h[k] = Set.new }
end

Instance Method Details

#<<(command) ⇒ Object

Note:

Commands are indexed by consumer_group_id:topic:partition_id combination since partition commands are dispatched without subscription_group_id.

Adds the given command into the tracker so it can be retrieved when needed.

Parameters:

  • command (Request)

    command we want to schedule

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 59

def <<(command)
  key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
  index_key = "#{command[:consumer_group_id]}:#{command[:topic]}"

  @mutex.synchronize do
    @requests[key] << command
    @partition_index[index_key] << command[:partition_id]
  end
end
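
The keying scheme described in the note can be tried out with plain Ruby containers. This is a minimal sketch, not the tracker itself: plain hashes stand in for Request objects (the source only shows that a command is read via `[]` with these keys), and the group, topic, and `name` values are hypothetical.

```ruby
require 'set'

# Same container shapes as in the constructor
requests = Hash.new { |h, k| h[k] = [] }
partition_index = Hash.new { |h, k| h[k] = Set.new }

# Hypothetical commands; plain hashes stand in for Request objects
commands = [
  { consumer_group_id: 'group-a', topic: 'events', partition_id: 0, name: 'pause' },
  { consumer_group_id: 'group-a', topic: 'events', partition_id: 0, name: 'seek' },
  { consumer_group_id: 'group-a', topic: 'events', partition_id: 2, name: 'resume' }
]

commands.each do |command|
  # Full key: one bucket of commands per consumer group, topic, and partition
  key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
  # Index key: one set of partition IDs per consumer group and topic
  index_key = "#{command[:consumer_group_id]}:#{command[:topic]}"

  requests[key] << command
  partition_index[index_key] << command[:partition_id]
end

request_keys = requests.keys
indexed_partitions = partition_index['group-a:events'].to_a
```

Two commands for the same partition share one bucket, while the index records that partition only once because it is a Set.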

#clear! ⇒ Object

Note:

Primarily intended for testing.

Clears all stored requests and the partition index.

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 106

def clear!
  @mutex.synchronize do
    @requests.clear
    @partition_index.clear
  end
end

#each_for(consumer_group_id, topic, partition_id) {|given| ... } ⇒ Object

Selects all incoming command requests for the given consumer group, topic, and partition and iterates over them. The selected requests are removed from the tracker before they are yielded, so each request is handed out only once.

Parameters:

  • consumer_group_id (String)
  • topic (String)
  • partition_id (Integer)

Yield Parameters:

  • given (Request)

    command request

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 77

def each_for(consumer_group_id, topic, partition_id, &)
  key = "#{consumer_group_id}:#{topic}:#{partition_id}"
  index_key = "#{consumer_group_id}:#{topic}"
  requests = nil

  @mutex.synchronize do
    requests = @requests.delete(key)
    @partition_index[index_key].delete(partition_id) if requests
  end

  (requests || EMPTY_ARRAY).each(&)
end
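
The drain-on-read behavior can be illustrated with a small stand-in. `SketchTracker` below is a hypothetical class that copies the documented method bodies so the example runs without the Pro gem; it is not the real singleton, it defines its own `EMPTY_ARRAY`, and the commands are plain hashes with made-up values.

```ruby
require 'set'

# Hypothetical stand-in mirroring the documented methods; not the real Tracker
class SketchTracker
  EMPTY_ARRAY = [].freeze

  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
    @partition_index = Hash.new { |h, k| h[k] = Set.new }
  end

  def <<(command)
    key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
    index_key = "#{command[:consumer_group_id]}:#{command[:topic]}"

    @mutex.synchronize do
      @requests[key] << command
      @partition_index[index_key] << command[:partition_id]
    end
  end

  def each_for(consumer_group_id, topic, partition_id, &block)
    key = "#{consumer_group_id}:#{topic}:#{partition_id}"
    index_key = "#{consumer_group_id}:#{topic}"
    requests = nil

    # Requests leave the tracker inside the lock; the block runs outside of it
    @mutex.synchronize do
      requests = @requests.delete(key)
      @partition_index[index_key].delete(partition_id) if requests
    end

    (requests || EMPTY_ARRAY).each(&block)
  end

  def partition_ids_for(consumer_group_id, topic)
    @mutex.synchronize { @partition_index["#{consumer_group_id}:#{topic}"].to_a }
  end
end

tracker = SketchTracker.new
tracker << { consumer_group_id: 'group-a', topic: 'events', partition_id: 0, name: 'pause' }
tracker << { consumer_group_id: 'group-a', topic: 'events', partition_id: 0, name: 'seek' }
tracker << { consumer_group_id: 'group-a', topic: 'events', partition_id: 2, name: 'resume' }

first_pass = []
tracker.each_for('group-a', 'events', 0) { |command| first_pass << command[:name] }

# The same call again finds nothing, because the first pass drained the bucket
second_pass = []
tracker.each_for('group-a', 'events', 0) { |command| second_pass << command[:name] }

remaining = tracker.partition_ids_for('group-a', 'events')
```

Because the block is invoked after the mutex is released, a slow handler in this sketch does not block other threads from adding commands.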

#partition_ids_for(consumer_group_id, topic) ⇒ Array<Integer>

Returns the partition IDs that have pending commands for the given consumer group and topic.

Parameters:

  • consumer_group_id (String)
  • topic (String)

Returns:

  • (Array<Integer>)

    partition IDs with pending commands

# File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 96

def partition_ids_for(consumer_group_id, topic)
  index_key = "#{consumer_group_id}:#{topic}"

  @mutex.synchronize do
    @partition_index[index_key].to_a
  end
end
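
Two properties of the return value follow from the Ruby containers used here and can be checked in isolation. The sketch below uses a bare hash of sets with hypothetical keys rather than the tracker: `Set#to_a` builds a new array, so the caller holds a snapshot, and the default block makes a lookup for an unknown key return an empty set instead of `nil`.

```ruby
require 'set'

# Same shape as @partition_index in the constructor
partition_index = Hash.new { |h, k| h[k] = Set.new }
partition_index['group-a:events'] << 0 << 2

# to_a copies the IDs, so later changes to the set do not affect the snapshot
snapshot = partition_index['group-a:events'].to_a
partition_index['group-a:events'].delete(0)
current = partition_index['group-a:events'].to_a

# An unknown consumer group and topic yields an empty array, not nil
unknown = partition_index['group-b:missing'].to_a
```

This is why a caller can iterate over the returned IDs and drain each partition along the way without modifying the collection it is iterating.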