Module: WaterDrop::Producer::Transactions
- Included in:
- WaterDrop::Producer
- Defined in:
- lib/waterdrop/producer/transactions.rb
Overview
Transactions related producer functionalities
Instance Method Summary collapse
-
#transaction ⇒ Object
Creates a transaction.
-
#transactional? ⇒ Boolean
Is this producer a transactional one.
Instance Method Details
#transaction ⇒ Object
Creates a transaction.
Karafka transactions work in a similar manner to SQL db transactions though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is, that messages from within a single transaction can be delivered and will have a delivery handle but will be then compacted prior to moving the LSO forward. This means, that not every delivery handle for async dispatches will emit a queue purge error. None for sync as the delivery has happened but they will never be visible by the transactional consumers.
Transactions are thread-safe however they lock a mutex. This means, that for high-throughput transactional messages production in multiple threads (for example in Karafka), it may be much better to use few instances that can work in parallel.
Please note, that if a producer is configured as transactional, it cannot produce messages outside of transactions, that is why by default all dispatches will be wrapped with a transaction. One transaction per single dispatch and for ‘produce_many` it will be a single transaction wrapping all messages dispatches (not one per message).
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/waterdrop/producer/transactions.rb', line 46 def transaction # This will safely allow us to support one operation transactions so a transactional # producer can work without the transactional block if needed return yield if @transaction_mutex.owned? @transaction_mutex.synchronize do transactional_instrument(:committed) do with_transactional_error_handling(:begin) do transactional_instrument(:started) { client.begin_transaction } end result = nil commit = false catch(:abort) do result = yield commit = true end commit || raise(WaterDrop::Errors::AbortTransaction) with_transactional_error_handling(:commit) do client.commit_transaction end result rescue StandardError => e with_transactional_error_handling(:abort) do transactional_instrument(:aborted) { client.abort_transaction } end raise unless e.is_a?(WaterDrop::Errors::AbortTransaction) end end end |
#transactional? ⇒ Boolean
Returns Is this producer a transactional one.
83 84 85 86 87 |
# File 'lib/waterdrop/producer/transactions.rb', line 83 def transactional? return @transactional if instance_variable_defined?(:'@transactional') @transactional = config.kafka.to_h.key?(:'transactional.id') end |