Module: WaterDrop::Producer::Transactions
- Included in:
- WaterDrop::Producer
- Defined in:
- lib/waterdrop/producer/transactions.rb
Overview
Transaction-related producer functionality.
Instance Method Summary
- #transaction ⇒ Object
  Creates a transaction.
- #transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
  Marks given message as consumed inside of a transaction.
- #transactional? ⇒ Boolean
  Tells whether this producer is a transactional one.
Instance Method Details
#transaction ⇒ Object
Creates a transaction.
Karafka transactions work similarly to SQL database transactions, though there are some crucial differences. When you start a transaction, all messages produced during it will either be delivered together or fail together. The difference is that messages from within a single transaction can already be delivered and have a delivery handle, but will then be compacted before the LSO (last stable offset) moves forward. This means that not every delivery handle for async dispatches will emit a queue purge error, and none will for sync dispatches, because the delivery has already happened. Such messages will nevertheless never be visible to transactional consumers.
Transactions are thread-safe; however, they lock a mutex. For high-throughput transactional message production from multiple threads (for example in Karafka), it may therefore be much better to use a few producer instances that can work in parallel.
Please note that a producer configured as transactional cannot produce messages outside of transactions. That is why, by default, all dispatches are wrapped in a transaction: one transaction per single dispatch, and for `produce_many` a single transaction wrapping all message dispatches (not one per message).
# File 'lib/waterdrop/producer/transactions.rb', line 51

def transaction
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:finished) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result = nil
      commit = false

      catch(:abort) do
        result = yield
        commit = true
      end

      commit || raise(WaterDrop::Errors::AbortTransaction)

      with_transactional_error_handling(:commit) do
        transactional_instrument(:committed) { client.commit_transaction }
      end

      result
    # We need to handle any interrupt including critical in order not to have the transaction
    # running. This will also handle things like `IRB::Abort`
    #
    # rubocop:disable Lint/RescueException
    rescue Exception => e
      # rubocop:enable Lint/RescueException
      with_transactional_error_handling(:abort) do
        transactional_instrument(:aborted) { client.abort_transaction }
      end

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end
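The control flow of the listing above can be tried out without a Kafka broker. The following is a minimal sketch, not WaterDrop code: `FakeClient` and `ModelProducer` are hypothetical stand-ins that only record which client calls would be made, and instrumentation and error-handling wrappers are left out. It shows three cases: a nested call joining the outer transaction, an intentional abort via `throw(:abort)`, and an exception that aborts the transaction and is re-raised.

```ruby
# Simplified model of the transaction control flow. FakeClient and
# ModelProducer are hypothetical stand-ins, not WaterDrop classes.
class FakeClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def begin_transaction
    @calls << :begin
  end

  def commit_transaction
    @calls << :commit
  end

  def abort_transaction
    @calls << :abort
  end
end

class ModelProducer
  AbortTransaction = Class.new(StandardError)

  attr_reader :client

  def initialize(client = FakeClient.new)
    @client = client
    @transaction_mutex = Mutex.new
  end

  def transaction
    # A nested call from the thread that already holds the mutex joins
    # the outer transaction instead of starting a new one
    return yield if @transaction_mutex.owned?

    @transaction_mutex.synchronize do
      begin
        @client.begin_transaction
        result = nil
        commit = false

        # `throw(:abort)` inside the block lands here and leaves commit false
        catch(:abort) do
          result = yield
          commit = true
        end

        commit || raise(AbortTransaction)
        @client.commit_transaction
        result
      rescue Exception => e
        @client.abort_transaction
        # An intentional abort is swallowed; anything else propagates
        raise unless e.is_a?(AbortTransaction)
      end
    end
  end
end

producer = ModelProducer.new

producer.transaction { producer.transaction { :nested } } # one begin, one commit
producer.transaction { throw(:abort) }                    # begin, then abort

begin
  producer.transaction { raise ArgumentError, 'boom' }    # begin, abort, re-raise
rescue ArgumentError
  nil
end

p producer.client.calls
# → [:begin, :commit, :begin, :abort, :begin, :abort]
```

Note that the nested call produces no `:begin` of its own, which is what lets a transactional producer wrap single dispatches in a transaction while still allowing an explicit outer block.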
#transaction_mark_as_consumed(consumer, message, offset_metadata = nil) ⇒ Object
Marks given message as consumed inside of a transaction.
# File 'lib/waterdrop/producer/transactions.rb', line 105

def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  CONTRACT.validate!(
    { consumer: consumer, message: message, offset_metadata: offset_metadata },
    Errors::TransactionalOffsetInvalidError
  )

  details = { message: message, offset_metadata: offset_metadata }

  transactional_instrument(:marked_as_consumed, details) do
    tpl = Rdkafka::Consumer::TopicPartitionList.new
    partition = Rdkafka::Consumer::Partition.new(
      message.partition,
      # +1 because this is next offset from which we will start processing from
      message.offset + 1,
      0,
      offset_metadata
    )

    tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

    with_transactional_error_handling(:store_offset) do
      client.send_offsets_to_transaction(
        consumer,
        tpl,
        @config.max_wait_timeout
      )
    end
  end
end
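The `+ 1` in the listing above follows the Kafka convention that a stored offset names the next message to read, not the last one processed. A minimal sketch of that arithmetic is shown below; the `Message` struct and the `position_to_store` helper are hypothetical and exist only for illustration, with `Message` exposing the three attributes the listing reads from the message.

```ruby
# Hypothetical stand-in for a consumed message with the attributes
# that the listing reads: topic, partition and offset.
Message = Struct.new(:topic, :partition, :offset, keyword_init: true)

# Hypothetical helper: returns the position to store for a processed
# message, which is the offset of the next message to read.
def position_to_store(message)
  {
    topic: message.topic,
    partition: message.partition,
    offset: message.offset + 1
  }
end

message = Message.new(topic: 'events', partition: 0, offset: 41)
position = position_to_store(message)

p position[:offset]
# → 42
```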
#transactional? ⇒ Boolean
Returns whether this producer is a transactional one, that is, whether its Kafka configuration contains the `transactional.id` key.
# File 'lib/waterdrop/producer/transactions.rb', line 93

def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end
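The listing above memoizes with `instance_variable_defined?` rather than `||=`. A likely reason is that `||=` would repeat the lookup every time the cached answer is `false`. The sketch below models this under stated assumptions: `ModelConfigured` is a hypothetical class that takes a plain hash instead of a WaterDrop config object, and its `lookups` counter exists only to make the caching observable.

```ruby
# Hypothetical model of the memoization in #transactional?; ModelConfigured
# is not a WaterDrop class and takes a plain hash instead of a config object.
class ModelConfigured
  attr_reader :lookups

  def initialize(kafka)
    @kafka = kafka
    @lookups = 0
  end

  def transactional?
    # `||=` would re-run the lookup whenever the cached value is false,
    # so the presence of the instance variable is checked instead
    return @transactional if instance_variable_defined?(:@transactional)

    @lookups += 1
    @transactional = @kafka.key?(:'transactional.id')
  end
end

plain = ModelConfigured.new({ 'bootstrap.servers': 'localhost:9092' })
transactional = ModelConfigured.new({ 'transactional.id': 'my-producer' })

# Repeated calls on a non-transactional instance hit the cache after the first
3.times { plain.transactional? }
```

Here `plain.transactional?` is `false`, `transactional.transactional?` is `true`, and `plain.lookups` stays at 1 despite the three calls.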