Class: Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Pause

Inherits:
Base
  • Object
Defined in:
lib/karafka/web/pro/commanding/handlers/topics/commands/pause.rb

Overview

Handles a topic-level pause request by pausing every partition of the topic that is assigned to this consumer process within the target consumer group.

Instance Method Summary

Methods inherited from Base

#initialize

Constructor Details

This class inherits a constructor from Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Base

Instance Method Details

#call ⇒ Object

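Pauses every partition of the target topic that is owned by this process. A duration of zero is replaced with FOREVER_MS. When prevent_override is set, partitions that are already paused are skipped and reported under partitions_prevented instead of partitions_affected.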

# File 'lib/karafka/web/pro/commanding/handlers/topics/commands/pause.rb', line 48

def call
  partitions_affected = []
  partitions_prevented = []

  duration = request[:duration]
  duration = FOREVER_MS if duration.zero?
  prevent_override = request[:prevent_override]

  owned_partition_ids.each do |partition_id|
    coordinator = coordinator_for(partition_id)

    # If prevent_override is set and partition is already paused, skip it
    if coordinator.pause_tracker.paused? && prevent_override
      partitions_prevented << partition_id
      next
    end

    coordinator.pause_tracker.pause(duration)
    client.pause(topic, partition_id, nil, duration)

    partitions_affected << partition_id
  end

  result(
    "applied",
    partitions_affected: partitions_affected,
    partitions_prevented: partitions_prevented
  )
end
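
The branching in #call can be tried in isolation with a small stand-alone sketch. Everything below is a hypothetical stand-in and not part of Karafka: FakePauseTracker, pause_owned, the placeholder FOREVER_MS value, and the shape of the returned hash are all assumptions made for illustration. Only the control flow mirrors the method above.

```ruby
# Illustrative stand-ins only; none of these names come from Karafka.
# Placeholder value, NOT the real constant used by the library.
FOREVER_MS = 10 * 365 * 24 * 60 * 60 * 1000

# Hypothetical tracker that remembers whether it is paused and for how long.
class FakePauseTracker
  attr_reader :duration

  def initialize(paused: false)
    @paused = paused
    @duration = nil
  end

  def paused?
    @paused
  end

  def pause(duration)
    @paused = true
    @duration = duration
  end
end

# Mirrors the loop in #call. `trackers` maps partition id => tracker.
# The returned hash is an assumed shape, not the library's result format.
def pause_owned(trackers, duration:, prevent_override:)
  # Zero means "no explicit duration", so fall back to the forever value
  duration = FOREVER_MS if duration.zero?
  affected = []
  prevented = []

  trackers.each do |partition_id, tracker|
    # Already paused and overriding is not allowed: record and skip
    if tracker.paused? && prevent_override
      prevented << partition_id
      next
    end

    tracker.pause(duration)
    affected << partition_id
  end

  { status: "applied", partitions_affected: affected, partitions_prevented: prevented }
end

trackers = {
  0 => FakePauseTracker.new,
  1 => FakePauseTracker.new(paused: true),
  2 => FakePauseTracker.new
}

outcome = pause_owned(trackers, duration: 0, prevent_override: true)
puts outcome.inspect
```

With these inputs, partitions 0 and 2 land in partitions_affected and partition 1 lands in partitions_prevented, since it was already paused. Passing prevent_override: false instead would re-pause partition 1 along with the others.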