Class: Rimless::Consumer::Base
- Inherits:
-
Karafka::BaseConsumer
- Object
- Karafka::BaseConsumer
- Rimless::Consumer::Base
- Defined in:
- lib/rimless/consumer/base.rb
Overview
The base consumer where all Apache Kafka messages will be processed, within an ActiveJob job. It comes with some simple conventions to keep the actual application code simple to use. Example usage on an application:
app/consumers/my_consumer.rb
class IdentityApiConsumer < ApplicationConsumer
# Handle +identity-api.users/user_locked+ messages.
#
# @param user [Hash{Symbol => Mixed}] the event user data
# @param args [Hash{Symbol => Mixed}] additional event data
def user_locked(user:, **args)
# ..
end
end
Despite its default usage within an ActiveJob context, it still directly usable by Karafka. Just be warned that, when running inside a ActiveJob context, it lack support for various Karafka internals (eg. coordinator, client, etc).
Direct Known Subclasses
Defined Under Namespace
Classes: JobContextCoordinator
Instance Attribute Summary collapse
-
#message ⇒ Object
(also: #params)
Allow to handle a single message, each at a time.
Class Method Summary collapse
-
.build_for_job(payload:, metadata:) ⇒ Rimless::Consumer::Base
Build a new disposable consumer instance for a single Apache Kafka message, which should be processed within the ActiveJob context.
-
.job_deserializers ⇒ Karafka::Routing::Features::Deserializers::Config
A custom set of Karafka deserializers, exclusive for the ActiveJob context.
Instance Method Summary collapse
-
#arguments ⇒ Hash{Symbol => Mixed}
Prepare the message payload as event method arguments.
-
#consume ⇒ Object
A generic message consuming handler which resolves the message event name to an actual method.
-
#event ⇒ Symbol
A shortcut to fetch the event name from the Kafka message.
Instance Attribute Details
#message ⇒ Object Also known as: params
Allow to handle a single message, each at a time
28 29 30 |
# File 'lib/rimless/consumer/base.rb', line 28 def @message end |
Class Method Details
.build_for_job(payload:, metadata:) ⇒ Rimless::Consumer::Base
Build a new disposable consumer instance for a single Apache Kafka message, which should be processed within the ActiveJob context.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/rimless/consumer/base.rb', line 50 def self.build_for_job(payload:, metadata:) new.tap do |consumer| = .symbolize_keys consumer.coordinator = JobContextCoordinator.new( topic: [:topic], partition: [:partition] ) consumer.producer = Rimless.producer = Karafka::Messages::Metadata.new( **.except(:key, :headers), raw_key: [:key], raw_headers: [:headers], deserializers: job_deserializers ) consumer. = [Karafka::Messages::Message.new(payload, )] end end |
.job_deserializers ⇒ Karafka::Routing::Features::Deserializers::Config
A custom set of Karafka deserializers, exclusive for the ActiveJob context. As we already get the deserialized details (payload, message key, message headers), we just want to pass the values through.
77 78 79 80 81 82 83 84 85 |
# File 'lib/rimless/consumer/base.rb', line 77 def self.job_deserializers @job_deserializers ||= Karafka::Routing::Features::Deserializers::Config.new( active: true, payload: ->() { .raw_payload }, key: Karafka::Deserializers::Key.new, headers: Karafka::Deserializers::Headers.new ) end |
Instance Method Details
#arguments ⇒ Hash{Symbol => Mixed}
Prepare the message payload as event method arguments.
102 103 104 105 106 |
# File 'lib/rimless/consumer/base.rb', line 102 def arguments event_name_key = :event event_name_key = 'event' if .payload.key? 'event' .payload.except(event_name_key) end |
#consume ⇒ Object
A generic message consuming handler which resolves the message event name to an actual method. All message data (top-level keys) is passed down to the event method as symbol arguments.
90 91 92 93 94 95 96 97 |
# File 'lib/rimless/consumer/base.rb', line 90 def consume .each do || self. = # We ignore events we do not handle by definition send(event, **arguments) if !event.nil? && respond_to?(event) end end |
#event ⇒ Symbol
A shortcut to fetch the event name from the Kafka message.
111 112 113 114 115 |
# File 'lib/rimless/consumer/base.rb', line 111 def event event_name = .payload[:event] event_name ||= .payload['event'] event_name&.to_sym end |