Class: Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Pause
- Inherits:
-
Base
- Object
- Base
- Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Pause
- Defined in:
- lib/karafka/web/pro/commanding/handlers/topics/commands/pause.rb
Overview
Executes the topic-level pause request to pause all partitions of a topic that are assigned to this consumer process within the target consumer group.
Instance Method Summary collapse
-
#call ⇒ Object
Triggers pausing of all partitions for the target topic that are owned by this process.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Web::Pro::Commanding::Handlers::Topics::Commands::Base
Instance Method Details
#call ⇒ Object
Triggers pausing of all partitions for the target topic that are owned by this process. Supports prevent_override to skip already paused partitions.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/karafka/web/pro/commanding/handlers/topics/commands/pause.rb', line 48 def call partitions_affected = [] partitions_prevented = [] duration = request[:duration] duration = FOREVER_MS if duration.zero? prevent_override = request[:prevent_override] owned_partition_ids.each do |partition_id| coordinator = coordinator_for(partition_id) # If prevent_override is set and partition is already paused, skip it if coordinator.pause_tracker.paused? && prevent_override partitions_prevented << partition_id next end coordinator.pause_tracker.pause(duration) client.pause(topic, partition_id, nil, duration) partitions_affected << partition_id end result( "applied", partitions_affected: partitions_affected, partitions_prevented: partitions_prevented ) end |