Class: Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Resume

Inherits:
Base
  • Object
Defined in:
lib/karafka/web/pro/commanding/handlers/topics/commands/resume.rb

Overview

Resumes all paused partitions for a topic that are assigned to this consumer process within the target consumer group.

Instance Method Summary

Methods inherited from Base

#initialize

Constructor Details

This class inherits a constructor from Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Base

Instance Method Details

#call ⇒ Object

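Expires the pause on every partition of the target topic owned by this process, so Karafka resumes processing those partitions. When the request sets reset_attempts, each partition's pause tracker is also reset. Returns an "applied" result listing the affected partition IDs.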



# File 'lib/karafka/web/pro/commanding/handlers/topics/commands/resume.rb', line 43

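def call
  partitions_affected = []

  owned_partition_ids.each do |partition_id|
    coordinator = coordinator_for(partition_id)

    # Expire the pause so processing of this partition can resume
    coordinator.pause_tracker.expire
    # Reset the pause tracker only when the request asks for it
    coordinator.pause_tracker.reset if request[:reset_attempts]

    partitions_affected << partition_id
  end

  # Report which partitions the command was applied to
  result("applied", partitions_affected: partitions_affected)
end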
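
The flow of #call can be tried outside Karafka with a minimal, self-contained sketch. Everything here is a hypothetical stand-in and not part of the Karafka API: FakePauseTracker, FakeCoordinator, and resume_partitions are invented for illustration, the tracker only models the expire and reset calls that appear in the method above, and the assumption that reset clears an attempt counter is inferred from the reset_attempts flag name rather than confirmed by this page.

```ruby
# Hypothetical stand-in for a pause tracker (NOT Karafka's real class).
# It models only the two calls used by #call: expire and reset.
class FakePauseTracker
  attr_reader :attempt

  def initialize
    @attempt = 0
    @expired = true
  end

  # Simulates a pause being applied after a processing failure
  def pause
    @attempt += 1
    @expired = false
  end

  # Marks the pause as over so processing may continue
  def expire
    @expired = true
  end

  # Assumption: reset clears the attempt counter
  def reset
    @attempt = 0
  end

  def expired?
    @expired
  end
end

# Hypothetical coordinator that only exposes its pause tracker
FakeCoordinator = Struct.new(:pause_tracker)

# Mirrors the loop in #call: expire each pause, optionally reset the
# tracker, and collect the IDs of the partitions that were touched.
def resume_partitions(coordinators, reset_attempts: false)
  partitions_affected = []

  coordinators.each do |partition_id, coordinator|
    coordinator.pause_tracker.expire
    coordinator.pause_tracker.reset if reset_attempts

    partitions_affected << partition_id
  end

  { status: "applied", partitions_affected: partitions_affected }
end

# Two owned partitions, both currently paused
coordinators = {
  0 => FakeCoordinator.new(FakePauseTracker.new),
  1 => FakeCoordinator.new(FakePauseTracker.new)
}
coordinators.each_value { |coordinator| coordinator.pause_tracker.pause }

outcome = resume_partitions(coordinators, reset_attempts: true)
```

Passing reset_attempts: false leaves each tracker's attempt counter untouched while still expiring the pause. This corresponds to the conditional reset in the original method.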