Class: WaterDrop::Clients::Buffered

Inherits:
Dummy
  • Object
Defined in:
lib/waterdrop/clients/buffered.rb

Overview

Client used to buffer messages that we send out in specs and other places.

Defined Under Namespace

Classes: SyncResponse

Instance Attribute Summary

Instance Method Summary

Methods inherited from Dummy

#method_missing, #respond_to_missing?, #wait

Constructor Details

#initialize(*args) ⇒ Buffered

Returns a new instance of Buffered.

Parameters:

  • args (Object)

    anything accepted by `Clients::Dummy`



# File 'lib/waterdrop/clients/buffered.rb', line 18

def initialize(*args)
  super
  @messages = []
  @topics = Hash.new { |k, v| k[v] = [] }

  @transaction_mutex = Mutex.new
  @transaction_active = false
  @transaction_messages = []
  @transaction_topics = Hash.new { |k, v| k[v] = [] }
  @transaction_level = 0
end
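
The two `Hash.new` calls above use Ruby's default-block form, so a per-topic array is created the first time a topic is looked up. A minimal sketch in plain Ruby follows; the `topics` variable and the topic names are illustrative and not part of WaterDrop:

```ruby
# Illustrative only: mirrors the Hash.new default block used in #initialize.
# The block receives the hash itself and the missing key.
topics = Hash.new { |hash, key| hash[key] = [] }

# The first access to an unknown topic creates and stores an empty array,
# so appending works without any explicit setup.
topics['events'] << { topic: 'events', payload: 'a' }

# Merely reading a topic that was never written also stores an empty array.
untouched = topics['other']
```

One consequence of this form is that a read-only lookup adds the key to the hash, which is harmless for a test buffer but worth knowing when inspecting its contents.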

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class WaterDrop::Clients::Dummy

Instance Attribute Details

#messages ⇒ Object

Returns the value of attribute messages.



# File 'lib/waterdrop/clients/buffered.rb', line 7

def messages
  @messages
end

Instance Method Details

#messages_for(topic) ⇒ Object

Returns messages produced to a given topic

Parameters:

  • topic (String)


# File 'lib/waterdrop/clients/buffered.rb', line 91

def messages_for(topic)
  @topics[topic]
end

#produce(message) ⇒ Object

“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer.

Parameters:

  • message (Hash)

    `WaterDrop::Producer#produce_sync` message hash



# File 'lib/waterdrop/clients/buffered.rb', line 32

def produce(message)
  if @transaction_active
    @transaction_topics[message.fetch(:topic)] << message
    @transaction_messages << message
  else
    # We pre-validate the message payload, so topic is ensured to be present
    @topics[message.fetch(:topic)] << message
    @messages << message
  end

  SyncResponse.new
end
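
The non-transactional branch of `#produce` can be tried out without Kafka or the gem itself. The sketch below uses a hypothetical `SketchBuffer` class that copies only that branch plus `#messages_for`; it is an assumption-laden stand-in, not the WaterDrop class, and it returns a plain symbol where the real method returns a `SyncResponse`:

```ruby
# Hypothetical stand-in: copies only the non-transactional branch of #produce.
class SketchBuffer
  attr_reader :messages

  def initialize
    @messages = []
    @topics = Hash.new { |hash, key| hash[key] = [] }
  end

  # Stores the message in the flat buffer and in the per-topic buffer.
  def produce(message)
    @topics[message.fetch(:topic)] << message
    @messages << message
    :acknowledged # placeholder; the real method returns SyncResponse.new
  end

  def messages_for(topic)
    @topics[topic]
  end
end

buffer = SketchBuffer.new
buffer.produce({ topic: 'events', payload: 'a' })
buffer.produce({ topic: 'logs', payload: 'b' })

events = buffer.messages_for('events')

# A hash without :topic fails in Hash#fetch before anything is buffered.
missing_topic_error =
  begin
    buffer.produce({ payload: 'c' })
    nil
  rescue KeyError => e
    e
  end
```

Because `fetch` is evaluated before either append, a message without a `:topic` key leaves both buffers unchanged in this sketch. The original comment notes that the real producer validates messages beforehand, so that path should not normally be reached.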

#reset ⇒ Object

Clears the internal buffer. Used in between specs so messages do not leak out.



# File 'lib/waterdrop/clients/buffered.rb', line 97

def reset
  @messages.clear
  @topics.each_value(&:clear)
end
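
The effect of `clear` and `each_value(&:clear)` can be seen on plain Ruby collections. This is a sketch with illustrative variable names (`messages`, `topics`, `held`), not code from the library:

```ruby
# Illustrative data shaped like the buffers that #reset operates on.
messages = [{ topic: 'events', payload: 'a' }]
topics = { 'events' => [{ topic: 'events', payload: 'a' }] }

# A reference that a spec might have kept from an earlier lookup.
held = topics['events']

# Same two calls as in #reset: arrays are emptied in place.
messages.clear
topics.each_value(&:clear)
```

The arrays are emptied in place, so the topic keys stay in the hash and a previously obtained reference such as `held` is emptied as well. As listed, `#reset` touches only `@messages` and `@topics`, not the transaction buffers.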

#transaction ⇒ Object

Yields the code, pretending it is in a transaction. Supports our aborting transaction flow. Moves messages to the appropriate buffers only if the transaction is successful.



# File 'lib/waterdrop/clients/buffered.rb', line 48

def transaction
  @transaction_level += 1

  return yield if @transaction_mutex.owned?

  @transaction_mutex.lock
  @transaction_active = true

  result = nil
  commit = false

  catch(:abort) do
    result = yield
    commit = true
  end

  commit || raise(WaterDrop::Errors::AbortTransaction)

  # Transfer transactional data on success
  @transaction_topics.each do |topic, messages|
    @topics[topic] += messages
  end

  @messages += @transaction_messages

  result
rescue StandardError => e
  return if e.is_a?(WaterDrop::Errors::AbortTransaction)

  raise
ensure
  @transaction_level -= 1

  if @transaction_level.zero? && @transaction_mutex.owned?
    @transaction_topics.clear
    @transaction_messages.clear
    @transaction_active = false
    @transaction_mutex.unlock
  end
end
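
The commit and abort paths above hinge on `catch(:abort)` together with a `commit` flag. The sketch below reproduces only that part in a hypothetical `SketchTransactionalBuffer`; it deliberately omits the mutex and the nesting counter, and `SketchAbort` is an assumed stand-in for `WaterDrop::Errors::AbortTransaction`:

```ruby
# Hypothetical stand-in for WaterDrop::Errors::AbortTransaction.
class SketchAbort < StandardError; end

# Simplified sketch: no mutex and no nesting counter.
class SketchTransactionalBuffer
  attr_reader :messages

  def initialize
    @messages = []
    @pending = []
    @active = false
  end

  # Inside a transaction, messages go to the pending buffer only.
  def produce(message)
    (@active ? @pending : @messages) << message
  end

  def transaction
    @active = true
    result = nil
    commit = false

    # throw(:abort) inside the block skips the commit = true line.
    catch(:abort) do
      result = yield
      commit = true
    end

    commit || raise(SketchAbort)

    # Move pending messages to the main buffer only on success.
    @messages += @pending
    result
  rescue SketchAbort
    nil # an abort is swallowed; other errors propagate
  ensure
    @pending.clear
    @active = false
  end
end

buffer = SketchTransactionalBuffer.new

committed = buffer.transaction do
  buffer.produce({ topic: 'events', payload: 'kept' })
  :done
end

aborted = buffer.transaction do
  buffer.produce({ topic: 'events', payload: 'dropped' })
  throw(:abort)
end

failed =
  begin
    buffer.transaction do
      buffer.produce({ topic: 'events', payload: 'also dropped' })
      raise ArgumentError, 'boom'
    end
  rescue ArgumentError => e
    e
  end
```

In this sketch a committed block returns its own value, an aborted block returns `nil` and discards its messages, and any other error discards the pending messages and is re-raised to the caller. In the original method, a nested call made by the thread that already owns the mutex simply yields and leaves the commit or abort decision to the outermost call.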