Class: Karafka::Web::Pro::Commanding::Handlers::Topics::Tracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/web/pro/commanding/handlers/topics/tracker.rb

Overview

Tracker that records incoming topic-related operational requests until they can be executed or are found to be invalid. It stores requests as they arrive so they can be executed before polling.

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.



# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 48

def initialize
  @mutex = Mutex.new
  @requests = Hash.new { |h, k| h[k] = [] }
end
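
The block passed to `Hash.new` gives every missing key its own empty array, which is what lets `#<<` append without first checking whether the key exists. A minimal standalone sketch in plain Ruby follows; the key strings and the `name` values are made up for illustration and are not taken from the library:

```ruby
# The default block runs on a missing key and stores a fresh array under it.
requests = Hash.new { |h, k| h[k] = [] }

# Hypothetical keys and payloads, used only to show the behavior.
requests["group-a:events"] << { name: "pause" }
requests["group-a:events"] << { name: "resume" }

# Each key gets its own array, so entries never leak between keys.
requests["group-b:events"] << { name: "pause" }

requests.each { |key, list| puts "#{key}: #{list.size}" }
```

Note that reading a missing key with `[]` also stores an empty array under that key, so lookups that should not create entries need a different call.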

Instance Method Details

#<<(command) ⇒ Object

Note:

Commands are indexed by consumer_group_id:topic combination since topic commands are dispatched without subscription_group_id.

Adds the given command to the tracker so it can be retrieved when needed.

Parameters:

  • command (Request)

    command we want to schedule



# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 58

def <<(command)
  key = "#{command[:consumer_group_id]}:#{command[:topic]}"

  @mutex.synchronize do
    @requests[key] << command
  end
end
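
To illustrate why the append happens inside the mutex, here is a hedged sketch that reproduces the same key format and locking pattern outside the library. Plain hashes stand in for `Request` objects, and the `id` field and thread counts are assumptions made for the example:

```ruby
mutex = Mutex.new
requests = Hash.new { |h, k| h[k] = [] }

# Four threads each add 25 hypothetical commands for the same group and topic.
threads = 4.times.map do |i|
  Thread.new do
    25.times do |j|
      command = { consumer_group_id: "group-a", topic: "events", id: "#{i}-#{j}" }
      key = "#{command[:consumer_group_id]}:#{command[:topic]}"

      # Same pattern as #<<: the shared hash is only touched under the lock.
      mutex.synchronize { requests[key] << command }
    end
  end
end

threads.each(&:join)

puts requests.keys.inspect
puts requests["group-a:events"].size
```

All commands for one consumer group and topic end up under a single `consumer_group_id:topic` key, regardless of which thread added them.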

#each_for(consumer_group_id, topic) {|given| ... } ⇒ Object

Selects all pending command requests that match the given consumer group and topic and yields each of them to the block. The matching requests are removed from the tracker before iteration starts, so each request is yielded at most once.

Parameters:

  • consumer_group_id (String)
  • topic (String)

Yield Parameters:

  • given (Request)

    command request



# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 73

def each_for(consumer_group_id, topic, &)
  key = "#{consumer_group_id}:#{topic}"
  requests = nil

  @mutex.synchronize do
    requests = @requests.delete(key)
  end

  (requests || EMPTY_ARRAY).each(&)
end
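
The add-then-drain flow can be sketched with a stand-in class that mirrors the logic shown on this page. `SketchTracker` is a hypothetical name: it is a plain class rather than a `Singleton`, it defines its own `EMPTY_ARRAY`, and it uses plain hashes instead of `Request` objects, so treat it as an illustration and not as the library's implementation:

```ruby
# Stand-in that mirrors the tracker logic; not the real Karafka class.
class SketchTracker
  EMPTY_ARRAY = [].freeze

  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
  end

  def <<(command)
    key = "#{command[:consumer_group_id]}:#{command[:topic]}"
    @mutex.synchronize { @requests[key] << command }
  end

  def each_for(consumer_group_id, topic, &block)
    key = "#{consumer_group_id}:#{topic}"
    # Hash#delete returns nil for an unknown key and does not run the default block.
    requests = @mutex.synchronize { @requests.delete(key) }
    # Iteration happens outside the lock, on the array that was just detached.
    (requests || EMPTY_ARRAY).each(&block)
  end
end

tracker = SketchTracker.new
tracker << { consumer_group_id: "group-a", topic: "events", name: "pause" }
tracker << { consumer_group_id: "group-a", topic: "events", name: "resume" }
tracker << { consumer_group_id: "group-b", topic: "events", name: "pause" }

first_pass = []
tracker.each_for("group-a", "events") { |request| first_pass << request[:name] }

second_pass = []
tracker.each_for("group-a", "events") { |request| second_pass << request[:name] }

other_group = []
tracker.each_for("group-b", "events") { |request| other_group << request[:name] }

p first_pass   # ["pause", "resume"]
p second_pass  # []
p other_group  # ["pause"]
```

The second pass over the same group and topic yields nothing because the first call already removed those requests. Since the real class includes `Singleton`, it would be reached through `Tracker.instance` instead of `new`.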