Class: Karafka::BaseConsumer

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Taggable
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit
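
For orientation, here is a minimal, hypothetical consumer built on this base class (the class name, topic, and output are illustrative, not part of this file); Karafka invokes #consume with each batch of messages delegated to the consumer:

class EventsConsumer < Karafka::BaseConsumer
  # Called by the framework for every batch routed to this consumer
  def consume
    messages.each do |message|
      # #payload returns the deserialized message content
      puts "Received: #{message.payload}"
    end
  end
end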

Direct Known Subclasses

ActiveJob::Consumer

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ BaseConsumer

Creates a new consumer and assigns it an id.



# File 'lib/karafka/base_consumer.rb', line 29

def initialize
  @id = SecureRandom.hex(6)
  @used = false
end

Instance Attribute Details

#client ⇒ Karafka::Connection::Client

Returns the Kafka connection client.

Returns:

  • (Karafka::Connection::Client)

    Kafka connection client

# File 'lib/karafka/base_consumer.rb', line 22

def client
  @client
end

#coordinator ⇒ Karafka::Processing::Coordinator

Returns the coordinator.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator


# File 'lib/karafka/base_consumer.rb', line 24

def coordinator
  @coordinator
end

#id ⇒ String (readonly)

Returns id of the current consumer.

Returns:

  • (String)

    id of the current consumer



# File 'lib/karafka/base_consumer.rb', line 18

def id
  @id
end

#messages ⇒ Karafka::Messages::Messages

Returns the current messages batch.

Returns:

  • (Karafka::Messages::Messages)

    current messages batch

# File 'lib/karafka/base_consumer.rb', line 20

def messages
  @messages
end
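
As a hedged illustration of this accessor (persist_event is a hypothetical helper), user code typically reads the batch inside #consume; the batch also carries frozen metadata such as processed_at, which is set in #on_before_consume:

def consume
  # Batch-level metadata is frozen before user code runs (see #on_before_consume)
  received_at = messages.metadata.processed_at

  messages.each do |message|
    # Each message exposes its Kafka coordinates next to the deserialized payload
    persist_event(message.partition, message.offset, message.payload, received_at)
  end
end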

#producer ⇒ WaterDrop::Producer

Returns producer instance.

Returns:

  • (WaterDrop::Producer)

    producer instance



# File 'lib/karafka/base_consumer.rb', line 26

def producer
  @producer
end
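
A sketch of how the assigned producer might be used from within a consumer, assuming WaterDrop's standard produce_async API; the 'events.processed' topic name is illustrative:

def consume
  messages.each do |message|
    # Forward the raw payload to another topic through the WaterDrop producer
    producer.produce_async(
      topic: 'events.processed',
      payload: message.raw_payload
    )
  end
end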

Instance Method Details

#on_after_consume ⇒ Object

Note:

This should not be used by end users; it is part of the consumer lifecycle and not part of the public API.

Note:

We handle and report errors here because of flows that could fail. For example, a DLQ flow could fail if it was not able to dispatch the DLQ message. Other “non-user” flows do not interact with external systems, so their errors are expected to bubble up.



# File 'lib/karafka/base_consumer.rb', line 89

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end
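
Errors rescued here surface through the 'error.occurred' instrumentation event; a hedged sketch of observing them from application code (ErrorTracker is a hypothetical reporting service):

Karafka.monitor.subscribe('error.occurred') do |event|
  # type distinguishes e.g. 'consumer.after_consume.error' from other error sources
  ErrorTracker.notify(event[:error], type: event[:type])
end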

#on_before_consume ⇒ Object

Note:

This should not be used by end users; it is part of the consumer lifecycle and not part of the public API. It can act as a hook when creating non-blocking consumers and doing other advanced work.

Can be used to run preparation code in the worker.



# File 'lib/karafka/base_consumer.rb', line 51

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end

#on_before_schedule_consume ⇒ Object

Note:

This should not be used by end users; it is part of the consumer lifecycle and not part of the public API. It should not perform any extensive operations, as it is blocking and runs in the listener thread.

Can be used to run preparation code prior to the job being enqueued.



# File 'lib/karafka/base_consumer.rb', line 40

def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end

#on_before_schedule_idle ⇒ Object

Can be used to run code prior to scheduling of idle execution.



# File 'lib/karafka/base_consumer.rb', line 106

def on_before_schedule_idle
  handle_before_schedule_idle
end

#on_before_schedule_revoked ⇒ Object

Can be used to run code prior to scheduling of revoked execution.



# File 'lib/karafka/base_consumer.rb', line 120

def on_before_schedule_revoked
  handle_before_schedule_revoked
end

#on_before_schedule_shutdown ⇒ Object

Can be used to run code prior to scheduling of shutdown execution.



# File 'lib/karafka/base_consumer.rb', line 141

def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end

#on_consume ⇒ Boolean

Note:

We keep the seek offset tracking and use it to compensate for async offset flushing that may not yet have kicked in when an error occurs. That way we always pause on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



# File 'lib/karafka/base_consumer.rb', line 69

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end
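
To illustrate the seek offset mentioned above, a sketch of a consumer that marks each message as consumed so that a pause and retry after an error resumes from the last processed message (store is a hypothetical persistence call):

def consume
  messages.each do |message|
    store(message)
    # Advances the seek offset message by message
    mark_as_consumed(message)
  end
end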

#on_idle ⇒ Object

Trigger method for running on idle runs without messages.



# File 'lib/karafka/base_consumer.rb', line 113

def on_idle
  handle_idle
end

#on_revoked ⇒ Object

Trigger method for running on partition revocation.



# File 'lib/karafka/base_consumer.rb', line 127

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdown ⇒ Object

Trigger method for running on shutdown.



# File 'lib/karafka/base_consumer.rb', line 148

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end
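
The #on_revoked and #on_shutdown triggers ultimately dispatch to the user-facing #revoked and #shutdown hooks; a hedged sketch of a consumer implementing them (the buffering logic and flush helper are illustrative):

class EventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each { |message| buffer << message.payload }
  end

  # Reached via #on_revoked when this process loses the partition
  def revoked
    buffer.clear
  end

  # Reached via #on_shutdown when Karafka is stopping
  def shutdown
    flush(buffer) # hypothetical flush helper
  end

  private

  def buffer
    @buffer ||= []
  end
end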