Class: Rimless::Consumer::JobBridge
- Inherits: Karafka::BaseConsumer
  - Object
    - Karafka::BaseConsumer
      - Rimless::Consumer::JobBridge
- Defined in: lib/rimless/consumer/job_bridge.rb
Overview
This is our default consumer for Karafka. It wraps the actual user consumer classes, consumes all messages of a batch, and enqueues each of them as an ActiveJob job. It builds a bridge between the Karafka topic/message consumer process and an ActiveJob processor that runs later (e.g. Sidekiq or Solid Queue).
Class Method Summary
- .build(consumer) ⇒ Class
  Build a new anonymous wrapper class, based on the given destination consumer class.
- .inspect ⇒ String
  A custom object/class inspection helper to allow pretty printing of the anonymous class.
- .to_s ⇒ String
  A custom object/class inspection helper to allow pretty printing of the anonymous class.
Instance Method Summary
- #consume ⇒ Object
  Consume all messages of the current batch, and mark each message afterwards as processed (asynchronous).
- #enqueue_job(message) ⇒ Object
  Enqueue a new job for the given message.
- #inspect ⇒ String (also: #to_s)
  A custom object/class inspection helper to allow pretty printing of the anonymous class.
- #message_to_job_args(message) ⇒ Hash{Symbol => Mixed}
  Convert the given Karafka::Messages::Message instance to a simple hash, which can be transported by ActiveJob.
Class Method Details
.build(consumer) ⇒ Class
Build a new anonymous wrapper class, based on the given destination consumer class.
    # File 'lib/rimless/consumer/job_bridge.rb', line 20
    def build(consumer)
      # We cannot serialize anonymous classes, as they need to cross
      # process borders via ActiveJob here, and the resulting job needs to
      # constantize the serialized class name again
      raise ArgumentError, "Anonymous consumer class passed: #{consumer}" \
        unless consumer.name

      Class.new(self).tap do |wrapper|
        wrapper.consumer = consumer.name
      end
    end
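The wrapper-building behaviour of .build can be tried out in isolation. The following is a minimal sketch, not the library's implementation: `BuildSketchBridge` is a hypothetical stand-in for Rimless::Consumer::JobBridge without the Karafka parent class, `PaymentsConsumer` is a made-up user consumer, and the class-level `consumer` accessor is an assumption about how the real attribute is exposed.

```ruby
# Hypothetical stand-in for Rimless::Consumer::JobBridge (no Karafka needed).
class BuildSketchBridge
  class << self
    # Assumption: the real class exposes a class-level `consumer` attribute.
    attr_accessor :consumer

    def build(consumer)
      # An anonymous class has no name, so a job could never constantize it
      raise ArgumentError, "Anonymous consumer class passed: #{consumer}" \
        unless consumer.name

      # The wrapper is itself anonymous and only stores the consumer name
      Class.new(self).tap { |wrapper| wrapper.consumer = consumer.name }
    end
  end
end

# Hypothetical user consumer class
class PaymentsConsumer; end

wrapper = BuildSketchBridge.build(PaymentsConsumer)

# Passing an anonymous class is rejected
error_message =
  begin
    BuildSketchBridge.build(Class.new)
    nil
  rescue ArgumentError => e
    e.message
  end
```

Here `wrapper` is an anonymous subclass of the stand-in that carries the string "PaymentsConsumer", while the stand-in class itself keeps no consumer. Storing the name rather than the class object is what lets the value cross process borders as plain data.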
.inspect ⇒ String
A custom object/class inspection helper to allow pretty printing of the anonymous class.
    # File 'lib/rimless/consumer/job_bridge.rb', line 36
    def inspect
      # When not an anonymous class
      return super unless name.nil?

      # Otherwise the anonymous wrapper class
      "#{Rimless::Consumer::JobBridge.name}[consumer=#{consumer.inspect}]"
    end
.to_s ⇒ String
A custom object/class inspection helper to allow pretty printing of the anonymous class.
    # File 'lib/rimless/consumer/job_bridge.rb', line 43
    def inspect
      # When not an anonymous class
      return super unless name.nil?

      # Otherwise the anonymous wrapper class
      "#{Rimless::Consumer::JobBridge.name}[consumer=#{consumer.inspect}]"
    end
Instance Method Details
#consume ⇒ Object
Consume all messages of the current batch, and mark each message afterwards as processed (asynchronous). You can simply overwrite this method if you need more precise control of the message processing, e.g. marking only the whole batch as processed, or custom error handling.
This method provides at-least-once delivery semantics. Each message is enqueued to ActiveJob and then marked as consumed via mark_as_consumed (asynchronous commit). In the unlikely event that the Karafka process crashes after perform_later succeeds but before the offset is committed to the broker (e.g. OOM kill, hardware failure), the message will be re-delivered and enqueued again on restart. Downstream consumers should therefore be idempotent. For stronger guarantees, use mark_as_consumed! (synchronous commit) at the cost of throughput, by providing a custom job_bridge_class via Rimless.configuration.
See: bit.ly/4aPXaai, then configure your own `Rimless.configuration.job_bridge_class`.
    # File 'lib/rimless/consumer/job_bridge.rb', line 65
    def consume
      messages.each do |message|
        enqueue_job(message)
        mark_as_consumed(message)
      end
    end
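The synchronous-commit variant mentioned above could be sketched as follows. This is an illustration under stated assumptions rather than a drop-in class: `RecordingBridge` is a hypothetical stand-in that only records calls, so the example runs without Karafka, Rimless or ActiveJob, and `SyncCommitBridge` is a made-up name. In a real application the subclass would inherit from Rimless::Consumer::JobBridge and be assigned to `Rimless.configuration.job_bridge_class`.

```ruby
# Hypothetical stand-in for Rimless::Consumer::JobBridge that records calls
# instead of talking to Kafka or ActiveJob.
class RecordingBridge
  attr_reader :messages, :log

  def initialize(messages)
    @messages = messages
    @log = []
  end

  def enqueue_job(message)
    log << [:enqueue_job, message]
  end

  # Stands in for Karafka's asynchronous offset commit
  def mark_as_consumed(message)
    log << [:mark_as_consumed, message]
  end

  # Stands in for Karafka's synchronous (blocking) offset commit
  def mark_as_consumed!(message)
    log << [:mark_as_consumed!, message]
  end

  # Default behaviour: enqueue, then commit asynchronously
  def consume
    messages.each do |message|
      enqueue_job(message)
      mark_as_consumed(message)
    end
  end
end

# Custom bridge: same loop, but each offset is committed synchronously
class SyncCommitBridge < RecordingBridge
  def consume
    messages.each do |message|
      enqueue_job(message)
      mark_as_consumed!(message)
    end
  end
end

bridge = SyncCommitBridge.new(%w[m1 m2])
bridge.consume
```

After the run, `bridge.log` alternates an enqueue with a blocking commit per message. The synchronous commit narrows the window in which a crash causes a re-delivery, but does not close it, so idempotent downstream jobs remain advisable.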
#enqueue_job(message) ⇒ Object
Enqueue a new job for the given message.
    # File 'lib/rimless/consumer/job_bridge.rb', line 75
    def enqueue_job(message)
      Rimless.configuration.consumer_job_class.perform_later(
        **message_to_job_args(message)
      )
    end
#inspect ⇒ String Also known as: to_s
A custom object/class inspection helper to allow pretty printing of the anonymous class.
    # File 'lib/rimless/consumer/job_bridge.rb', line 107
    def inspect
      "#<#{Rimless::Consumer::JobBridge.name} consumer=#{consumer.inspect}>"
    end
#message_to_job_args(message) ⇒ Hash{Symbol => Mixed}
Convert the given Karafka::Messages::Message instance to a simple hash, which can be transported by ActiveJob.
    # File 'lib/rimless/consumer/job_bridge.rb', line 86
    def message_to_job_args(message)
      {
        payload: message.payload,
        consumer:,
        metadata: message.metadata.to_h.slice(
          :topic,
          :partition,
          :offset,
          :timestamp,
          :received_at
        ).merge(
          key: message.metadata.key,
          headers: message.metadata.headers
        )
      }
    end
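To make the shape of the resulting hash concrete, here is a small sketch that applies the same slice-and-merge logic to plain structs. `SketchMetadata` and `SketchMessage` are hypothetical stand-ins for the Karafka message classes, `sketch_job_args` is a free-standing copy of the conversion (the real method reads `consumer` from the bridge itself), and the `internal` field is invented to show that attributes outside the allow-list are dropped.

```ruby
# Hypothetical stand-ins for Karafka::Messages::Metadata / ::Message
SketchMetadata = Struct.new(
  :topic, :partition, :offset, :timestamp, :received_at,
  :key, :headers, :internal,
  keyword_init: true
)
SketchMessage = Struct.new(:payload, :metadata, keyword_init: true)

# Free-standing copy of the conversion; `consumer` is passed in explicitly
def sketch_job_args(message, consumer:)
  {
    payload: message.payload,
    consumer: consumer,
    # Keep only the allow-listed metadata attributes ...
    metadata: message.metadata.to_h.slice(
      :topic, :partition, :offset, :timestamp, :received_at
    ).merge(
      # ... and add key and headers explicitly
      key: message.metadata.key,
      headers: message.metadata.headers
    )
  }
end

message = SketchMessage.new(
  payload: { "id" => 1 },
  metadata: SketchMetadata.new(
    topic: "payments", partition: 0, offset: 42,
    timestamp: Time.at(1_700_000_000), received_at: Time.at(1_700_000_001),
    key: "user-1", headers: { "trace" => "abc" }, internal: :dropped
  )
)

args = sketch_job_args(message, consumer: "PaymentsConsumer")
```

The result contains three top-level keys (`payload`, `consumer`, `metadata`), and the nested metadata hash carries only the five sliced attributes plus `key` and `headers`. Keeping the hash flat and limited to simple values is what allows it to be passed as keyword arguments to `perform_later`.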