Class: Karafka::Pro::ScheduledMessages::Consumer
- Inherits: BaseConsumer
  - Object
  - BaseConsumer
  - Karafka::Pro::ScheduledMessages::Consumer
- Defined in:
- lib/karafka/pro/scheduled_messages/consumer.rb
Overview
Consumer that coordinates scheduling of messages when the time comes
Instance Attribute Summary
Attributes inherited from BaseConsumer
#client, #coordinator, #id, #messages, #producer
Instance Method Summary
-
#consume ⇒ Object
Processes messages and runs dispatch (via tick) if needed.
-
#eofed ⇒ Object
Runs end of file operations.
-
#initialized ⇒ Object
Prepares the initial state of all stateful components.
-
#shutdown ⇒ Object
Move the state to shutdown and publish immediately.
-
#tick ⇒ Object
Performs periodic operations when no new data is provided to the topic partition.
Methods inherited from BaseConsumer
#initialize, #inspect, #on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_eofed, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_eofed, #on_idle, #on_initialized, #on_revoked, #on_shutdown, #on_wrap
Constructor Details
This class inherits a constructor from Karafka::BaseConsumer
Instance Method Details
#consume ⇒ Object
Processes messages and runs dispatch (via tick) if needed
# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 60

def consume
  return if reload!

  messages.each do |message|
    SchemaValidator.call(message)
    # We always track offsets of messages, even if they would be later on skipped or
    # ignored for any reason. That way we have debug info that is useful once in a while.
    @tracker.offsets(message)
    process_message(message)
  end

  @states_reporter.call

  recent_timestamp = messages.last.timestamp.to_i
  post_started_timestamp = @tracker.started_at + GRACE_PERIOD

  # If we started getting messages that are beyond the current time, it means we have
  # loaded enough to start scheduling. The upcoming messages are from the future looking
  # from the perspective of the current consumer start. We add a bit of grace period not
  # to deal with edge cases.
  loaded! if @state.loading? && recent_timestamp > post_started_timestamp

  eofed if eofed?

  # Unless given day data is fully loaded we should not dispatch any notifications nor
  # should we mark messages.
  return unless @state.loaded?

  tick

  # Despite the fact that we need to load the whole stream once a day we do mark.
  # We mark as consumed for two main reasons:
  #   - by marking we can indicate to Web UI and other monitoring tools that we have a
  #     potential real lag with loading schedules in case there would be a lot of
  #     messages added to the schedules topic
  #   - we prevent a situation where there is no notion of this consumer group in the
  #     reporting, allowing us to establish "presence"
  mark_as_consumed(messages.last)
end
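The "loaded" decision in the listing above can be sketched in plain Ruby: the consumer treats the daily schedule as fully loaded once incoming message timestamps pass the consumer's own start time plus a grace period. The method name `caught_up?` and the `GRACE_PERIOD` value below are illustrative stand-ins, not Karafka internals:

```ruby
GRACE_PERIOD = 15 # seconds; assumed value for illustration only

# Returns true when the newest message timestamp is beyond the consumer start
# time plus the grace period, i.e. we have replayed history and caught up
# with "now", so scheduling can begin.
def caught_up?(last_message_timestamp, started_at, grace_period: GRACE_PERIOD)
  last_message_timestamp > started_at + grace_period
end

started_at = 1_700_000_000
puts caught_up?(started_at + 5, started_at)  # still replaying history
puts caught_up?(started_at + 60, started_at) # messages are from after the start
```

The grace period avoids flapping around the exact start instant when message timestamps and the consumer clock are only approximately aligned.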
#eofed ⇒ Object
Runs end of file operations
# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 103

def eofed
  return if reload!

  # If end of the partition is reached, it always means all data is loaded
  loaded!
end
#initialized ⇒ Object
Prepares the initial state of all stateful components
# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 50

def initialized
  clear!
  # Max epoch is always moving forward with the time. Never backwards, hence we do not
  # reset it at all.
  @max_epoch = MaxEpoch.new
  @state = State.new
  @reloads = 0
end
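The comment above notes that the max epoch only moves forward in time, never backwards. A minimal stand-in class (not Karafka's actual `MaxEpoch` implementation) showing that monotonic behavior:

```ruby
# Tracks the highest epoch (Unix timestamp) seen so far. Updates with a
# smaller value are ignored, so the tracked value never moves backwards.
class MaxEpoch
  attr_reader :max

  def initialize
    @max = -1
  end

  def update(epoch)
    @max = epoch if epoch > @max
  end
end

tracker = MaxEpoch.new
tracker.update(100)
tracker.update(50)  # ignored: would move the epoch backwards
puts tracker.max    # 100
```

Keeping this watermark monotonic means already-dispatched schedule entries are never considered due a second time after a partial reload.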
#shutdown ⇒ Object
Move the state to shutdown and publish immediately
# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 139

def shutdown
  @state.stopped!
  @states_reporter.call!
end
#tick ⇒ Object
Performs periodic operations when no new data is provided to the topic partition
# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 111

def tick
  return if reload!

  # We should not dispatch any data until the whole state is loaded. We need to make
  # sure that all tombstone events are loaded not to duplicate dispatches.
  return unless @state.loaded?

  keys = []

  # We first collect all the data for dispatch and then dispatch and **only** after
  # dispatch that is sync is successful we remove those messages from the daily buffer
  # and update the max epoch. Since only the dispatch itself is volatile and can crash
  # with timeouts, etc., we need to be sure it went through prior to deleting those
  # messages from the daily buffer. That way we ensure the at-least-once delivery and,
  # in case of a transactional producer, exactly-once delivery.
  @daily_buffer.for_dispatch do |message|
    keys << message.key
    @dispatcher << message
  end

  @dispatcher.flush

  keys.each { |key| @daily_buffer.delete(key) }

  @states_reporter.call
end
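The collect-then-flush-then-delete ordering described above is what gives the at-least-once guarantee: if the synchronous flush raises, nothing has been removed from the buffer yet, so the next tick redelivers. A self-contained sketch of that ordering, where the buffer and dispatcher are simple stand-ins rather than Karafka's `DailyBuffer` and dispatcher classes:

```ruby
# A stand-in dispatcher: accumulates messages, "sends" them on flush.
class Dispatcher
  attr_reader :flushed

  def initialize
    @pending = []
    @flushed = []
  end

  def <<(message)
    @pending << message
  end

  # A real dispatcher could raise here (timeouts etc.). Nothing has been
  # deleted from the buffer at this point, so redelivery remains possible.
  def flush
    @flushed.concat(@pending)
    @pending.clear
  end
end

buffer = {
  'a' => { key: 'a', payload: 'first' },
  'b' => { key: 'b', payload: 'second' }
}
dispatcher = Dispatcher.new

# 1. Collect everything due for dispatch, remembering the buffer keys.
keys = []
buffer.each_value do |message|
  keys << message[:key]
  dispatcher << message
end

# 2. Synchronous flush; only after it succeeds...
dispatcher.flush

# 3. ...do we remove the dispatched messages from the buffer.
keys.each { |key| buffer.delete(key) }

puts buffer.empty?           # true
puts dispatcher.flushed.size # 2
```

With a non-transactional producer a crash between steps 2 and 3 causes a duplicate dispatch on the next tick (at-least-once); a transactional producer collapses that into exactly-once, as the listing's comment notes.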