Class: Karafka::Declaratives::Topic

Inherits:
Object
Defined in:
lib/karafka/declaratives/topic.rb

Overview

Represents a single declarative topic definition - what a topic should look like on the broker. This is a standalone object, independent of routing concepts like consumers or subscription groups.

Constructor Details

#initialize(name) ⇒ Topic

Returns a new instance of Topic.

Parameters:

  • name (String, Symbol)

    topic name



# File 'lib/karafka/declaratives/topic.rb', line 18

def initialize(name)
  @name = name.to_s
  @active = true
  @partitions = 1
  @replication_factor = 1
  @details = {}
  @bootstrap_servers = nil
end
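
The defaults set by the constructor can be illustrated with a minimal sketch. `DefaultsSketch` is a hypothetical stand-in that copies the constructor shown above and adds plain readers so the snippet runs without Karafka loaded; it is not the library class itself.

```ruby
# Hypothetical stand-in mirroring the constructor above (not the Karafka class).
class DefaultsSketch
  # Plain readers added only so the defaults can be inspected in this sketch.
  attr_reader :name, :details, :bootstrap_servers

  def initialize(name)
    @name = name.to_s
    @active = true
    @partitions = 1
    @replication_factor = 1
    @details = {}
    @bootstrap_servers = nil
  end
end

defaults_topic = DefaultsSketch.new(:events)
defaults_topic.name              # => "events" (a Symbol is converted with to_s)
defaults_topic.details           # => {}
defaults_topic.bootstrap_servers # => nil
```

Because the name is normalized with `to_s`, passing `:events` or `"events"` yields the same stored name.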

Instance Attribute Details

#active(value = :not_set) ⇒ Boolean

Gets or sets the active flag.

The active flag exists because the routing bridge (Routing::Features::Declaratives::Topic) auto-creates a declaration for every routing topic that calls config(). Some routing topics - notably Pro pattern-matched virtual topics - must exist in routing but cannot be managed declaratively (their real Kafka topic names are unknown). Those call config(active: false) to opt out. Once the routing bridge is retired and declaratives are defined exclusively via Karafka::App.declaratives.draw, this flag becomes unnecessary: topics simply won’t be declared if they shouldn’t be managed.

Parameters:

  • value (Symbol, Boolean) (defaults to: :not_set)

    when :not_set returns current value, otherwise sets it

Returns:

  • (Boolean)

    active state



# File 'lib/karafka/declaratives/topic.rb', line 48

def active(value = :not_set)
  if value == :not_set
    @active
  else
    @active = value
  end
end
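
The `:not_set` sentinel lets one method act as both reader and writer while still accepting `false` as a value to set; a `nil` default could not tell "no argument" apart from an explicit `nil`. The following is a minimal sketch of that behavior, using a hypothetical `ActiveFlagSketch` stand-in that copies the method above so it runs without Karafka.

```ruby
# Hypothetical stand-in reproducing only the active accessor shown above.
class ActiveFlagSketch
  def initialize
    @active = true
  end

  # With no argument the sentinel is seen and the current value is returned;
  # any other argument, including false, is stored.
  def active(value = :not_set)
    if value == :not_set
      @active
    else
      @active = value
    end
  end
end

flag_topic = ActiveFlagSketch.new
initial = flag_topic.active         # => true (read)
written = flag_topic.active(false)  # => false (write returns the new value)
current = flag_topic.active         # => false (read again)
```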

#bootstrap_servers ⇒ Object

Bootstrap servers for the Kafka cluster this topic belongs to. Set by the routing bridge from the routing topic’s kafka config. Topics declared via the standalone DSL leave this nil (treated as default cluster).



# File 'lib/karafka/declaratives/topic.rb', line 15

def bootstrap_servers
  @bootstrap_servers
end

#details ⇒ Object

Returns the value of attribute details.



# File 'lib/karafka/declaratives/topic.rb', line 9

def details
  @details
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/declaratives/topic.rb', line 9

def name
  @name
end

#partitions(value = :not_set) ⇒ Integer

Gets or sets the partition count.

Parameters:

  • value (Symbol, Integer) (defaults to: :not_set)

    when :not_set returns current value, otherwise sets it

Returns:

  • (Integer)

    partition count



# File 'lib/karafka/declaratives/topic.rb', line 64

def partitions(value = :not_set)
  if value == :not_set
    @partitions
  else
    @partitions = value
  end
end

#replication_factor(value = :not_set) ⇒ Integer

Gets or sets the replication factor.

Parameters:

  • value (Symbol, Integer) (defaults to: :not_set)

    when :not_set returns current value, otherwise sets it

Returns:

  • (Integer)

    replication factor



# File 'lib/karafka/declaratives/topic.rb', line 75

def replication_factor(value = :not_set)
  if value == :not_set
    @replication_factor
  else
    @replication_factor = value
  end
end

Instance Method Details

#active? ⇒ Boolean

Returns whether this topic is actively managed via declaratives.

Returns:

  • (Boolean)

    whether this topic is actively managed via declaratives



# File 'lib/karafka/declaratives/topic.rb', line 57

def active?
  @active
end

#config(entries = {}) ⇒ Object

Merges Kafka topic configuration entries into the details hash. Keys are converted to symbols before merging.

Parameters:

  • entries (Hash) (defaults to: {})

    topic config entries like 'retention.ms' => 604_800_000



# File 'lib/karafka/declaratives/topic.rb', line 85

def config(entries = {})
  @details.merge!(entries.transform_keys(&:to_sym))
end
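
Since `config` uses `merge!` after `transform_keys(&:to_sym)`, repeated calls accumulate entries, string and symbol keys collapse into the same entry, and a later value replaces an earlier one for the same key. A minimal sketch of that behavior, using a hypothetical `ConfigSketch` stand-in that copies the method above (the `details` reader is added here only for inspection):

```ruby
# Hypothetical stand-in reproducing only the config method shown above.
class ConfigSketch
  attr_reader :details

  def initialize
    @details = {}
  end

  def config(entries = {})
    @details.merge!(entries.transform_keys(&:to_sym))
  end
end

config_topic = ConfigSketch.new
config_topic.config('retention.ms' => 604_800_000)
# A second call adds a new entry and overrides the earlier retention value.
config_topic.config('cleanup.policy': 'compact', 'retention.ms' => 86_400_000)

config_topic.details
# => { :"retention.ms" => 86_400_000, :"cleanup.policy" => "compact" }
```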

#declaratives ⇒ Karafka::Declaratives::Topic

Self-accessor for backwards compatibility. CLI commands that previously accessed topic.declaratives.partitions on routing topics can now call the same on Declaratives::Topic instances directly.

Returns:

  • (Karafka::Declaratives::Topic)

    self



# File 'lib/karafka/declaratives/topic.rb', line 32

def declaratives
  self
end
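
Because `declaratives` returns `self`, a call chain written for routing topics, such as `topic.declaratives.partitions`, resolves to the same value as `topic.partitions`. A minimal sketch, using a hypothetical `SelfAccessorSketch` stand-in that copies only the two methods involved:

```ruby
# Hypothetical stand-in reproducing the partitions accessor and the
# declaratives self-accessor shown above.
class SelfAccessorSketch
  def initialize
    @partitions = 1
  end

  def partitions(value = :not_set)
    if value == :not_set
      @partitions
    else
      @partitions = value
    end
  end

  def declaratives
    self
  end
end

legacy_topic = SelfAccessorSketch.new
legacy_topic.partitions(6)

via_accessor = legacy_topic.declaratives.partitions    # => 6
same_object = legacy_topic.declaratives.equal?(legacy_topic) # => true
```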

#to_h ⇒ Hash

Returns hash representation matching the old Config struct’s to_h output.

Returns:

  • (Hash)

    hash representation matching the old Config struct’s to_h output



# File 'lib/karafka/declaratives/topic.rb', line 90

def to_h
  {
    active: @active,
    partitions: @partitions,
    replication_factor: @replication_factor,
    details: @details
  }
end
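
As the method body shows, the hash carries only `active`, `partitions`, `replication_factor` and `details`; the topic name and `bootstrap_servers` are not part of it. A minimal sketch of the output for a freshly built topic, using a hypothetical `ToHSketch` stand-in that copies the constructor and `to_h` from this page:

```ruby
# Hypothetical stand-in reproducing the constructor and to_h shown on this page.
class ToHSketch
  def initialize(name)
    @name = name.to_s
    @active = true
    @partitions = 1
    @replication_factor = 1
    @details = {}
    @bootstrap_servers = nil
  end

  def to_h
    {
      active: @active,
      partitions: @partitions,
      replication_factor: @replication_factor,
      details: @details
    }
  end
end

snapshot = ToHSketch.new(:events).to_h
# => { active: true, partitions: 1, replication_factor: 1, details: {} }
```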