Module: Karafka::Pro::ScheduledMessages
- Defined in:
- lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb
Overview
This feature allows for proxying messages via a special topic that can dispatch them at a later time, hence scheduled messages. Such messages need to have a special format but aside from that they are regular Kafka messages.
This work was conceptually inspired by the Go scheduler: github.com/etf1/kafka-message-scheduler though I did not look at the implementation itself. Just the concept of daily in-memory scheduling.
Defined Under Namespace
Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker
Constant Summary collapse
- SCHEMA_VERSION =
Version of the schema we use for envelops in scheduled messages. We use it to detect any potential upgrades similar to other components of Karafka and to stop processing of incompatible versions
"1.0.0"- STATES_SCHEMA_VERSION =
Version of the states schema. Used to publish per partition simple aggregated metrics that can be used for schedules reporting
"1.0.0"
Class Method Summary collapse
-
.cancel ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened).
-
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer.
- .post_setup(config) ⇒ Object
-
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things.
-
.schedule ⇒ Hash
Runs the ‘Proxy.call`.
Class Method Details
.cancel ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened)
59 60 61 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 59 def cancel(**) Proxy.cancel(**) end |
.post_fork(config, pre_fork_producer) ⇒ Object
Basically since we may have custom producers configured that are not the same as the default one, we hold a reference to old pre-fork producer. This means, that when we initialize it again in post-fork, as long as user uses defaults we should re-inherit it from the default config.
90 91 92 93 94 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 90 def post_fork(config, pre_fork_producer) return unless config..producer == pre_fork_producer config..producer = config.producer end |
.post_setup(config) ⇒ Object
76 77 78 79 80 81 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 76 def post_setup(config) ScheduledMessages::Contracts::Config.new.validate!( config.to_h, scope: %w[config] ) end |
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things
68 69 70 71 72 73 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 68 def pre_setup(config) # Expand the config with this feature specific stuff config.instance_eval do setting(:scheduled_messages, default: Setup::Config.config) end end |