Class: Rimless::Consumer::JobBridge

Inherits:
Karafka::BaseConsumer
  • Object
Defined in:
lib/rimless/consumer/job_bridge.rb

Overview

This is our default consumer for Karafka. It wraps the actual user consumer classes, consumes all messages of a batch, and enqueues each of them as an ActiveJob job. It builds a bridge between the Karafka topic/message consumer process and an ActiveJob processor that runs later (e.g. Sidekiq or Solid Queue).

Class Method Details

.build(consumer) ⇒ Class

Build a new anonymous wrapper class, based on the given destination consumer class.

Parameters:

  • consumer (Class)

    the consumer to pass down to the jobs

Returns:

  • (Class)

    the new and configured wrapper class

Raises:

  • (ArgumentError)


20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rimless/consumer/job_bridge.rb', line 20

def build(consumer)
  # We cannot serialize anonymous classes, as they need to cross
  # process borders via ActiveJob here, and the resulting job needs to
  # constantize the serialized class name again
  raise ArgumentError, "Anonymous consumer class passed: #{consumer}" \
    unless consumer.name

  Class.new(self).tap do |wrapper|
    wrapper.consumer = consumer.name
  end
end
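
The wrapper pattern above can be tried without Karafka. The following is a minimal sketch, assuming a stand-in base class: `BuildSketchBridge` and `PaymentsConsumer` are hypothetical names that are not part of Rimless, and the class-level `consumer` accessor is an assumption about how the wrapper stores the name. It shows that the wrapper itself stays anonymous while the consumer travels as a plain string that can be turned back into a class.

```ruby
# Stand-in for Rimless::Consumer::JobBridge without the Karafka dependency
# (hypothetical class, for illustration only).
class BuildSketchBridge
  class << self
    # Assumption: the wrapped consumer is stored per class, as a String
    attr_accessor :consumer

    def build(consumer)
      # Anonymous classes have no name, so they cannot be looked up again
      raise ArgumentError, "Anonymous consumer class passed: #{consumer}" \
        unless consumer.name

      Class.new(self).tap do |wrapper|
        wrapper.consumer = consumer.name
      end
    end
  end
end

# Hypothetical user consumer class
class PaymentsConsumer; end

wrapper = BuildSketchBridge.build(PaymentsConsumer)
wrapper.name                        # nil, the wrapper itself is anonymous
wrapper.consumer                    # "PaymentsConsumer"
Object.const_get(wrapper.consumer)  # resolves back to PaymentsConsumer

# Passing an anonymous class is rejected up front
error_message =
  begin
    BuildSketchBridge.build(Class.new)
    nil
  rescue ArgumentError => e
    e.message
  end
```

Storing the class name rather than the class object is what allows the value to be serialized into a job and resolved again in another process.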

.inspect ⇒ String

A custom object/class inspection helper to allow pretty printing of the anonymous class.

Returns:

  • (String)

    the pretty-printed class/instance



# File 'lib/rimless/consumer/job_bridge.rb', line 36

def inspect
  # When not an anonymous class
  return super unless name.nil?

  # Otherwise the anonymous wrapper class
  "#{Rimless::Consumer::JobBridge.name}[consumer=#{consumer.inspect}]"
end

.to_s ⇒ String

A custom object/class inspection helper to allow pretty printing of the anonymous class.

Returns:

  • (String)

    the pretty-printed class/instance



# File 'lib/rimless/consumer/job_bridge.rb', line 43

def inspect
  # When not an anonymous class
  return super unless name.nil?

  # Otherwise the anonymous wrapper class
  "#{Rimless::Consumer::JobBridge.name}[consumer=#{consumer.inspect}]"
end

Instance Method Details

#consume ⇒ Object

Consume all messages of the current batch and mark each message as processed afterwards (asynchronously). You can override this method if you need more precise control over the message processing, e.g. marking only the whole batch as processed, or custom error handling.

This method provides at-least-once delivery semantics. Each message is enqueued to ActiveJob and then marked as consumed via mark_as_consumed (asynchronous commit). In the unlikely event that the Karafka process crashes after perform_later succeeds but before the offset is committed to the broker (e.g. OOM kill, hardware failure), the message will be re-delivered and enqueued again on restart. Downstream consumers should therefore be idempotent. For stronger guarantees, use mark_as_consumed! (synchronous commit) at the cost of throughput, by providing a custom job_bridge_class via Rimless.configuration.

See: bit.ly/4aPXaai, then configure your own `Rimless.configuration.job_bridge_class`.



# File 'lib/rimless/consumer/job_bridge.rb', line 65

def consume
  messages.each do |message|
    enqueue_job(message)
    mark_as_consumed(message)
  end
end
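
The synchronous-commit variant mentioned above could look roughly like the following. This is a sketch under stated assumptions, not the library's implementation: `SyncSketchBridge` is a hypothetical stand-in that only records calls, so the example runs without Karafka or Rimless. In a real application the custom class would presumably inherit from `Rimless::Consumer::JobBridge` and be assigned to `Rimless.configuration.job_bridge_class`.

```ruby
# Stand-in base class that mimics only the methods used below and records
# what was called (hypothetical, for illustration only).
class SyncSketchBridge
  attr_reader :messages, :log

  def initialize(messages)
    @messages = messages
    @log = []
  end

  # Stand-in for enqueueing the ActiveJob job
  def enqueue_job(message)
    @log << [:enqueued, message]
  end

  # Stand-in for Karafka's blocking offset commit
  def mark_as_consumed!(message)
    @log << [:committed_sync, message]
  end
end

# Custom bridge: enqueue first, then commit each offset synchronously
class SyncCommitBridge < SyncSketchBridge
  def consume
    messages.each do |message|
      enqueue_job(message)
      mark_as_consumed!(message)
    end
  end
end

bridge = SyncCommitBridge.new(%w[m1 m2])
bridge.consume
bridge.log
# => [[:enqueued, "m1"], [:committed_sync, "m1"],
#     [:enqueued, "m2"], [:committed_sync, "m2"]]
```

Enqueueing before committing keeps the at-least-once property: a crash between the two steps leads to a duplicate job rather than a lost message, which is why idempotent consumers remain advisable either way.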

#enqueue_job(message) ⇒ Object

Enqueue a new job for the given message.

Parameters:

  • message (Karafka::Messages::Message)

    the message to enqueue



# File 'lib/rimless/consumer/job_bridge.rb', line 75

def enqueue_job(message)
  Rimless.configuration.consumer_job_class.perform_later(
    **message_to_job_args(message)
  )
end

#inspect ⇒ String Also known as: to_s

A custom object/class inspection helper to allow pretty printing of the anonymous class.

Returns:

  • (String)

    the pretty-printed class/instance



# File 'lib/rimless/consumer/job_bridge.rb', line 107

def inspect
  "#<#{Rimless::Consumer::JobBridge.name} consumer=#{consumer.inspect}>"
end

#message_to_job_args(message) ⇒ Hash{Symbol => Mixed}

Convert the given Karafka::Messages::Message instance to a simple hash, which can be transported by ActiveJob.

Parameters:

  • message (Karafka::Messages::Message)

    the message to enqueue

Returns:

  • (Hash{Symbol => Mixed})

    the job arguments



# File 'lib/rimless/consumer/job_bridge.rb', line 86

def message_to_job_args(message)
  {
    payload: message.payload,
    consumer:,
    # Only plain, serializable metadata fields are passed on to the job
    metadata: message.metadata.to_h.slice(
      :topic,
      :partition,
      :offset,
      :timestamp,
      :received_at
    ).merge(
      key: message.metadata.key,
      headers: message.metadata.headers
    )
  }
end
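
To make the shape of the resulting hash concrete, here is a small dependency-free sketch. It assumes the metadata is read from `message.metadata`; `SketchMetadata` and `SketchMessage` are hypothetical structs standing in for the Karafka message classes, the `internal` field is invented to show that unlisted fields are dropped, and the consumer name is passed explicitly instead of coming from the bridge.

```ruby
# Hypothetical stand-ins for the Karafka message and its metadata
SketchMetadata = Struct.new(
  :topic, :partition, :offset, :timestamp, :received_at,
  :key, :headers, :internal,
  keyword_init: true
)
SketchMessage = Struct.new(:payload, :metadata, keyword_init: true)

# Same conversion as above, with the consumer name passed in explicitly
def message_to_job_args(message, consumer)
  {
    payload: message.payload,
    consumer: consumer,
    metadata: message.metadata.to_h.slice(
      :topic, :partition, :offset, :timestamp, :received_at
    ).merge(
      key: message.metadata.key,
      headers: message.metadata.headers
    )
  }
end

message = SketchMessage.new(
  payload: { 'id' => 1 },
  metadata: SketchMetadata.new(
    topic: 'payments', partition: 0, offset: 42,
    timestamp: Time.at(0).utc, received_at: Time.at(1).utc,
    key: 'payment-1', headers: { 'source' => 'api' },
    internal: :not_serializable
  )
)

args = message_to_job_args(message, 'PaymentsConsumer')
args[:consumer]                   # "PaymentsConsumer"
args[:metadata][:offset]          # 42
args[:metadata].key?(:internal)   # false, unlisted fields are dropped
```

Restricting the metadata to an explicit list of fields keeps the job arguments limited to values the job backend can serialize.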