Class: Karafka::Pro::ScheduledMessages::Dispatcher

Inherits:
Object
Defined in:
lib/karafka/pro/scheduled_messages/dispatcher.rb

Overview

Dispatcher responsible for dispatching messages to the appropriate target topics, as well as for dispatching auxiliary messages. All messages (aside from the ones users dispatch with the envelope) are sent via this dispatcher.

Messages are buffered and dispatched in batches to improve dispatch performance.
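The buffer-and-batch behavior described above can be sketched as follows. This is a minimal illustrative stand-in, not the Karafka implementation: the `BatchingDispatcher` name, the `batch_size:` keyword, and the injected producer are assumptions for the sketch.

```ruby
# Minimal sketch of the buffer-and-batch pattern this dispatcher uses:
# messages accumulate in memory and are sent in fixed-size batches.
class BatchingDispatcher
  attr_reader :buffer

  def initialize(producer, batch_size: 100)
    @producer = producer
    @batch_size = batch_size
    @buffer = []
  end

  # Buffers a message hash without dispatching it
  def <<(message_hash)
    @buffer << message_hash
  end

  # Drains the buffer in batches; messages leave the buffer only as
  # each batch is handed to the producer
  def flush
    @producer.produce_many_sync(@buffer.shift(@batch_size)) until @buffer.empty?
  end
end
```

Batching the sync produce keeps the dispatch pressure on the producer bounded regardless of how many schedules fire at once.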

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(topic, partition) ⇒ Dispatcher

Returns a new instance of Dispatcher.

Parameters:

  • topic (String)

    consumed topic name

  • partition (Integer)

    consumed partition



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 45

def initialize(topic, partition)
  @topic = topic
  @partition = partition
  @buffer = []
  @serializer = Serializer.new
end

Instance Attribute Details

#buffer ⇒ Array<Hash> (readonly)

Returns buffer with message hashes for dispatch.

Returns:

  • (Array<Hash>)

    buffer with message hashes for dispatch



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 41

def buffer
  @buffer
end

Instance Method Details

#<<(message) ⇒ Object

Note:

This method adds the message to the buffer; it does not dispatch it.

Note:

It also produces the needed tombstone event, as well as an audit log message.

Prepares the scheduled message for dispatch to the target topic. Extracts all the “schedule_” details and prepares the message so it is dispatched with the expected attributes to the desired location. Alongside that, it builds two messages (one, if logs are off): a tombstone event matching the schedule, so the schedule is no longer valid, and a log message carrying the same data as the dispatched message, which is helpful when debugging.

Parameters:

  • message (Karafka::Messages::Message)

    schedule message to prepare and buffer for dispatch



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 63

def <<(message)
  target_headers = message.raw_headers.merge(
    "schedule_source_topic" => @topic,
    "schedule_source_partition" => @partition.to_s,
    "schedule_source_offset" => message.offset.to_s,
    "schedule_source_key" => message.key
  ).compact

  target = {
    payload: message.raw_payload,
    headers: target_headers
  }

  extract(target, message.headers, :topic)
  extract(target, message.headers, :partition)
  extract(target, message.headers, :key)
  extract(target, message.headers, :partition_key)

  @buffer << target

  # Tombstone message so this schedule is no longer in use and gets removed from Kafka by
  # Kafka itself during compacting. It will not cancel the dispatch (that already happened),
  # but it ensures the schedule is not sent again and is marked as dispatched.
  @buffer << Proxy.tombstone(message: message)
end
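The `extract` calls above move individual schedule details from the headers into the produce hash. The helper itself is private and not shown on this page; the sketch below is a hypothetical reconstruction, and the `"schedule_target_*"` header naming is an assumption rather than something this page confirms.

```ruby
# Hypothetical sketch of the private `extract` helper used above; assumes
# routing details travel in "schedule_target_*" headers (an assumption).
def extract(target, headers, attribute)
  schedule_key = "schedule_target_#{attribute}"
  return unless headers.key?(schedule_key)

  # Move the routing detail from the headers into the produce hash
  target[attribute] = headers[schedule_key]
end
```

With such a helper, a header like `"schedule_target_topic" => "events"` would become `target[:topic] = "events"`, while absent headers leave the produce hash untouched.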

#flush ⇒ Object

Sends all buffered messages to Kafka synchronously. We flush in batches to prevent overloading. When a transactional producer is in use, this will automatically be wrapped in a transaction.



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 108

def flush
  until @buffer.empty?
    config.producer.produce_many_sync(
      # We can remove this prior to the dispatch because we only evict messages from the
      # daily buffer once dispatch is successful
      @buffer.shift(config.flush_batch_size)
    )
  end
end
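The flush loop relies on `Array#shift(n)`, which removes and returns up to `n` leading elements. Because the batch is only evicted as it is handed to the sync produce, a raise from the producer leaves the remaining messages in the buffer. A quick illustration of that semantic:

```ruby
# Array#shift(n) removes and returns up to n leading elements in one step
buffer = [1, 2, 3, 4, 5]
first_batch = buffer.shift(2)
# first_batch == [1, 2], buffer == [3, 4, 5]

# Asking for more than remains simply drains what is left
last_batch = buffer.shift(10)
# last_batch == [3, 4, 5], buffer == []
```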

#state(tracker) ⇒ Object

Note:

This is dispatched async because it’s just a statistical metric.

Builds and dispatches the state report message with schedule details.

Parameters:

  • tracker (Tracker)

    tracker with the schedules state details



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 94

def state(tracker)
  config.producer.produce_async(
    topic: "#{@topic}#{config.states_postfix}",
    payload: @serializer.state(tracker),
    # We use the state as a key, so we always have one state transition data available
    key: "#{tracker.state}_state",
    partition: @partition,
    headers: { "zlib" => "true" }
  )
end
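The `"zlib" => "true"` header signals that the state payload is compressed. The serializer's exact format is not shown on this page; the round trip below is a hedged sketch assuming a plain `Zlib::Deflate`/`Zlib::Inflate` over a JSON body, which is an assumption rather than a documented contract.

```ruby
require "json"
require "zlib"

# Hypothetical round trip for a zlib-flagged state payload
state = { "state" => "loaded", "daily" => 12 }
payload = Zlib::Deflate.deflate(state.to_json)

# A consumer seeing headers["zlib"] == "true" would inflate before parsing
restored = JSON.parse(Zlib::Inflate.inflate(payload))
```

Keying the record by `"#{tracker.state}_state"` means the compacted topic retains exactly one latest report per state, so consumers always see the most recent transition data.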