Class: Karafka::Pro::ScheduledMessages::Consumer

Inherits: BaseConsumer < Object

Defined in:
lib/karafka/pro/scheduled_messages/consumer.rb

Overview

Consumer that coordinates the dispatch of scheduled messages when their time comes

Instance Attribute Summary

Attributes inherited from BaseConsumer

#client, #coordinator, #id, #messages, #producer

Instance Method Summary

Methods inherited from BaseConsumer

#initialize, #inspect, #on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_eofed, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_eofed, #on_idle, #on_initialized, #on_revoked, #on_shutdown, #on_wrap

Constructor Details

This class inherits a constructor from Karafka::BaseConsumer

Instance Method Details

#consume ⇒ Object

Processes messages and runs dispatch (via tick) if needed



# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 60

def consume
  return if reload!

  messages.each do |message|
    SchemaValidator.call(message)

    # We always track offsets of messages, even if they would be later on skipped or
    # ignored for any reason. That way we have debug info that is useful once in a while.
    @tracker.offsets(message)

    process_message(message)
  end

  @states_reporter.call

  recent_timestamp = messages.last.timestamp.to_i
  post_started_timestamp = @tracker.started_at + GRACE_PERIOD

  # If we started getting messages with timestamps beyond the current time, it means we
  # have loaded enough to start scheduling. The upcoming messages are from the future from
  # the perspective of this consumer's start time. We add a bit of grace period to avoid
  # edge cases
  loaded! if @state.loading? && recent_timestamp > post_started_timestamp

  eofed if eofed?

  # Unless given day data is fully loaded we should not dispatch any notifications nor
  # should we mark messages.
  return unless @state.loaded?

  tick

  # Even though we need to reload the whole stream once a day, we still mark.
  # We mark as consumed for two main reasons:
  #   - by marking we can indicate to Web UI and other monitoring tools that we have a
  #     potential real lag with loading schedules in case there would be a lot of messages
  #     added to the schedules topic
  #   - we prevent a situation where there is no notion of this consumer group in the
  #     reporting, allowing us to establish "presence"
  mark_as_consumed(messages.last)
end
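The loading-completion check in `consume` can be sketched in isolation. The snippet below is a minimal, standalone illustration; the `GRACE_PERIOD` value and the tracker struct are assumptions for the example, not the library's actual internals:

```ruby
# Minimal sketch of the "have we caught up?" check from #consume.
# GRACE_PERIOD and Tracker are illustrative stand-ins.
GRACE_PERIOD = 15 # seconds of slack to avoid clock edge cases

Tracker = Struct.new(:started_at)

# Returns true when the most recent message timestamp is beyond the
# consumer start time plus the grace period, meaning the historical
# backlog is loaded and scheduling can begin.
def loading_complete?(recent_timestamp, tracker)
  recent_timestamp > tracker.started_at + GRACE_PERIOD
end

tracker = Tracker.new(1_700_000_000)
loading_complete?(1_700_000_010, tracker) # still within grace period
loading_complete?(1_700_000_020, tracker) # beyond start + grace
```

The grace period prevents a consumer started mid-stream from flipping into the loaded state on a message that merely has a slightly skewed timestamp.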

#eofed ⇒ Object

Runs end of file operations



# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 103

def eofed
  return if reload!

  # If end of the partition is reached, it always means all data is loaded
  loaded!
end

#initialized ⇒ Object

Prepares the initial state of all stateful components



# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 50

def initialized
  clear!
  # Max epoch is always moving forward with the time. Never backwards, hence we do not
  # reset it at all.
  @max_epoch = MaxEpoch.new
  @state = State.new
  @reloads = 0
end
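The comment in `initialized` notes that the max epoch only ever moves forward. A simplified, hypothetical stand-in for such a monotonic tracker (not the actual `MaxEpoch` implementation) can make that property concrete:

```ruby
# Simplified stand-in for a monotonic "max epoch" tracker: updates
# only ever move the stored value forward in time, never backwards.
class MonotonicMaxEpoch
  attr_reader :to_i

  def initialize
    # -1 means "nothing dispatched yet"
    @to_i = -1
  end

  # Keeps the larger of the current and the newly observed epoch
  def update(epoch)
    @to_i = epoch if epoch > @to_i
  end
end

epoch = MonotonicMaxEpoch.new
epoch.update(100)
epoch.update(50) # ignored: older than the current max
epoch.to_i       # => 100
```

Because the value never regresses, a reload or rebalance cannot cause already-dispatched schedules to be considered pending again.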

#shutdown ⇒ Object

Moves the state to stopped and publishes the state report immediately



# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 139

def shutdown
  @state.stopped!
  @states_reporter.call!
end

#tick ⇒ Object

Performs periodic operations when no new data is provided to the topic partition



# File 'lib/karafka/pro/scheduled_messages/consumer.rb', line 111

def tick
  return if reload!

  # We should not dispatch any data until the whole state is loaded. We need to make sure
  # that all tombstone events are loaded so we do not duplicate dispatches
  return unless @state.loaded?

  keys = []

  # We first collect all the data for dispatch, then dispatch, and **only** after the
  # synchronous dispatch succeeds do we remove those messages from the daily buffer
  # and update the max epoch. Since only the dispatch itself is volatile and can crash
  # with timeouts, etc, we need to be sure it went through prior to deleting those
  # messages from the daily buffer. That way we ensure at-least-once delivery and, in
  # the case of a transactional producer, exactly-once delivery.
  @daily_buffer.for_dispatch do |message|
    keys << message.key
    @dispatcher << message
  end

  @dispatcher.flush

  keys.each { |key| @daily_buffer.delete(key) }

  @states_reporter.call
end
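The collect, dispatch, then delete ordering in `tick` is what provides the at-least-once guarantee. A standalone sketch of that pattern follows; the buffer and dispatched list here are plain in-memory stand-ins, not Karafka's actual `DailyBuffer` or `Dispatcher` classes:

```ruby
# Standalone sketch of the collect -> dispatch -> delete ordering.
# If the dispatch step raised, nothing would have been deleted from
# the buffer yet, so a retry re-dispatches the same messages
# (at-least-once semantics).
def dispatch_due!(buffer, dispatched)
  keys = []

  # 1. Collect everything that is due for dispatch
  buffer.each do |key, payload|
    keys << key
    dispatched << payload
  end

  # 2. A real dispatcher would synchronously flush here and may raise,
  #    aborting before any deletion happens.

  # 3. Only after a successful dispatch do we drop from the buffer
  keys.each { |key| buffer.delete(key) }
end

buffer = { 'a' => 'msg-a', 'b' => 'msg-b' }
dispatched = []
dispatch_due!(buffer, dispatched)
# buffer is now empty; dispatched holds both payloads
```

Deleting before the flush would invert the guarantee: a crash mid-dispatch would silently drop schedules instead of redelivering them.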