Module: Karafka::Pro::Routing::Features::ParallelSegments::ConsumerGroup

Defined in:
lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb

Overview

Parallel segments are defined on the consumer group (since it creates many), thus we define them on the consumer group. This module adds extra methods needed there to make it work

Instance Method Summary collapse

Instance Method Details

#parallel_segmentsConfig

Returns parallel segments config.

Returns:

  • (Config)

    parallel segments config



41
42
43
44
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 41

def parallel_segments
  # We initialize it as disabled if not configured by the user
  public_send(:parallel_segments=, count: 1)
end

#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: "-parallel-") ⇒ Object

Note:

This method is an assignor but the API is actually via the ‘#parallel_segments` method. Our `Routing::Proxy` normalizes that the way we want to have it exposed for the end users.

Allows setting parallel segments configuration

Parameters:

  • count (Integer) (defaults to: 1)

    number of parallel segments (number of parallel consumer groups that will be created)

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • reducer (nil, #call) (defaults to: nil)

    reducer for parallel key. It allows for using a custom reducer to achieve enhanced parallelization when the default reducer is not enough.

  • merge_key (String) (defaults to: "-parallel-")

    key used to build the parallel segment consumer groups



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 58

def parallel_segments=(
  count: 1,
  partitioner: nil,
  reducer: nil,
  merge_key: "-parallel-"
)
  @parallel_segments ||= Config.new(
    active: count > 1,
    count: count,
    partitioner: partitioner,
    reducer: reducer || ->(parallel_key) { parallel_key.to_s.sum % count },
    merge_key: merge_key
  )
end

#parallel_segments?Boolean

Returns are parallel segments active.

Returns:

  • (Boolean)

    are parallel segments active



74
75
76
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 74

def parallel_segments?
  parallel_segments.active?
end

#segment_idInteger

Returns id of the segment (0 or bigger) or -1 if parallel segments are not active.

Returns:

  • (Integer)

    id of the segment (0 or bigger) or -1 if parallel segments are not active



80
81
82
83
84
85
86
87
88
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 80

def segment_id
  return @segment_id if @segment_id

  @segment_id = if parallel_segments?
    name.split(parallel_segments.merge_key).last.to_i
  else
    -1
  end
end

#segment_originString

Returns original segment consumer group name.

Returns:

  • (String)

    original segment consumer group name



91
92
93
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 91

def segment_origin
  name.split(parallel_segments.merge_key).first
end

#to_hHash

Returns consumer group setup with the parallel segments definition in it.

Returns:

  • (Hash)

    consumer group setup with the parallel segments definition in it



96
97
98
99
100
101
102
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 96

def to_h
  super.merge(
    parallel_segments: parallel_segments.to_h.merge(
      segment_id: segment_id
    )
  ).freeze
end