Module: Karafka::Pro::ScheduledMessages

Defined in:
lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb

Overview

This feature allows for proxying messages via a special topic that can dispatch them at a later time, hence scheduled messages. Such messages need to have a special format but aside from that they are regular Kafka messages.

This work was conceptually inspired by the Go scheduler github.com/etf1/kafka-message-scheduler, though I did not look at the implementation itself, only at the concept of daily in-memory scheduling.

Defined Under Namespace

Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup

Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker

Constant Summary

SCHEMA_VERSION = "1.0.0"

Version of the schema we use for envelopes in scheduled messages. We use it to detect potential upgrades, as other Karafka components do, and to stop processing incompatible versions.

STATES_SCHEMA_VERSION = "1.0.0"

Version of the states schema, which is used to publish simple per-partition aggregated metrics for schedule reporting.

Class Method Details

.cancel(**) ⇒ Hash

Generates a tombstone message that cancels a given dispatch, provided it has not happened yet.

Returns:

  • (Hash)

    tombstone cancelling message



# File 'lib/karafka/pro/scheduled_messages.rb', line 59

def cancel(**)
  Proxy.cancel(**)
end
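
In Kafka, a tombstone is conventionally a record that carries a key and a nil payload. The following is only a minimal sketch of that idea, assuming the cancellation is matched to the schedule by key and written to the same schedules topic. `build_tombstone`, its parameters, and the example topic and key are hypothetical and are not part of the Karafka API.

```ruby
# Hypothetical helper: illustrates the tombstone idea only,
# it is not Karafka's Proxy.cancel implementation.
def build_tombstone(key:, topic:)
  {
    topic: topic, # the schedules topic the original envelope was sent to (assumed)
    key: key,     # must match the key of the schedule being cancelled (assumed)
    payload: nil  # a nil payload is what makes the record a tombstone
  }
end

tombstone = build_tombstone(key: "order-42-reminder", topic: "scheduled_messages")
```

The resulting hash has the same shape as any other message hash handed to a producer, which is why a cancellation can travel through the same topic as the schedules themselves.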

.post_fork(config, pre_fork_producer) ⇒ Object

Because a custom producer, different from the default one, may be configured for scheduled messages, we hold a reference to the old pre-fork producer. When the producer is initialized again after a fork, we compare against that reference: as long as the user relies on the defaults, the scheduled messages producer is re-inherited from the default config, while a custom producer is left untouched.

Parameters:

  • config (Karafka::Core::Configurable::Node)
  • pre_fork_producer (WaterDrop::Producer)


# File 'lib/karafka/pro/scheduled_messages.rb', line 90

def post_fork(config, pre_fork_producer)
  return unless config.scheduled_messages.producer == pre_fork_producer

  config.scheduled_messages.producer = config.producer
end
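
The two outcomes of this check can be exercised in isolation. The sketch below copies the method body and runs it against stand-in objects: `RootConfig`, `ScheduledConfig`, and the plain `Object` instances are assumptions that replace the real config nodes and WaterDrop producers.

```ruby
# Stand-ins for the real config nodes (assumptions for illustration)
ScheduledConfig = Struct.new(:producer)
RootConfig = Struct.new(:producer, :scheduled_messages)

# Same logic as the documented method, extracted so it runs standalone
def post_fork(config, pre_fork_producer)
  return unless config.scheduled_messages.producer == pre_fork_producer

  config.scheduled_messages.producer = config.producer
end

# Plain objects standing in for producers
old_default = Object.new
new_default = Object.new
custom = Object.new

# Case 1: scheduled messages used the pre-fork default, so the new default is re-inherited
inherited = RootConfig.new(new_default, ScheduledConfig.new(old_default))
post_fork(inherited, old_default)

# Case 2: a custom producer was configured, so it is left untouched
customized = RootConfig.new(new_default, ScheduledConfig.new(custom))
post_fork(customized, old_default)
```

After both calls, `inherited.scheduled_messages.producer` is `new_default`, while `customized.scheduled_messages.producer` is still `custom`.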

.post_setup(config) ⇒ Object

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 76

def post_setup(config)
  ScheduledMessages::Contracts::Config.new.validate!(
    config.to_h,
    scope: %w[config]
  )
end

.pre_setup(config) ⇒ Object

Sets up additional config scope, validations and other things

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 68

def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:scheduled_messages, default: Setup::Config.config)
  end
end

.schedule(**) ⇒ Hash

Runs `Proxy.schedule`.

Returns:

  • (Hash)

    message wrapped with the scheduled message envelope



# File 'lib/karafka/pro/scheduled_messages.rb', line 53

def schedule(**)
  Proxy.schedule(**)
end
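
To make the idea of an envelope more concrete, here is a minimal sketch of wrapping a regular message so that it can travel through a schedules topic. It does not reproduce the `Proxy` implementation: `wrap_for_schedule`, its parameters, and the header names `target_topic` and `target_epoch` are hypothetical and chosen only for illustration.

```ruby
# Hypothetical sketch of the envelope idea; not Karafka's Proxy implementation.
def wrap_for_schedule(message:, epoch:, envelope_topic:)
  {
    topic: envelope_topic,      # the special schedules topic (assumed name)
    key: message[:key],         # illustrative choice: reuse the target key
    payload: message[:payload], # the original payload is carried unchanged
    headers: {
      "target_topic" => message[:topic], # where to dispatch later (hypothetical header)
      "target_epoch" => epoch.to_s       # when to dispatch, in Unix seconds (hypothetical header)
    }
  }
end

enveloped = wrap_for_schedule(
  message: { topic: "notifications", key: "user-1", payload: "hello" },
  epoch: 1_900_000_000,
  envelope_topic: "scheduled_messages"
)
```

The wrapped result is still an ordinary message hash, which matches the overview's point that scheduled messages are regular Kafka messages apart from their special format.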