Module: Karafka::Routing::Features::Declaratives::Topic

Defined in:
lib/karafka/routing/features/declaratives/topic.rb

Overview

Bridge module prepended onto Karafka::Routing::Topic. The config(…) method forwards to the Declaratives subsystem, creating or retrieving a Karafka::Declaratives::Topic in the repository. This preserves backwards compatibility while the actual declarative state lives in Karafka::Declaratives.

Instance Method Summary collapse

Instance Method Details

#config(active: true, partitions: 1, replication_factor: 1, **details) ⇒ Karafka::Declaratives::Topic

Bridge: creates/retrieves a Declaratives::Topic in the repository and returns it. Preserves the ||= semantics (first call wins) for backwards compatibility.

Parameters:

  • active (Boolean) (defaults to: true)

    is the topic structure management feature active

  • partitions (Integer) (defaults to: 1)

    number of partitions for the topic

  • replication_factor (Integer) (defaults to: 1)

    replication factor for the topic

  • details (Hash)

    extra configuration for the topic

Options Hash (**details):

  • :retention.ms (Symbol)

    retention time in milliseconds

  • :compression.type (Symbol)

    compression type (none, gzip, snappy, lz4, zstd)

Returns:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/karafka/routing/features/declaratives/topic.rb', line 32

def config(active: true, partitions: 1, replication_factor: 1, **details)
  @declaratives ||= begin
    repo = Karafka::App.declaratives.repository

    repo.find(name) || begin
      declaration = repo.find_or_create(name)
      declaration.active(active)
      declaration.partitions(partitions)
      declaration.replication_factor(replication_factor)
      declaration.config(details) unless details.empty?
      declaration
    end
  end.tap do |declaration|
    # Set bootstrap servers from the routing topic's kafka config so the CLI can
    # filter by cluster. This runs on every call but is idempotent.
    declaration.bootstrap_servers ||= kafka[:"bootstrap.servers"]
  end
end

#declarativesKarafka::Declaratives::Topic

Returns config details.

Returns:



52
53
54
# File 'lib/karafka/routing/features/declaratives/topic.rb', line 52

def declaratives
  config
end

#declaratives?true

Returns declaratives is always active.

Returns:

  • (true)

    declaratives is always active



57
58
59
# File 'lib/karafka/routing/features/declaratives/topic.rb', line 57

def declaratives?
  declaratives.active?
end

#initializeObject

This method sets up the extra instance variable to nil before calling the parent class initializer. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.



16
17
18
19
# File 'lib/karafka/routing/features/declaratives/topic.rb', line 16

def initialize(...)
  @declaratives = nil
  super
end

#to_hHash

Returns topic with all its native configuration options plus declaratives settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus declaratives settings



63
64
65
66
67
# File 'lib/karafka/routing/features/declaratives/topic.rb', line 63

def to_h
  super.merge(
    declaratives: declaratives.to_h
  ).freeze
end