Class: Karafka::Web::Pro::Ui::Controllers::Consumers::Partitions::OffsetsController

Inherits:
BaseController
Defined in:
lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb

Overview

Controller for managing partition offsets at the consumer group level.

Constant Summary

Constants inherited from Ui::Controllers::BaseController

Ui::Controllers::BaseController::Models

Instance Attribute Summary

Attributes inherited from Ui::Controllers::BaseController

#params, #session

Instance Method Summary

Methods inherited from ConsumersController

#details, #index, #performance, #subscriptions

Methods inherited from Ui::Controllers::BaseController

#cache, #initialize

Methods included from Ui::Controllers::Requests::Hookable

included, #run_after_hooks, #run_before_hooks

Constructor Details

This class inherits a constructor from Karafka::Web::Ui::Controllers::BaseController

Instance Method Details

#edit(consumer_group_id, topic, partition_id) ⇒ Object

Displays the offset edit page, which shows either the edit form or a warning when editing is not applicable.

Parameters:

  • consumer_group_id (String)
  • topic (String)
  • partition_id (Integer)


# File 'lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb', line 47

def edit(consumer_group_id, topic, partition_id)
  bootstrap!(consumer_group_id, topic, partition_id)

  render
end

#update(consumer_group_id, topic, partition_id) ⇒ Object

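Requests the offset change in the running process through the commanding API. The seek command is broadcast to all processes, and matchers restrict it to the given consumer group, topic, and partition.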

Parameters:

  • consumer_group_id (String)
  • topic (String)
  • partition_id (Integer)


# File 'lib/karafka/web/pro/ui/controllers/consumers/partitions/offsets_controller.rb', line 58

def update(consumer_group_id, topic, partition_id)
  edit(consumer_group_id, topic, partition_id)

  offset = params.int(:offset)
  prevent_overtaking = params.bool(:prevent_overtaking)
  force_resume = params.bool(:force_resume)

  # Broadcast to all processes with matchers to filter by consumer group,
  # topic, and partition
  Commanding::Dispatcher.request(
    Commanding::Commands::Partitions::Seek.name,
    {
      consumer_group_id: consumer_group_id,
      topic: topic,
      partition_id: partition_id,
      offset: offset,
      prevent_overtaking: prevent_overtaking,
      force_resume: force_resume
    },
    matchers: {
      consumer_group_id: consumer_group_id,
      topic: topic,
      partition_id: partition_id
    }
  )

  redirect(
    :previous,
    success: format_flash(
      "Initiated offset adjustment to ? for ?#? in consumer group ?",
      offset,
      topic,
      partition_id,
      consumer_group_id
    )
  )
end
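
The comment in #update says the command is broadcast to every process and that matchers filter it by consumer group, topic, and partition. The sketch below illustrates that idea in plain Ruby. It is not the Karafka Web implementation: Assignment, applicable?, and the sample values are hypothetical names introduced only for this example, and the real matching happens inside the commanding layer.

```ruby
# Hypothetical sketch: none of these names come from Karafka Web itself.
# A process describes each partition it currently owns with one Assignment.
Assignment = Struct.new(:consumer_group_id, :topic, :partition_id, keyword_init: true)

# Returns true when at least one assignment satisfies every matcher key,
# which means the broadcast command is relevant to this process.
def applicable?(matchers, assignments)
  assignments.any? do |assignment|
    matchers.all? { |key, expected| assignment[key] == expected }
  end
end

# Same shape as the matchers hash passed in #update (sample values are made up).
matchers = { consumer_group_id: 'example_group', topic: 'events', partition_id: 2 }

# One process owns the targeted partition, the other owns a different one.
owner = [Assignment.new(consumer_group_id: 'example_group', topic: 'events', partition_id: 2)]
bystander = [Assignment.new(consumer_group_id: 'example_group', topic: 'events', partition_id: 0)]

puts applicable?(matchers, owner)
puts applicable?(matchers, bystander)
```

Under these assumptions only the process that owns the matching partition would act on the seek request, and every other process would ignore the broadcast.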