Module: Julewire::Karafka::MessageContext

Defined in:
lib/julewire/karafka/message_context.rb

Class Method Summary

Class Method Details

.call(message, configuration:, fields: nil) ⇒ Object

Raises:

  • (ArgumentError) — if no block is given


7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/julewire/karafka/message_context.rb', line 7

# Runs the given block with the propagation context carried by a message
# restored around it.
#
# The carrier is looked up from +fields+ (via +carrier_for+), decoded with
# the configured key and size limit, and the resulting envelope is restored
# for the duration of the block. Inside that scope the message's neutral
# data and attributes are applied before the block is invoked.
#
# @param message [Object] the consumed message; only read (through
#   +PayloadReader.message_payload+) when +fields+ is not supplied
# @param configuration [Object] passed to +carrier_for+ and queried for
#   +carrier_key+ and +carrier_max_bytes+
# @param fields [Object, nil] already-read message payload; when nil it is
#   read from +message+
# @yield the block is forwarded to +Facade.with_attributes+
# @return [Object] whatever +Julewire::Core::Propagation.restore+ returns
# @raise [ArgumentError] if no block is given
def call(message, configuration:, fields: nil, &block)
  raise ArgumentError, "block required" unless block

  fields ||= PayloadReader.message_payload(message)
  carrier = carrier_for(fields, configuration)

  result = Julewire::Core::Propagation::Carrier.extract_result(
    carrier,
    key: configuration.carrier_key,
    max_bytes: configuration.carrier_max_bytes
  )
  # Called before restoring; presumably reports a failed extraction —
  # confirm against the helper's definition.
  record_carrier_restore_failure(result)

  # The block is named (not anonymous `&`) on purpose: Ruby 3.3.0 raises a
  # SyntaxError when an anonymous block parameter is forwarded from inside
  # another block, which is exactly what happens below.
  Julewire::Core::Propagation.restore(result.envelope, owned: true) do
    Julewire::Core::Integration::Facade.with_neutral(message_neutral(fields)) do
      Julewire::Core::Integration::Facade.with_attributes(message_attributes(fields), &block)
    end
  end
end