Class: Rimless::Consumer::App

Inherits:
Karafka::App
  • Object
Defined in:
lib/rimless/consumer/app.rb

Overview

The consumer application which adds some convenience helpers and library-related configurations.

Instance Method Summary

Constructor Details

#initialize ⇒ Rimless::Consumer::App

Creates a new Rimless/Karafka application instance while configuring our library defaults.



# File 'lib/rimless/consumer/app.rb', line 18

def initialize
  # Run the parent class initialization
  super

  setup do |config|
    # See: https://bit.ly/3OtIfeu (+config.kafka+ settings)

    # An optional identifier of a Kafka consumer (in a consumer group)
    # that is passed to a Kafka broker with every request. A logical
    # application name to be included in Kafka logs and monitoring
    # aggregates.
    config.kafka[:'client.id'] = Rimless.configuration.client_id
    # All the known brokers, at least one. The underlying Kafka driver
    # will discover the whole cluster structure on startup and again when
    # issues occur, so it can adjust to scaling operations dynamically.
    config.kafka[:'bootstrap.servers'] =
      Rimless.configuration.kafka_brokers
    # All brokers MUST acknowledge a new message by default
    config.kafka[:'request.required.acks'] = -1

    # See: https://bit.ly/3MAF6Jk (+config.*+ settings)

    # Used to uniquely identify given client instance - for logging only
    config.client_id = [
      Rimless.configuration.client_id,
      Process.pid,
      Socket.gethostname
    ].join('-')

    # Should be unique per application to properly track message
    # consumption. See: Kafka Consumer Groups.
    config.group_id = Rimless.configuration.client_id

    # We use dots (parts separation) and underscores for topic names, by
    # convention.
    config.strict_topics_namespacing = false

    # Number of milliseconds after which Karafka no longer waits for the
    # consumers to stop gracefully but instead we force terminate
    # everything.
    config.shutdown_timeout = 10.seconds.in_milliseconds

    # Recreate consumers with each batch. This will allow Rails code
    # reload to work in the development mode. Otherwise Karafka process
    # would not be aware of code changes.
    config.consumer_persistence = Rimless.env.production?

    # Use our logger instead
    config.logger = Rimless.logger
  end

  # Add the logging listener to Karafka in order to facilitate our gem
  # logger. When the user configuration results in a falsy value (e.g.
  # +nil+ or +false+), we skip it.
  listener = Rimless.configuration.consumer_logger_listener
  Karafka.monitor.subscribe(listener) if listener

  # Configure some routing defaults
  routes.draw do
    defaults do
      deserializers(
        payload: Rimless.configuration.avro_deserializer_class.new
      )
    end
  end

  # We configure synchronous logging to stdout as supervising/monitoring
  # processes await logging outputs to detect the Karafka server process
  # is up and running properly
  $stdout.sync = true

  # Call the user-configurable block with our configuration
  # for customizations
  setup(&Rimless.configuration.consumer_configure)
end
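
As a rough illustration of the per-process client identifier composed in the constructor, the following sketch reproduces the join logic outside of Karafka. The helper name `build_client_id` and the sample base name `your-app` are hypothetical; only the three joined parts (base client id, process id, hostname) are taken from the listing above.

```ruby
require 'socket'

# Hypothetical helper (not part of Rimless): mirrors how the constructor
# composes +config.client_id+ from the base client id, the process id
# and the hostname, joined by dashes.
def build_client_id(base, pid: Process.pid, hostname: Socket.gethostname)
  [base, pid, hostname].join('-')
end

# With the defaults the result differs per process and per machine
puts build_client_id('your-app')

# With fixed values the shape of the identifier becomes visible
puts build_client_id('your-app', pid: 42, hostname: 'worker-1')
# prints "your-app-42-worker-1"
```

Because the process id and hostname are part of the value, two consumer processes of the same application log under distinct identifiers while still sharing the same `group_id`.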

Instance Method Details

#configure {|Karafka::Setup::ConfigProxy| ... } ⇒ Rimless::Consumer::App

Allows the user to reconfigure the Karafka application if needed (e.g. to set Kafka driver settings).

Yields:

  • (Karafka::Setup::ConfigProxy)

    the given block to allow configuration manipulation

Returns:

  • (Rimless::Consumer::App)


# File 'lib/rimless/consumer/app.rb', line 172

def configure(&)
  setup(&)
  self
end
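
Since the method returns the application itself, calls can be chained. The sketch below shows that "yield the configuration, return self" shape with a stand-in class; `SketchApp`, its Hash-based configuration and the setting names are hypothetical and only imitate the behavior, they are not the Karafka or Rimless API.

```ruby
# Hypothetical stand-in for the application class. It only mimics the
# "yield the config, return self" shape of #configure.
class SketchApp
  attr_reader :config

  def initialize
    @config = { shutdown_timeout: 10_000 }
  end

  # Stand-in for Karafka's setup: yields the mutable configuration
  def setup
    yield(config) if block_given?
  end

  # Same shape as the documented method: delegate, then return self
  def configure(&block)
    setup(&block)
    self
  end
end

app = SketchApp.new
               .configure { |config| config[:shutdown_timeout] = 30_000 }
               .configure { |config| config[:logger] = :custom }

p app.config
```

Later blocks see the changes of earlier ones, so a chained call behaves like a sequence of separate `configure` calls.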

#server? ⇒ Boolean

Checks whether we run as the Karafka server (consumer) process. Karafka does not offer a built-in equivalent of Sidekiq.server? for this (last tested with Karafka version 2.5.7), so the check inspects the program name and the command line arguments.

Returns:

  • (Boolean)

    whether we run as the Karafka server or not



# File 'lib/rimless/consumer/app.rb', line 182

def server?
  $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
end
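
To see which invocations the check accepts, here is a parameterised variant that takes the program name and arguments explicitly instead of reading the globals. The function name `karafka_server?` and the sample paths are hypothetical; the condition itself is copied from the listing above.

```ruby
# Hypothetical, parameterised variant of the check so it can be tried
# without starting Karafka. The condition matches the documented one.
def karafka_server?(program_name, argv)
  program_name.end_with?('karafka') && argv.include?('server')
end

# Roughly what `bundle exec karafka server` looks like to the process
puts karafka_server?('/usr/local/bin/karafka', %w[server])  # prints "true"

# Other Karafka sub-commands are not treated as the server
puts karafka_server?('/usr/local/bin/karafka', %w[console]) # prints "false"

# A different executable with a `server` argument does not match either
puts karafka_server?('bin/rails', %w[server])               # prints "false"
```

Both conditions must hold, which means a process that embeds Karafka without using the `karafka` executable would not be detected by this heuristic.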

#topic_names(parts) ⇒ Array<String>

Build the conventional Apache Kafka topic names from the given parts. The parts may be a single string/symbol or a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names: Array<String, Symbol> }+. When the hash contains +names+, one topic name is built per entry for the same application.

Parameters:

  • parts (String, Symbol, Hash{Symbol => Mixed})

    the topic name parts

Returns:

  • (Array<String>)

    the full topic names



# File 'lib/rimless/consumer/app.rb', line 154

def topic_names(parts)
  # We have a single app, but multiple names so we handle them
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      Rimless.topic(parts.merge(name: name))
    end
  end

  # Otherwise we delegate the input as-is and wrap the single topic name
  [Rimless.topic(parts)]
end
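
The branching above can be tried in isolation with a stand-in for `Rimless.topic`. In this sketch `sketch_topic`, the constants and the dot-joined `environment.app.name` layout are assumptions made for illustration; the real helper may build names differently.

```ruby
# Assumed values, only used by the stand-in below
SKETCH_ENV = 'test'
SKETCH_APP = 'my_app'

# Hypothetical stand-in for Rimless.topic: joins an assumed environment,
# the app and the name with dots.
def sketch_topic(parts)
  parts = { name: parts } unless parts.is_a?(Hash)
  [SKETCH_ENV, parts.fetch(:app, SKETCH_APP), parts.fetch(:name)].join('.')
end

# Same control flow as the documented method, using the stand-in
def sketch_topic_names(parts)
  # One app, multiple names: build one topic name per entry
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map { |name| sketch_topic(parts.merge(name: name)) }
  end

  # Everything else yields exactly one topic name
  [sketch_topic(parts)]
end

p sketch_topic_names(:users)
p sketch_topic_names({ app: :test_app, name: :admins })
p sketch_topic_names({ app: :test_app, names: %i[users admins] })
```

Whatever the input form, the result is always an array, so callers can iterate without checking for the single-name case.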

#topics(topics = []) { ... } ⇒ Rimless::Consumer::App

Configure the topics-consumer routing table in a lean way.

Examples:

topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)

Examples:

topics(
  { app: :test_app, name: :admins } => lambda { |topic|
    consumer Rimless::Consumer::JobBridge.build(dest_consumer)
  }
)

Examples:

topics do
  topic('name') do
    consumer CustomConsumer
  end
end

Parameters:

  • topics (Hash{Hash => Class, Proc}) (defaults to: [])

    the topic to consumer mapping

Yields:

  • the given block on the routing table

Returns:

  • (Rimless::Consumer::App)


# File 'lib/rimless/consumer/app.rb', line 121

def topics(topics = [], &block)
  routes.draw do
    consumer_group(Rimless.configuration.client_id) do
      instance_exec(&block) if block_given?

      topics.each do |topic_parts, dest_consumer|
        Rimless.consumer.topic_names(topic_parts).each do |topic_name|
          configure = proc do
            consumer(
              Rimless.configuration.job_bridge_class.build(dest_consumer)
            )
            deserializers(
              payload: Rimless.configuration.avro_deserializer_class.new
            )
          end
          configure = dest_consumer if dest_consumer.is_a? Proc
          topic(topic_name, &configure)
        end
      end
    end
  end

  self
end
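
The way the mapping is expanded into individual routes can be sketched without Karafka. Everything named here (`UsersConsumer`, `sketch_names`, `sketch_routes`, the `test.` prefix and the `:job_bridge` / `:custom_block` labels) is hypothetical; the sketch only mirrors the two decisions visible in the listing above: one route per resolved topic name, and a Proc value replacing the default consumer setup.

```ruby
# Hypothetical consumer class, only used as a mapping value
UsersConsumer = Class.new

# Assumed naming helper: resolves one or many names for an app
def sketch_names(parts)
  names = parts[:names] || [parts[:name]]
  names.map { |name| "test.#{parts[:app]}.#{name}" }
end

# Expands the mapping into [topic_name, kind] pairs. A Proc value stands
# for a custom topic block, anything else for the default job bridge.
def sketch_routes(mapping)
  mapping.flat_map do |parts, dest|
    sketch_names(parts).map do |topic_name|
      kind = dest.is_a?(Proc) ? :custom_block : :job_bridge
      [topic_name, kind]
    end
  end
end

mapping = {
  { app: :test_app, names: %i[users admins] } => UsersConsumer,
  { app: :test_app, name: :audits } => ->(_topic) {}
}

routes = sketch_routes(mapping)
routes.each { |topic_name, kind| puts "#{topic_name} -> #{kind}" }
```

A single entry with `names` therefore produces several routes that share the same destination, while a lambda takes over the whole topic configuration for its entry.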