Class: Rimless::Consumer::Base

Inherits:
Karafka::BaseConsumer
  • Object
Defined in:
lib/rimless/consumer/base.rb

Overview

The base consumer in which all Apache Kafka messages are processed, within an ActiveJob job. It comes with a few conventions that keep the actual application code simple. Example usage in an application:

app/consumers/my_consumer.rb

class IdentityApiConsumer < ApplicationConsumer
  # Handle +identity-api.users/user_locked+ messages.
  #
  # @param user [Hash{Symbol => Mixed}] the event user data
  # @param args [Hash{Symbol => Mixed}] additional event data
  def user_locked(user:, **args)
    # ..
  end
end

Despite its default usage within an ActiveJob context, it is still directly usable by Karafka. Be warned that, when running inside an ActiveJob context, it lacks support for various Karafka internals (e.g. coordinator, client).

Direct Known Subclasses

ApplicationConsumer

Defined Under Namespace

Classes: JobContextCoordinator

Instance Attribute Details

#message ⇒ Object Also known as: params

The message currently being processed, which allows messages to be handled one at a time.



# File 'lib/rimless/consumer/base.rb', line 28

def message
  @message
end

Class Method Details

.build_for_job(payload:, metadata:) ⇒ Rimless::Consumer::Base

Build a new disposable consumer instance for a single Apache Kafka message, which should be processed within the ActiveJob context.

Parameters:

  • payload (Mixed)

    the (already) decoded Kafka message payload

  • metadata (Hash)

    the Kafka message metadata (string/symbol keys are allowed)

Returns:

  • (Rimless::Consumer::Base)

    the disposable consumer instance



# File 'lib/rimless/consumer/base.rb', line 50

def self.build_for_job(payload:, metadata:)
  new.tap do |consumer|
    metadata = metadata.symbolize_keys

    consumer.coordinator = JobContextCoordinator.new(
      topic: metadata[:topic],
      partition: metadata[:partition]
    )
    consumer.producer = Rimless.producer

    metadata = Karafka::Messages::Metadata.new(
      **metadata.except(:key, :headers),
      raw_key: metadata[:key],
      raw_headers: metadata[:headers],
      deserializers: job_deserializers
    )
    consumer.messages =
      [Karafka::Messages::Message.new(payload, metadata)]
  end
end
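
The metadata handling in this method can be illustrated in plain Ruby, without Karafka or ActiveSupport. The following is only a minimal sketch: `split_job_metadata` is a hypothetical helper that is not part of Rimless, and the topic, key, and header values are made up for illustration. It mirrors the key normalization and the move of `:key` and `:headers` to `raw_key` and `raw_headers`, nothing more.

```ruby
# Hypothetical helper (not part of Rimless): mirrors how the job metadata
# is normalized before it is handed to Karafka::Messages::Metadata.
def split_job_metadata(metadata)
  # Plain-Ruby stand-in for ActiveSupport's Hash#symbolize_keys
  symbolized = metadata.transform_keys(&:to_sym)

  # Everything except the key and headers is passed through unchanged
  rest = symbolized.reject { |name, _| %i[key headers].include?(name) }

  # The key and headers are handed over under their "raw" names
  rest.merge(
    raw_key: symbolized[:key],
    raw_headers: symbolized[:headers]
  )
end

# String and symbol keys may be mixed (illustrative values)
result = split_job_metadata(
  {
    'topic' => 'identity-api.users',
    partition: 0,
    'key' => 'user-1',
    headers: { 'type' => 'user_locked' }
  }
)
```

Here `result` contains `:topic`, `:partition`, `:raw_key`, and `:raw_headers`, and no longer contains `:key` or `:headers`.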

.job_deserializers ⇒ Karafka::Routing::Features::Deserializers::Config

A custom set of Karafka deserializers, used exclusively in the ActiveJob context. Because the payload, message key, and message headers arrive already deserialized, the deserializers only pass the values through.

Returns:

  • (Karafka::Routing::Features::Deserializers::Config)

    the deserializers config object



# File 'lib/rimless/consumer/base.rb', line 77

def self.job_deserializers
  @job_deserializers ||=
    Karafka::Routing::Features::Deserializers::Config.new(
      active: true,
      payload: ->(message) { message.raw_payload },
      key: Karafka::Deserializers::Key.new,
      headers: Karafka::Deserializers::Headers.new
    )
end
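
The pass-through idea behind the payload lambda can be sketched without Karafka. This is a simplified illustration under stated assumptions: `FakeMessage` is a hypothetical stand-in, not a Karafka class, and it assumes only that a deserializer is an object responding to `call`, as the lambda in the source suggests.

```ruby
# Hypothetical stand-in for a Kafka message (not a Karafka class)
FakeMessage = Struct.new(:raw_payload, :deserializer, keyword_init: true) do
  # Resolve the payload by asking the configured callable, once
  def payload
    @payload ||= deserializer.call(self)
  end
end

# Same shape as the payload deserializer in the source: no decoding at all
pass_through = ->(message) { message.raw_payload }

decoded = { event: 'user_locked', user: { id: 1 } }
message = FakeMessage.new(raw_payload: decoded, deserializer: pass_through)

# The already decoded object comes back untouched
same_object = message.payload.equal?(decoded) # => true
```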

Instance Method Details

#arguments ⇒ Hash{Symbol => Mixed}

Prepare the message payload as event method arguments.

Returns:

  • (Hash{Symbol => Mixed})

    the event method arguments



# File 'lib/rimless/consumer/base.rb', line 102

def arguments
  event_name_key = :event
  event_name_key = 'event' if message.payload.key? 'event'
  message.payload.except(event_name_key)
end
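
To make the key handling concrete, here is a small free-standing sketch. `event_arguments` is a hypothetical function, not part of Rimless. It takes the payload directly instead of reading it from the current message, and it uses `Hash#reject` as a plain-Ruby stand-in for `Hash#except`.

```ruby
# Hypothetical free-standing version of #arguments (not part of Rimless)
def event_arguments(payload)
  # Default to the symbol key, switch to the string key when it is present
  event_name_key = :event
  event_name_key = 'event' if payload.key?('event')

  # Drop the event name, keep every other top-level key as-is
  payload.reject { |key, _| key == event_name_key }
end

symbol_args = event_arguments({ event: 'user_locked', user: { id: 1 } })
# => { user: { id: 1 } }

string_args = event_arguments({ 'event' => 'user_locked', 'user' => { 'id' => 1 } })
# => { 'user' => { 'id' => 1 } }
```

Note that the remaining keys are not converted: a string-keyed payload yields string-keyed arguments.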

#consume ⇒ Object

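A generic message consuming handler which resolves the event name of each message to an instance method of the same name. All top-level keys of the message payload, except the event name, are passed to that method as keyword arguments. Messages without an event name, or whose event has no matching method, are ignored.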



# File 'lib/rimless/consumer/base.rb', line 90

def consume
  messages.each do |message|
    self.message = message

    # We ignore events we do not handle by definition
    send(event, **arguments) if !event.nil? && respond_to?(event)
  end
end
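
The dispatch convention can be tried out in isolation. The sketch below is an assumption-laden simplification: `SketchConsumer` is a hypothetical class that is neither a Rimless nor a Karafka consumer, it works on plain symbol-keyed hashes instead of Kafka messages, and the `user_deleted` event and `reason` key are made up for illustration.

```ruby
# Hypothetical stand-in consumer (not a Rimless or Karafka class): it
# reproduces only the event-to-method dispatch on plain payload hashes.
class SketchConsumer
  attr_reader :locked_users

  def initialize
    @locked_users = []
  end

  # Dispatch each payload to the method named after its event
  def consume(payloads)
    payloads.each do |payload|
      event = payload[:event]&.to_sym
      arguments = payload.reject { |key, _| key == :event }

      # Events without a matching method are skipped
      send(event, **arguments) if !event.nil? && respond_to?(event)
    end
  end

  # Handler for +user_locked+ events, extra keys end up in +_args+
  def user_locked(user:, **_args)
    @locked_users << user
  end
end

consumer = SketchConsumer.new
consumer.consume(
  [
    { event: 'user_locked', user: { id: 1 }, reason: 'spam' }, # handled
    { event: 'user_deleted', user: { id: 2 } },                # no handler
    { user: { id: 3 } }                                        # no event name
  ]
)

consumer.locked_users # => [{ id: 1 }]
```

Only the first payload reaches a handler. The other two are silently skipped, which matches the "ignore events we do not handle" comment in the source.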

#event ⇒ Symbol

A shortcut to fetch the event name from the Kafka message.

Returns:

  • (Symbol, nil)

    the event name of the current message, or nil when the payload has no event name



# File 'lib/rimless/consumer/base.rb', line 111

def event
  event_name = message.payload[:event]
  event_name ||= message.payload['event']
  event_name&.to_sym
end
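
The lookup order can be checked with a free-standing sketch. `event_name_for` is a hypothetical function, not part of Rimless, that takes the payload directly instead of reading it from the current message.

```ruby
# Hypothetical free-standing version of #event (not part of Rimless)
def event_name_for(payload)
  # Try the symbol key first, then fall back to the string key
  name = payload[:event]
  name ||= payload['event']

  # Safe navigation keeps a missing event name as nil
  name&.to_sym
end

from_symbol_key = event_name_for({ event: 'user_locked' })     # => :user_locked
from_string_key = event_name_for({ 'event' => 'user_locked' }) # => :user_locked
without_event   = event_name_for({ user: { id: 1 } })          # => nil
```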