Module: WaterDrop::Producer::Transactions

Included in:
WaterDrop::Producer
Defined in:
lib/waterdrop/producer/transactions.rb

Overview

Transactions related producer functionalities

Instance Method Summary collapse

Instance Method Details

#transactionObject

Creates a transaction.

Karafka transactions work in a similar manner to SQL db transactions though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is, that messages from within a single transaction can be delivered and will have a delivery handle but will be then compacted prior to moving the LSO forward. This means, that not every delivery handle for async dispatches will emit a queue purge error. None for sync as the delivery has happened but they will never be visible by the transactional consumers.

Transactions are thread-safe however they lock a mutex. This means, that for high-throughput transactional messages production in multiple threads (for example in Karafka), it may be much better to use few instances that can work in parallel.

Please note, that if a producer is configured as transactional, it cannot produce messages outside of transactions, that is why by default all dispatches will be wrapped with a transaction. One transaction per single dispatch and for ‘produce_many` it will be a single transaction wrapping all messages dispatches (not one per message).

Examples:

Simple transaction

producer.transaction do
  producer.produce_async(topic: 'topic', payload: 'data')
end

Aborted transaction - messages producer won’t be visible by consumers

producer.transaction do
  producer.produce_sync(topic: 'topic', payload: 'data')
  throw(:abort)
end

Use block result last handler to wait on all messages ack

handler = producer.transaction do
            producer.produce_async(topic: 'topic', payload: 'data')
          end

handler.wait

Returns:

  • Block result



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/waterdrop/producer/transactions.rb', line 46

def transaction
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:committed) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result = nil
      commit = false

      catch(:abort) do
        result = yield
        commit = true
      end

      commit || raise(WaterDrop::Errors::AbortTransaction)

      with_transactional_error_handling(:commit) do
        client.commit_transaction
      end

      result
    rescue StandardError => e
      with_transactional_error_handling(:abort) do
        transactional_instrument(:aborted) { client.abort_transaction }
      end

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end

#transactional?Boolean

Returns Is this producer a transactional one.

Returns:

  • (Boolean)

    Is this producer a transactional one



83
84
85
86
87
# File 'lib/waterdrop/producer/transactions.rb', line 83

def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end