Module: Karafka::Testing::Minitest::Helpers

Defined in:
lib/karafka/testing/minitest/helpers.rb

Overview

Minitest helpers module that needs to be included in your Minitest test classes (or Minitest::Spec) to enable the Karafka testing helpers

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object

Adds all the required extra functionality to the Minitest example group

Parameters:

  • base (Class)

    Minitest example group we want to extend



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/karafka/testing/minitest/helpers.rb', line 28

# Adds all the needed extra functionality to the Minitest example group.
#
# Registers a per-test setup hook on `base` that builds fresh Karafka testing state
# (proxy, consumer message buffer, spec consumer/producer clients and consumer mappings)
# and stubs `Karafka.producer.client` with the spec producer client.
#
# The way the hook is registered depends on `base`:
# - `Minitest::Spec`: through its `before` DSL
# - a class exposing a class-level `setup` that accepts arguments: through `setup(&block)`
# - anything else: by prepending a module whose `#setup` runs the hook and then calls `super`
#
# @param base [Class] Minitest example group we want to extend
def included(base)
  # A plain proc (not a lambda) on purpose: a host that runs the hook through
  # `instance_eval` yields the receiver as a block argument, and a zero-arity lambda
  # would raise ArgumentError there. A proc silently ignores the extra argument and
  # behaves the same under `instance_exec`.
  eval_flow = proc do
    Karafka::Testing.ensure_karafka_initialized!

    @karafka = Karafka::Testing::Minitest::Proxy.new(self)
    @_karafka_consumer_messages = []
    @_karafka_consumer_client = Karafka::Testing::SpecConsumerClient.new
    @_karafka_producer_client = Karafka::Testing::SpecProducerClient.new(self)
    @_karafka_producer_client.reset

    @_karafka_consumer_mappings = {}

    Karafka.producer.stubs(:client).returns(@_karafka_producer_client)
  end

  if base.to_s == "Minitest::Spec"
    base.class_eval do
      before(&eval_flow)
    end
  elsif base.respond_to?(:setup) && base.method(:setup).arity != 0
    # Class-level `setup` that takes arguments is treated as a hook-registration DSL
    base.class_eval do
      setup(&eval_flow)
    end
  else
    # No usable class-level DSL: wrap the instance-level `setup` instead
    setup_mod = Module.new do
      define_method(:setup) do
        instance_exec(&eval_flow)
        super()
      end
    end

    base.prepend(setup_mod)
  end
end

Instance Method Details

#_karafka_add_message_to_consumer_if_needed(message) ⇒ Object

Adds a new Karafka message instance if needed with given payload and options into an internal consumer buffer that will be used to simulate messages delivery to the consumer

Examples:

Send a json message to consumer

@karafka.produce({ 'hello' => 'world' }.to_json)

Send a json message to consumer and simulate, that it is partition 6

@karafka.produce({ 'hello' => 'world' }.to_json, 'partition' => 6)

Parameters:

  • message (Hash)

    message that was sent to Kafka



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/karafka/testing/minitest/helpers.rb', line 100

# Adds a new Karafka message, built from a dispatched message hash, to the internal
# consumer buffer that is used to simulate message delivery to the consumer.
#
# Nothing is added unless the resolved consumer is a `Karafka::BaseConsumer` whose topic
# name equals `message[:topic]` and, when `message[:consumer_group]` is given, whose
# consumer group name matches it.
#
# @param message [Hash] message that was sent to Kafka
def _karafka_add_message_to_consumer_if_needed(message)
  # An explicit consumer group always goes through the lookup; without one, a defined
  # @consumer takes precedence over the lookup
  consumer_obj = if !message[:consumer_group] && defined?(@consumer)
    @consumer
  else
    _karafka_find_consumer_for_message(message)
  end

  # We're interested in adding message to consumer only when it is a Karafka consumer
  # Users may want to test other things (models producing messages for example) and in
  # their case consumer will not be a consumer. This also covers a missing (nil) consumer.
  return unless consumer_obj.is_a?(Karafka::BaseConsumer)
  # We target to the consumer only messages that were produced to it, since specs may also
  # produce other messages targeting other topics
  return unless message[:topic] == consumer_obj.topic.name
  # If consumer_group is explicitly specified, verify it matches
  return if message[:consumer_group] &&
    message[:consumer_group].to_s != consumer_obj.topic.consumer_group.name

  # Build message metadata and copy any metadata that would come from the message
  # NOTE(review): the helper name below was lost in this listing and is reconstructed;
  # confirm it matches the metadata-defaults helper defined elsewhere in this module
  metadata = _karafka_message_metadata_defaults(consumer_obj)

  metadata.keys.each do |key|
    # Some metadata attributes are dispatched under a different key in the message
    message_key = METADATA_DISPATCH_MAPPINGS.fetch(key, key)

    next unless message.key?(message_key)

    metadata[key] = message.fetch(message_key)
  end

  # Add this message to previously produced messages
  @_karafka_consumer_messages << Karafka::Messages::Message.new(
    message[:payload],
    Karafka::Messages::Metadata.new(metadata)
  )
  # Update batch metadata
  batch_metadata = Karafka::Messages::Builders::BatchMetadata.call(
    @_karafka_consumer_messages,
    consumer_obj.topic,
    0,
    Time.now
  )

  # Update consumer messages batch
  consumer_obj.messages = Karafka::Messages::Messages.new(
    @_karafka_consumer_messages,
    batch_metadata
  )
end

#_karafka_consumer_for(requested_topic, requested_consumer_group = nil) ⇒ Object

Creates a consumer instance for a given topic

Examples:

Creates a consumer instance with settings for `my_requested_topic`

consumer = @karafka.consumer_for(:my_requested_topic)

Parameters:

  • requested_topic (String, Symbol)

    name of the topic for which we want to create a consumer instance

  • requested_consumer_group (String, Symbol, nil) (defaults to: nil)

    optional name of the consumer group if we have multiple consumer groups listening on the same topic

Returns:

  • (Object)

    Karafka consumer instance

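Raises:

  • (Errors::TopicInManyConsumerGroupsError)

    when more than one candidate topic matches the requested topic and consumer group

  • (Errors::TopicNotFoundError)

    when no candidate topic matches the requested topic and consumer group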



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/karafka/testing/minitest/helpers.rb', line 78

# Builds a consumer instance for the requested topic.
#
# @param requested_topic [String, Symbol] name of the topic for which we want to create a
#   consumer instance
# @param requested_consumer_group [String, Symbol, nil] optional consumer group name, used
#   to narrow the candidates down
# @return [Object] result of `_karafka_build_consumer_for` for the only matching topic
# @raise [Errors::TopicInManyConsumerGroupsError] when more than one candidate topic matches
# @raise [Errors::TopicNotFoundError] when no candidate topic matches
def _karafka_consumer_for(requested_topic, requested_consumer_group = nil)
  topic_name = requested_topic.to_s
  group_name = requested_consumer_group.to_s
  candidates = Testing::Helpers.karafka_consumer_find_candidate_topics(topic_name, group_name)

  if candidates.size > 1
    raise Errors::TopicInManyConsumerGroupsError, requested_topic
  elsif candidates.empty?
    raise Errors::TopicNotFoundError, requested_topic
  end

  _karafka_build_consumer_for(candidates.first)
end

#_karafka_consumer_messages ⇒ Array<Karafka::Messages::Message>

Returns array of messages that will be used to construct the final consumer messages batch.

Returns:

  • (Array<Karafka::Messages::Message>)

    array of messages that will be used to construct the final consumer messages batch



187
188
189
# File 'lib/karafka/testing/minitest/helpers.rb', line 187

# Reader for the internal consumer message buffer held in `@_karafka_consumer_messages`.
#
# @return [Array<Karafka::Messages::Message>] array of messages that will be used to
#   construct the final consumer messages batch
def _karafka_consumer_messages
  @_karafka_consumer_messages
end

#_karafka_produce(payload, metadata = {}) ⇒ Object

Produces message with a given payload to the consumer matching topic

Parameters:

  • payload (String)

    payload we want to dispatch

  • metadata (Hash) (defaults to: {})

    any metadata we want to dispatch alongside the payload. Supports an `offset` key to set a custom offset for the message (otherwise offsets auto-increment from 0).



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/karafka/testing/minitest/helpers.rb', line 157

# Produces a message with the given payload to the consumer-matching topic.
#
# The topic is resolved in this order: `metadata[:topic]`, the topic of `@consumer` (when
# that instance variable is defined), and finally the topic of the last consumer stored in
# `@_karafka_consumer_mappings`.
#
# @param payload [String] payload we want to dispatch
# @param metadata [Hash] any metadata we want to dispatch alongside the payload. An
#   `:offset` key is removed before dispatch and exposed through `@_karafka_next_offset`
#   for the duration of the dispatch only. The hash passed in is not modified.
# @return [Object] whatever `Karafka.producer.produce_sync` returns
def _karafka_produce(payload, metadata = {})
  # Work on a copy so the caller's hash is never mutated (and frozen hashes are accepted)
  dispatch_metadata = metadata.dup

  # Extract offset before passing to WaterDrop since it is not a valid
  # WaterDrop message attribute (Kafka assigns offsets, not producers)
  @_karafka_next_offset = dispatch_metadata.delete(:offset)

  topic = if dispatch_metadata[:topic]
    dispatch_metadata[:topic]
  elsif defined?(@consumer)
    @consumer.topic.name
  else
    last_consumer = @_karafka_consumer_mappings&.values&.last
    last_consumer&.topic&.name
  end

  Karafka.producer.produce_sync(
    {
      topic: topic,
      payload: payload
    }.merge(dispatch_metadata)
  )
ensure
  # The custom offset applies to this dispatch only
  @_karafka_next_offset = nil
end

#_karafka_produce_to(consumer_instance, payload, metadata = {}) ⇒ Object

Produces a message to a specific consumer instance. Use it when testing multiple consumers for the same topic.

Examples:

Produce to specific consumer when multiple exist for same topic

consumer1 = @karafka.consumer_for(:events, :analytics_group)
consumer2 = @karafka.consumer_for(:events, :notifications_group)
@karafka.produce_to(consumer1, { 'event' => 'click' }.to_json)

Parameters:

  • consumer_instance (Object)

    the consumer to produce to

  • payload (String)

    message content (usually serialized JSON) to deliver to the consumer

  • metadata (Hash) (defaults to: {})

    any metadata to dispatch alongside the payload



203
204
205
206
207
208
209
210
211
# File 'lib/karafka/testing/minitest/helpers.rb', line 203

# Produces a message to a specific consumer instance by delegating to `_karafka_produce`
# with the consumer's topic name and consumer group name merged into the metadata.
# Values from the consumer override `:topic` / `:consumer_group` keys given in `metadata`.
#
# @param consumer_instance [Object] the consumer to produce to; must respond to `topic`
# @param payload [String] message content to deliver to the consumer
# @param metadata [Hash] any metadata to dispatch alongside the payload
# @return [Object] whatever `_karafka_produce` returns
def _karafka_produce_to(consumer_instance, payload, metadata = {})
  _karafka_produce(
    payload,
    metadata.merge(
      topic: consumer_instance.topic.name,
      consumer_group: consumer_instance.topic.consumer_group.name
    )
  )
end

#_karafka_produced_messages ⇒ Array<Hash>

Returns messages that were produced.

Returns:

  • (Array<Hash>)

    messages that were produced



181
182
183
# File 'lib/karafka/testing/minitest/helpers.rb', line 181

# Reader delegating to the `messages` of the producer client held in
# `@_karafka_producer_client`.
#
# @return [Array<Hash>] messages that were produced
def _karafka_produced_messages
  @_karafka_producer_client.messages
end