Module: Karafka::Pro::Routing::Features::ConsumerGroups::PeriodicJob::Topic
- Defined in:
- lib/karafka/pro/routing/features/consumer_groups/periodic_job/topic.rb
Overview
Topic extensions for the periodic job flows.
Instance Method Summary
- #initialize ⇒ Object
  Sets the extra instance variable to nil before calling the parent class initializer.
- #periodic_job(active = false, interval: nil, during_pause: nil, during_retry: nil) ⇒ Object (also: #periodic)
  Defines the topic as periodic.
- #periodic_job? ⇒ Boolean
  Returns whether periodic jobs are active.
- #to_h ⇒ Hash
  Topic with all its native configuration options plus the periodic job flow settings.
Instance Method Details
#initialize ⇒ Object
This method sets up the extra instance variable to nil before calling the parent class initializer. The explicit initialization to nil is included as an optimization for Ruby’s object shapes system, which improves memory layout and access performance.
# File 'lib/karafka/pro/routing/features/consumer_groups/periodic_job/topic.rb', line 43
def initialize(...)
  @periodic_job = nil
  super
end
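The initializer pattern above can be tried in isolation. The following is a minimal sketch, assuming a prepended feature module; `SketchTopic` and `ShapeFriendlyInit` are hypothetical names used only for illustration and are not part of Karafka.

```ruby
# Hypothetical stand-in for a routing topic base class (not Karafka's real class).
class SketchTopic
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Hypothetical feature module that mirrors the documented pattern:
# assign the extra instance variable first, then forward all arguments to the parent.
module ShapeFriendlyInit
  def initialize(...)
    # Assigning nil up front means every instance defines this variable,
    # and always at the same point during construction.
    @periodic_job = nil
    super
  end
end

SketchTopic.prepend(ShapeFriendlyInit)

topic = SketchTopic.new("events")
puts topic.instance_variable_defined?(:@periodic_job)
puts topic.name
```

Because every instance assigns the same variables in the same order, all instances can share one object shape, which is the optimization the description refers to.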
#periodic_job(active = false, interval: nil, during_pause: nil, during_retry: nil) ⇒ Object Also known as: periodic
Defines the topic as periodic. Consumers of periodic topics will invoke `#tick` on each poll in which no messages were received.
# File 'lib/karafka/pro/routing/features/consumer_groups/periodic_job/topic.rb', line 58
def periodic_job(
  active = false,
  interval: nil,
  during_pause: nil,
  during_retry: nil
)
  @periodic_job ||= begin
    # Set to active if any of the values was configured
    active = true unless interval.nil?
    active = true unless during_pause.nil?
    active = true unless during_retry.nil?
    # Default is not to retry during retry flow
    during_retry = false if during_retry.nil?

    # If no interval, use default
    interval ||= Karafka::App.config.internal.tick_interval

    Config.new(
      active: active,
      interval: interval,
      during_pause: during_pause,
      during_retry: during_retry,
      # This is internal setting for state management, not part of the configuration
      # Do not overwrite.
      # If `during_pause` is explicit, we do not select it based on LRJ setup and we
      # consider if fully ready out of the box
      materialized: !during_pause.nil?
    )
  end

  return @periodic_job if @periodic_job.materialized?
  return @periodic_job unless @long_running_job

  # If not configured in any way, we want not to process during pause for LRJ.
  # LRJ pauses by default when processing and during this time we do not want to
  # tick at all. This prevents us from running periodic jobs while LRJ jobs are
  # running. This of course has a side effect of not running when paused for any
  # other reason but it is a compromise in the default settings
  @periodic_job.during_pause = !long_running_job?
  @periodic_job.materialized = true

  @periodic_job
end
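The defaulting rules in this method can be modelled outside of Karafka. The sketch below is a simplified approximation, not the library's implementation: `PeriodicConfig`, `build_periodic`, and `DEFAULT_TICK_INTERVAL` are hypothetical names, the interval value is a placeholder for `Karafka::App.config.internal.tick_interval`, and the long-running job (LRJ) state is passed in as a plain `long_running_job:` argument (nil meaning "not configured") instead of being read from the topic.

```ruby
# Hypothetical stand-in for the feature's Config object.
PeriodicConfig = Struct.new(
  :active, :interval, :during_pause, :during_retry, :materialized,
  keyword_init: true
)

# Placeholder value; the real default is read from the application configuration.
DEFAULT_TICK_INTERVAL = 5_000

# Simplified model of the documented defaulting rules (no memoization).
def build_periodic(active = false, interval: nil, during_pause: nil,
                   during_retry: nil, long_running_job: nil)
  # Configuring any option implicitly activates the feature.
  active = true unless [interval, during_pause, during_retry].all?(&:nil?)

  config = PeriodicConfig.new(
    active: active,
    interval: interval || DEFAULT_TICK_INTERVAL,
    during_pause: during_pause,
    during_retry: during_retry.nil? ? false : during_retry,
    # An explicit during_pause is final and never derived from the LRJ setup.
    materialized: !during_pause.nil?
  )

  return config if config.materialized
  return config if long_running_job.nil?

  # With LRJ enabled, ticking during pauses is switched off by default.
  config.during_pause = !long_running_job
  config.materialized = true
  config
end

p build_periodic.to_h
p build_periodic(interval: 2_000).to_h
p build_periodic(true, long_running_job: true).to_h
```

The third call shows the compromise described in the source comments: with a long-running job and no explicit `during_pause`, ticking during pauses is disabled, whatever the reason for the pause.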
#periodic_job? ⇒ Boolean
Returns whether periodic jobs are active for this topic.
# File 'lib/karafka/pro/routing/features/consumer_groups/periodic_job/topic.rb', line 105
def periodic_job?
  periodic_job.active?
end
#to_h ⇒ Hash
Returns the topic with all its native configuration options plus the periodic job flow settings.
# File 'lib/karafka/pro/routing/features/consumer_groups/periodic_job/topic.rb', line 111
def to_h
  super.merge(
    periodic_job: periodic_job.to_h
  ).freeze
end
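The `super.merge(...).freeze` pattern can be illustrated with a small self-contained sketch. `ExportableTopic` and `PeriodicExport` are hypothetical names, and the nested settings hash holds placeholder values, not real Karafka defaults.

```ruby
# Hypothetical base class standing in for the routing topic.
class ExportableTopic
  def initialize(name)
    @name = name
  end

  # Native configuration options only.
  def to_h
    { name: @name }
  end
end

# Hypothetical feature module that extends #to_h in the same way as the documented method.
module PeriodicExport
  def to_h
    super.merge(
      periodic_job: { active: true, interval: 5_000 } # placeholder settings
    ).freeze
  end
end

ExportableTopic.prepend(PeriodicExport)

topic_hash = ExportableTopic.new("events").to_h
puts topic_hash.frozen?
puts topic_hash.key?(:periodic_job)
```

Note that `freeze` is shallow in Ruby: the merged top-level hash cannot be modified, but nested values are frozen only if they were frozen separately.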