Module: Deimos::Consume::BatchConsumption
- Extended by:
- ActiveSupport::Concern
- Includes:
- Phobos::BatchHandler
- Included in:
- Deimos::Consumer
- Defined in:
- lib/deimos/consume/batch_consumption.rb
Overview
Helper methods used by batch consumers, i.e. those with “inline_batch” delivery. Payloads are decoded then consumers are invoked with arrays of messages to be handled at once
Instance Method Summary collapse
-
#around_consume_batch(batch, metadata) ⇒ Object
:nodoc:.
-
#consume_batch(_payloads, _metadata) ⇒ Object
Consume a batch of incoming messages.
Instance Method Details
#around_consume_batch(batch, metadata) ⇒ Object
:nodoc:
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/deimos/consume/batch_consumption.rb', line 13 def around_consume_batch(batch, ) payloads = [] benchmark = Benchmark.measure do if self.class.config[:key_configured] [:keys] = batch.map do || decode_key(.key) end end [:first_offset] = batch.first&.offset payloads = batch.map do || (.payload) end _received_batch(payloads, ) _with_span do yield(payloads, ) end end _handle_batch_success(benchmark.real, payloads, ) rescue StandardError => e _handle_batch_error(e, payloads, ) end |
#consume_batch(_payloads, _metadata) ⇒ Object
Consume a batch of incoming messages.
39 40 41 |
# File 'lib/deimos/consume/batch_consumption.rb', line 39 def consume_batch(_payloads, ) raise NotImplementedError end |