Class: Karafka::Web::Pro::Commanding::Handlers::Topics::Tracker
- Inherits:
- Object
- Includes:
- Singleton
- Defined in:
- lib/karafka/web/pro/commanding/handlers/topics/tracker.rb
Overview
Tracker that records incoming topic-related operational requests until they can be executed or are found to be invalid. It stores requests as they arrive so they can be executed pre-polling.
Instance Method Summary
-
#<<(command) ⇒ Object
Adds the given command into the tracker so it can be retrieved when needed.
-
#each_for(consumer_group_id, topic) {|given| ... } ⇒ Object
Selects all incoming command requests that match the given consumer group and topic and iterates over them.
-
#initialize ⇒ Tracker
constructor
A new instance of Tracker.
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 48

def initialize
  @mutex = Mutex.new
  @requests = Hash.new { |h, k| h[k] = [] }
end
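The default block passed to `Hash.new` is what lets later code append to a key without checking whether it exists. The following is a minimal sketch in plain Ruby, not Karafka code; the key strings are made up for illustration.

```ruby
# Same construction as in the constructor: a missing key is initialized
# with a fresh, empty array the first time it is read with [].
requests = Hash.new { |h, k| h[k] = [] }

# Hypothetical key, chosen only for illustration.
requests['example_group:example_topic'] << { name: 'first' }
requests['example_group:example_topic'] << { name: 'second' }

stored_count = requests['example_group:example_topic'].size

# Hash#delete does not invoke the default block: for a key that was never
# written it returns nil and leaves the hash untouched.
missing = requests.delete('example_group:unknown_topic')
key_count = requests.size
```

Because `delete` returns `nil` for an unknown key, a caller that drains entries this way needs a fallback such as an empty array.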
Instance Method Details
#<<(command) ⇒ Object
Adds the given command into the tracker so it can be retrieved when needed.
Note: Commands are indexed by the consumer_group_id:topic combination, since topic commands are dispatched without a subscription_group_id.
# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 58

def <<(command)
  key = "#{command[:consumer_group_id]}:#{command[:topic]}"

  @mutex.synchronize do
    @requests[key] << command
  end
end
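To illustrate the composite key, here is a small sketch that groups a few commands the same way. Plain hashes stand in for the real command objects (an assumption; the source only shows that commands respond to `[]`), and the group, topic and command names are hypothetical.

```ruby
# Hypothetical commands; only the :consumer_group_id and :topic fields
# matter for indexing.
commands = [
  { consumer_group_id: 'group_a', topic: 'events', name: 'pause' },
  { consumer_group_id: 'group_a', topic: 'events', name: 'resume' },
  { consumer_group_id: 'group_b', topic: 'events', name: 'pause' }
]

index = Hash.new { |h, k| h[k] = [] }

commands.each do |command|
  # Same key format as in #<<: "<consumer_group_id>:<topic>"
  key = "#{command[:consumer_group_id]}:#{command[:topic]}"
  index[key] << command
end

index.keys                    # => ["group_a:events", "group_b:events"]
index['group_a:events'].size  # => 2
```

Commands for the same topic in different consumer groups land under different keys, so they can be retrieved independently.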
#each_for(consumer_group_id, topic) {|given| ... } ⇒ Object
Selects all incoming command requests that match the given consumer group and topic and iterates over them. The selected requests are removed from the tracker before iteration, so each request is yielded at most once.
# File 'lib/karafka/web/pro/commanding/handlers/topics/tracker.rb', line 73

def each_for(consumer_group_id, topic, &)
  key = "#{consumer_group_id}:#{topic}"
  requests = nil

  @mutex.synchronize do
    requests = @requests.delete(key)
  end

  (requests || EMPTY_ARRAY).each(&)
end
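The drain-on-read behavior can be tried out with a stand-in class that mirrors the listings on this page. This is a sketch under stated assumptions, not the real Karafka class: `SketchTracker`, its local `EMPTY_ARRAY`, and the hash-based commands are hypothetical, and a named block parameter replaces the anonymous `&` so the example also runs on older Ruby versions.

```ruby
require 'singleton'

# Stand-in mirroring the documented source; NOT the real Karafka class.
class SketchTracker
  include Singleton

  # Local substitute for the EMPTY_ARRAY constant referenced in the source.
  EMPTY_ARRAY = [].freeze

  def initialize
    @mutex = Mutex.new
    @requests = Hash.new { |h, k| h[k] = [] }
  end

  def <<(command)
    key = "#{command[:consumer_group_id]}:#{command[:topic]}"
    @mutex.synchronize { @requests[key] << command }
  end

  def each_for(consumer_group_id, topic, &block)
    key = "#{consumer_group_id}:#{topic}"
    # Remove the whole bucket under the lock, then iterate outside of it.
    requests = @mutex.synchronize { @requests.delete(key) }
    (requests || EMPTY_ARRAY).each(&block)
  end
end

tracker = SketchTracker.instance
tracker << { consumer_group_id: 'example_group', topic: 'example_topic', name: 'pause' }
tracker << { consumer_group_id: 'example_group', topic: 'other_topic', name: 'resume' }

first_pass = []
tracker.each_for('example_group', 'example_topic') { |cmd| first_pass << cmd[:name] }

# The bucket was deleted by the first call, so nothing is yielded here.
second_pass = []
tracker.each_for('example_group', 'example_topic') { |cmd| second_pass << cmd[:name] }

# Requests for a different topic are unaffected and still pending.
other_pass = []
tracker.each_for('example_group', 'other_topic') { |cmd| other_pass << cmd[:name] }
```

In this sketch the block runs outside the mutex, so commands added while it executes are not blocked; they go into a fresh bucket and are picked up by the next `each_for` call for that key.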