Class: Karafka::Web::Pro::Ui::Controllers::Consumers::Topics::PausesController

Inherits:
BaseController show all
Defined in:
lib/karafka/web/pro/ui/controllers/consumers/topics/pauses_controller.rb

Overview

Controller for managing topic-level pauses at the consumer group level.

Constant Summary

Constants inherited from Ui::Controllers::BaseController

Ui::Controllers::BaseController::Models

Instance Attribute Summary

Attributes inherited from Ui::Controllers::BaseController

#params, #session

Instance Method Summary collapse

Methods inherited from Ui::Controllers::BaseController

#cache, #initialize

Methods included from Ui::Controllers::Requests::Hookable

included, #run_after_hooks, #run_before_hooks

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Controllers::BaseController

Instance Method Details

#create(consumer_group_id, topic) ⇒ Object

Dispatches the topic pause command to all processes

Parameters:

  • consumer_group_id (String)
  • topic (String)


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/karafka/web/pro/ui/controllers/consumers/topics/pauses_controller.rb', line 57

def create(consumer_group_id, topic)
  new(consumer_group_id, topic)

  Commanding::Dispatcher.request(
    Commanding::Commands::Topics::Pause.name,
    {
      consumer_group_id: consumer_group_id,
      topic: topic,
      # User provides this in seconds, we operate on ms in the system
      duration: params.int(:duration) * 1_000,
      prevent_override: params.bool(:prevent_override)
    },
    matchers: {
      consumer_group_id: consumer_group_id,
      topic: topic
    }
  )

  redirect(
    :previous,
    success: format_flash(
      "Initiated pause for all partitions of the ? topic in consumer group ?",
      topic,
      consumer_group_id
    )
  )
end

#delete(consumer_group_id, topic) ⇒ Object

Dispatches the topic resume command to all processes

Parameters:

  • consumer_group_id (String)
  • topic (String)


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/karafka/web/pro/ui/controllers/consumers/topics/pauses_controller.rb', line 99

def delete(consumer_group_id, topic)
  new(consumer_group_id, topic)

  Commanding::Dispatcher.request(
    Commanding::Commands::Topics::Resume.name,
    {
      consumer_group_id: consumer_group_id,
      topic: topic,
      reset_attempts: params.bool(:reset_attempts)
    },
    matchers: {
      consumer_group_id: consumer_group_id,
      topic: topic
    }
  )

  redirect(
    :previous,
    success: format_flash(
      "Initiated resume for all partitions of the ? topic in consumer group ?",
      topic,
      consumer_group_id
    )
  )
end

#edit(consumer_group_id, topic) ⇒ Object

Displays the resume configuration form for a topic

Parameters:

  • consumer_group_id (String)
  • topic (String)


89
90
91
92
93
# File 'lib/karafka/web/pro/ui/controllers/consumers/topics/pauses_controller.rb', line 89

def edit(consumer_group_id, topic)
  new(consumer_group_id, topic)

  render
end

#new(consumer_group_id, topic) ⇒ Object

Displays the pause configuration form for a topic

Parameters:

  • consumer_group_id (String)
  • topic (String)


47
48
49
50
51
# File 'lib/karafka/web/pro/ui/controllers/consumers/topics/pauses_controller.rb', line 47

def new(consumer_group_id, topic)
  bootstrap!(consumer_group_id, topic)

  render
end