Module: Karafka::Pro::ScheduledMessages::Proxy

Defined in:
lib/karafka/pro/scheduled_messages/proxy.rb

Overview

Proxy used to wrap the scheduled messages with the correct dispatch envelope. Each message that goes to the scheduler topic needs to have specific headers and other details that are required by the system so we know how and when to dispatch it.

Each message that goes to the proxy topic needs to have a unique key. We inject one automatically unless the user provides one in the envelope. Since we want to make sure that messages dispatched by the user with the same key all go to the same partition, we inject a partition_key based on the user key or other details, if present. That ensures they always go to the same partition on our side.

This wrapper validates the initial message that the user wants to send in the future, as well as the envelope and the specific requirements for a message to be sent in the future.
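
The key handling described above can be sketched roughly as follows. This is a standalone illustration and not the library's code: the `enrich_sketch` helper, the "topic-uuid" key format, and the order in which the original message's `:partition_key` and `:key` are consulted are all assumptions made for the example.

```ruby
require "securerandom"

# Hypothetical helper illustrating the behaviour described in the overview.
# It is not part of Karafka; the name and the key format are assumptions.
def enrich_sketch(proxy_message, message)
  # An envelope that already carries a key is left untouched
  return proxy_message if proxy_message.key?(:key)

  # Otherwise build a random unique key (format chosen for illustration)
  proxy_message[:key] = "#{message[:topic]}-#{SecureRandom.uuid}"

  # Derive a partition_key from the original message so that messages sharing
  # a key end up on the same partition of the scheduling topic
  source = message[:partition_key] || message[:key]
  proxy_message[:partition_key] ||= source.to_s if source

  proxy_message
end

original = { topic: "events", key: "user-1", payload: "hello" }

# The user supplied a key in the envelope: nothing is injected
puts enrich_sketch({ topic: "scheduled", key: "my-key" }, original).inspect

# No key in the envelope: a random key and a partition_key are injected
puts enrich_sketch({ topic: "scheduled" }, original).inspect
```

In this sketch the random key keeps every scheduled message unique, while the derived partition_key preserves the partition affinity of the original key.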

Class Method Details

.cancel(key:, envelope: {}) ⇒ Hash

Note:

Technically this is a tombstone, but we mark it with a distinct source type ("cancel") so that the two can be told apart when debugging.

Generates a tombstone message that cancels the dispatch of an already scheduled message.

Parameters:

  • key (String)

    key used by the original message as a unique identifier

  • envelope (Hash) (defaults to: {})

    Details that identify the message location, such as topic and partition (if used), so the cancellation goes to the correct location.

Returns:

  • (Hash)

    cancellation message



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 124

def cancel(key:, envelope: {})
  proxy_message = {
    key: key,
    payload: nil,
    headers: {
      "schedule_schema_version" => ScheduledMessages::SCHEMA_VERSION,
      "schedule_source_type" => "cancel"
    }
  }.merge(envelope)

  # Ensure user provided envelope is with all expected details
  validate!(proxy_message)

  proxy_message
end
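
To make the shape of the returned hash concrete, here is a minimal standalone sketch that rebuilds the same structure without loading Karafka. `cancel_sketch` is a hypothetical name, the `"1.0.0"` default only stands in for `ScheduledMessages::SCHEMA_VERSION` (its real value is not shown here), the topic name is made up, and the `validate!` step is omitted.

```ruby
# Standalone re-creation of the hash built by .cancel, without validation.
# schema_version is a placeholder for ScheduledMessages::SCHEMA_VERSION.
def cancel_sketch(key:, envelope: {}, schema_version: "1.0.0")
  {
    key: key,
    payload: nil,
    headers: {
      "schedule_schema_version" => schema_version,
      "schedule_source_type" => "cancel"
    }
  }.merge(envelope)
end

message = cancel_sketch(
  key: "order-42-reminder",
  # Hypothetical scheduling topic and partition
  envelope: { topic: "scheduled_messages", partition: 0 }
)

puts message.inspect
```

Because `Hash#merge` is shallow, an envelope that carries its own `:headers` key replaces the generated headers in this sketch instead of being merged into them.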

.schedule(message:, epoch:, envelope: {}) ⇒ Hash

Note:

This proxy does not inject the scheduled messages topic on its own; the topic is taken only from the envelope. That is because a user can have multiple scheduled messages topics, for example to group outgoing messages.

Generates a schedule message envelope wrapping the original dispatch

Parameters:

  • message (Hash)

    message hash of a message that would originally go to WaterDrop producer directly.

  • epoch (Integer)

    Unix epoch timestamp of the time (now or in the future) at which this message should be dispatched

  • envelope (Hash) (defaults to: {})

    Special details that the envelope needs to have, like a unique key. If a unique key is not provided, we build a random unique one and use a partition_key based on the original message key (if present) to ensure that all relevant messages are dispatched to the same topic partition.

Returns:

  • (Hash)

    dispatched message wrapped with an envelope



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 84

def schedule(message:, epoch:, envelope: {})
  # We need to ensure that the message we want to proxy is fully legit. Otherwise, since
  # we envelope details like target topic, we could end up having incorrect data to
  # schedule
  MSG_CONTRACT.validate!(
    message,
    WaterDrop::Errors::MessageInvalidError,
    scope: %w[scheduled_messages message]
  )

  headers = (message[:headers] || {}).merge(
    "schedule_schema_version" => ScheduledMessages::SCHEMA_VERSION,
    "schedule_target_epoch" => epoch.to_i.to_s,
    "schedule_source_type" => "schedule"
  )

  export(headers, message, :topic)
  export(headers, message, :partition)
  export(headers, message, :key)
  export(headers, message, :partition_key)

  proxy_message = {
    payload: message[:payload],
    headers: headers
  }.merge(envelope)

  enrich(proxy_message, message)
  validate!(proxy_message)

  proxy_message
end
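
The header construction above can be tried in isolation with the following sketch. It skips both validation steps and the key enrichment. The `export` helper is not shown on this page, so `export_sketch` and its "schedule_target_<attribute>" header naming are assumptions, as is the `"1.0.0"` placeholder for `ScheduledMessages::SCHEMA_VERSION`.

```ruby
# Hypothetical stand-in for the export helper, which is not shown above.
# The "schedule_target_<attribute>" header naming is an assumption.
def export_sketch(headers, message, attribute)
  return unless message.key?(attribute)

  headers["schedule_target_#{attribute}"] = message.fetch(attribute).to_s
end

# Builds only the headers part of the scheduled message envelope
def schedule_headers_sketch(message:, epoch:, schema_version: "1.0.0")
  headers = (message[:headers] || {}).merge(
    "schedule_schema_version" => schema_version,
    # to_i accepts Integer, Float and Time; the header value is a String
    "schedule_target_epoch" => epoch.to_i.to_s,
    "schedule_source_type" => "schedule"
  )

  %i[topic partition key partition_key].each do |attribute|
    export_sketch(headers, message, attribute)
  end

  headers
end

headers = schedule_headers_sketch(
  message: { topic: "events", key: "user-1", payload: "hello", headers: { "trace" => "abc" } },
  epoch: Time.at(1_700_000_000)
)

puts headers.inspect
```

Since the epoch goes through `to_i`, a `Time` object or a fractional timestamp is truncated to whole seconds before it is stored in the header.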

.tombstone(message:) ⇒ Object

Builds a tombstone with the dispatched message details. Those details can be used in the Web UI, etc., when analyzing dispatches.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to tombstone. Its key, topic and partition are reused so the tombstone goes to the same location.



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 144

def tombstone(message:)
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      "schedule_schema_version" => ScheduledMessages::SCHEMA_VERSION,
      "schedule_source_type" => "tombstone",
      "schedule_source_offset" => message.offset.to_s
    )
  }
end
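
As a rough illustration of the resulting tombstone, the sketch below replaces `Karafka::Messages::Message` with a small `Struct` that exposes only the readers used above. `SketchMessage`, `tombstone_sketch`, the sample values, and the `"1.0.0"` placeholder for `ScheduledMessages::SCHEMA_VERSION` are all assumptions made for the example.

```ruby
# Minimal stand-in for Karafka::Messages::Message with only the readers
# that .tombstone uses
SketchMessage = Struct.new(
  :key, :topic, :partition, :offset, :raw_headers,
  keyword_init: true
)

# Standalone re-creation of the hash built by .tombstone
def tombstone_sketch(message:, schema_version: "1.0.0")
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      "schedule_schema_version" => schema_version,
      "schedule_source_type" => "tombstone",
      "schedule_source_offset" => message.offset.to_s
    )
  }
end

dispatched = SketchMessage.new(
  key: "events-123",
  topic: "scheduled_messages",
  partition: 2,
  offset: 57,
  raw_headers: { "schedule_source_type" => "schedule", "trace" => "abc" }
)

puts tombstone_sketch(message: dispatched).inspect
```

The original headers are preserved, while `schedule_source_type` is overwritten with "tombstone" and the offset of the dispatched message is recorded as a string.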