Class: Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker
- Inherits:
- Object
  - Object
    - Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb
Overview
Tracker that records incoming partition-related operational requests until they can be executed or turn out to be invalid. Requests are stored as they arrive so that they can be executed before polling.
Instance Method Summary
- #<<(command) ⇒ Object
  Adds the given command into the tracker so it can be retrieved when needed.
- #clear! ⇒ Object
  Clears all stored requests and partition index.
- #each_for(consumer_group_id, topic, partition_id) {|given| ... } ⇒ Object
  Selects all incoming command requests for given consumer group, topic, and partition and iterates over them.
- #initialize ⇒ Tracker (constructor)
  A new instance of Tracker.
- #partition_ids_for(consumer_group_id, topic) ⇒ Array<Integer>
  Returns partition IDs that have pending commands for the given consumer group and topic.
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
    # File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 47

    def initialize
      @mutex = Mutex.new
      @requests = Hash.new { |h, k| h[k] = [] }
      # Index tracking which partitions have pending commands per consumer_group:topic
      @partition_index = Hash.new { |h, k| h[k] = Set.new }
    end
Instance Method Details
#<<(command) ⇒ Object
Adds the given command into the tracker so it can be retrieved when needed.
Note: Commands are indexed by the consumer_group_id:topic:partition_id combination, since partition commands are dispatched without a subscription_group_id.
    # File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 59

    def <<(command)
      key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
      index_key = "#{command[:consumer_group_id]}:#{command[:topic]}"

      @mutex.synchronize do
        @requests[key] << command
        @partition_index[index_key] << command[:partition_id]
      end
    end
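As an illustration of the indexing described above, the following minimal sketch reproduces the two string keys that #<< derives from a command. The helper name `tracker_keys` and the example group and topic names are hypothetical and not part of Karafka; only the key format is taken from the source listing.

```ruby
# Hypothetical helper that mirrors how #<< derives its two lookup keys
def tracker_keys(command)
  # Full key: one bucket of requests per consumer group, topic, and partition
  key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
  # Index key: one set of partition IDs per consumer group and topic
  index_key = "#{command[:consumer_group_id]}:#{command[:topic]}"
  [key, index_key]
end

# Example payload; the group and topic names are made up for illustration
command = { consumer_group_id: "example_group", topic: "events", partition_id: 3 }
key, index_key = tracker_keys(command)

puts key       # example_group:events:3
puts index_key # example_group:events
```

Keeping a separate index per consumer group and topic is what lets #partition_ids_for answer without scanning every request key.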
#clear! ⇒ Object
Clears all stored requests and the partition index.
Note: Primarily for testing purposes.
    # File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 106

    def clear!
      @mutex.synchronize do
        @requests.clear
        @partition_index.clear
      end
    end
#each_for(consumer_group_id, topic, partition_id) {|given| ... } ⇒ Object
Selects all incoming command requests for the given consumer group, topic, and partition and iterates over them. The selected requests are removed from the tracker before they are yielded, so each request is handed out only once.
    # File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 77

    def each_for(consumer_group_id, topic, partition_id, &)
      key = "#{consumer_group_id}:#{topic}:#{partition_id}"
      index_key = "#{consumer_group_id}:#{topic}"
      requests = nil

      @mutex.synchronize do
        requests = @requests.delete(key)
        @partition_index[index_key].delete(partition_id) if requests
      end

      (requests || EMPTY_ARRAY).each(&)
    end
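The removal behavior can be illustrated with a simplified stand-in. `DrainSketch` is a hypothetical class written for this example; it copies only the request storage and the delete-then-yield pattern from the listing above and omits the partition index. It is a sketch of the pattern, not the library's implementation.

```ruby
# Simplified stand-in for the tracker; DrainSketch is a hypothetical name, not part of Karafka
class DrainSketch
  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
  end

  def <<(command)
    key = "#{command[:consumer_group_id]}:#{command[:topic]}:#{command[:partition_id]}"
    @mutex.synchronize { @requests[key] << command }
  end

  # Removes the stored requests under the lock, then yields them outside of it
  def each_for(consumer_group_id, topic, partition_id, &block)
    key = "#{consumer_group_id}:#{topic}:#{partition_id}"
    # Hash#delete returns nil for a missing key and does not trigger the default block
    requests = @mutex.synchronize { @requests.delete(key) }
    (requests || []).each(&block)
  end
end

sketch = DrainSketch.new
sketch << { consumer_group_id: "example_group", topic: "events", partition_id: 0 }

first_pass = []
second_pass = []
sketch.each_for("example_group", "events", 0) { |command| first_pass << command }
sketch.each_for("example_group", "events", 0) { |command| second_pass << command }

puts first_pass.size  # 1
puts second_pass.size # 0
```

Because the block runs after the lock is released, a slow command handler does not block other threads from adding new requests.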
#partition_ids_for(consumer_group_id, topic) ⇒ Array<Integer>
Returns partition IDs that have pending commands for the given consumer group and topic.
    # File 'lib/karafka/web/pro/commanding/handlers/partitions/tracker.rb', line 96

    def partition_ids_for(consumer_group_id, topic)
      index_key = "#{consumer_group_id}:#{topic}"

      @mutex.synchronize do
        @partition_index[index_key].to_a
      end
    end
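One way #partition_ids_for and #each_for might be combined is sketched below. The helper `drain_pending` is hypothetical and not part of Karafka, and how the library itself calls these methods is an assumption here; the sketch relies only on the two method signatures documented above.

```ruby
# Hypothetical helper: collects every pending command for one consumer group and topic.
# `tracker` is any object that responds to #partition_ids_for and #each_for.
def drain_pending(tracker, consumer_group_id, topic)
  drained = []

  # #partition_ids_for returns an Array copy (via to_a), so the index can
  # change while this loop runs without affecting the iteration
  tracker.partition_ids_for(consumer_group_id, topic).each do |partition_id|
    tracker.each_for(consumer_group_id, topic, partition_id) do |command|
      drained << command
    end
  end

  drained
end

# With the library loaded, the singleton instance would be passed in, for example:
# tracker = Karafka::Web::Pro::Commanding::Handlers::Partitions::Tracker.instance
# drain_pending(tracker, "example_group", "events")
```

Since the class includes Singleton, the shared instance is obtained through `.instance` and not through `.new`.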