Module: Karafka::Pro::Routing::Features::ConsumerGroups::ParallelSegments::ConsumerGroup

Defined in:
lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb

Overview

Parallel segments are defined on the consumer group (since it creates many), thus we define them on the consumer group. This module adds extra methods needed there to make it work

Instance Method Summary collapse

Instance Method Details

#parallel_segmentsConfig

Returns parallel segments config.

Returns:

  • (Config)

    parallel segments config



42
43
44
45
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 42

def parallel_segments
  # We initialize it as disabled if not configured by the user
  public_send(:parallel_segments=, count: 1)
end

#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: "-parallel-") ⇒ Object

Note:

This method is an assignor but the API is actually via the ‘#parallel_segments` method. Our `Routing::Proxy` normalizes that the way we want to have it exposed for the end users.

Allows setting parallel segments configuration

Parameters:

  • count (Integer) (defaults to: 1)

    number of parallel segments (number of parallel consumer groups that will be created)

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • reducer (nil, #call) (defaults to: nil)

    reducer for parallel key. It allows for using a custom reducer to achieve enhanced parallelization when the default reducer is not enough.

  • merge_key (String) (defaults to: "-parallel-")

    key used to build the parallel segment consumer groups



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 59

def parallel_segments=(
  count: 1,
  partitioner: nil,
  reducer: nil,
  merge_key: "-parallel-"
)
  @parallel_segments ||= Config.new(
    active: count > 1,
    count: count,
    partitioner: partitioner,
    reducer: reducer || ->(parallel_key) { parallel_key.to_s.sum % count },
    merge_key: merge_key
  )
end

#parallel_segments?Boolean

Returns are parallel segments active.

Returns:

  • (Boolean)

    are parallel segments active



75
76
77
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 75

def parallel_segments?
  parallel_segments.active?
end

#segment_idInteger

Returns id of the segment (0 or bigger) or -1 if parallel segments are not active.

Returns:

  • (Integer)

    id of the segment (0 or bigger) or -1 if parallel segments are not active



81
82
83
84
85
86
87
88
89
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 81

def segment_id
  return @segment_id if @segment_id

  @segment_id = if parallel_segments?
    name.split(parallel_segments.merge_key).last.to_i
  else
    -1
  end
end

#segment_originString

Returns original segment consumer group name.

Returns:

  • (String)

    original segment consumer group name



92
93
94
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 92

def segment_origin
  name.split(parallel_segments.merge_key).first
end

#to_hHash

Returns consumer group setup with the parallel segments definition in it.

Returns:

  • (Hash)

    consumer group setup with the parallel segments definition in it



97
98
99
100
101
102
103
# File 'lib/karafka/pro/routing/features/consumer_groups/parallel_segments/consumer_group.rb', line 97

def to_h
  super.merge(
    parallel_segments: parallel_segments.to_h.merge(
      segment_id: segment_id
    )
  ).freeze
end