Class: WaterDrop::Clients::Buffered

Inherits:
Dummy
  • Object
show all
Defined in:
lib/waterdrop/clients/buffered.rb

Overview

Client used to buffer messages that we send out in specs and other places.

Defined Under Namespace

Classes: SyncResponse

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Dummy

#method_missing, #respond_to_missing?, #wait

Constructor Details

#initialize(*args) ⇒ Buffered

Returns a new instance of Buffered.

Parameters:

  • args (Object)

    anything accepted by ‘Clients::Dummy`



18
19
20
21
22
# File 'lib/waterdrop/clients/buffered.rb', line 18

def initialize(*args)
  super
  @messages = []
  @topics = Hash.new { |k, v| k[v] = [] }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class WaterDrop::Clients::Dummy

Instance Attribute Details

#messagesObject

Returns the value of attribute messages.



7
8
9
# File 'lib/waterdrop/clients/buffered.rb', line 7

def messages
  @messages
end

Instance Method Details

#messages_for(topic) ⇒ Object

Returns messages produced to a given topic

Parameters:

  • topic (String)


35
36
37
# File 'lib/waterdrop/clients/buffered.rb', line 35

def messages_for(topic)
  @topics[topic]
end

#produce(message) ⇒ Object

“Produces” message to Kafka: it acknowledges it locally, adds it to the internal buffer

Parameters:

  • message (Hash)

    ‘WaterDrop::Producer#produce_sync` message hash



26
27
28
29
30
31
# File 'lib/waterdrop/clients/buffered.rb', line 26

def produce(message)
  # We pre-validate the message payload, so topic is ensured to be present
  @topics[message.fetch(:topic)] << message
  @messages << message
  SyncResponse.new
end

#resetObject

Clears internal buffer Used in between specs so messages do not leak out



41
42
43
44
# File 'lib/waterdrop/clients/buffered.rb', line 41

def reset
  @messages.clear
  @topics.each_value(&:clear)
end