Class: Karafka::Pro::RecurringTasks::Executor
- Inherits:
-
Object
- Object
- Karafka::Pro::RecurringTasks::Executor
- Defined in:
- lib/karafka/pro/recurring_tasks/executor.rb
Overview
Executor is responsible for management of the state of recurring tasks schedule and is the heart of recurring tasks. It coordinates the replaying process as well as tracking of data on changes.
Constant Summary collapse
- COMMANDS =
Task commands we support and that can be triggered on tasks (if matched)
%w[ disable enable trigger ].freeze
Instance Method Summary collapse
-
#apply_command(command_hash) ⇒ Object
Applies given command to task (or many tasks) by running the command on tasks that match.
-
#call ⇒ Object
Run all tasks that should run at a given time and if any tasks were changed in any way or executed, stores the most recent state back to Kafka.
-
#incompatible? ⇒ Boolean
Is the current process schedule incompatible (older) than the one that we have in memory.
-
#initialize ⇒ Executor
constructor
Initializes the executor in replaying mode.
-
#replay ⇒ Object
Once all previous data is accumulated runs the catchup process to establish current state of the recurring tasks schedule execution.
-
#replaying? ⇒ Boolean
Are we in the replaying phase or not (false means, regular operations).
-
#update_state(schedule_hash) ⇒ Object
Updates the catchup state.
Constructor Details
Instance Method Details
#apply_command(command_hash) ⇒ Object
Applies given command to task (or many tasks) by running the command on tasks that match
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 67 def apply_command(command_hash) cmd_name = command_hash[:command][:name] raise(Karafka::Errors::UnsupportedCaseError, cmd_name) unless COMMANDS.include?(cmd_name) schedule.each do |task| next unless @matcher.matches?(task, command_hash) task.public_send(cmd_name) end end |
#call ⇒ Object
Run all tasks that should run at a given time and if any tasks were changed in any way or executed, stores the most recent state back to Kafka
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 137 def call changed = false schedule.each do |task| changed = true if task.changed? unless task.call? task.clear next end changed = true task.call end snapshot if changed end |
#incompatible? ⇒ Boolean
Returns Is the current process schedule incompatible (older) than the one that we have in memory.
61 62 63 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 61 def incompatible? @incompatible end |
#replay ⇒ Object
Once all previous data is accumulated runs the catchup process to establish current state of the recurring tasks schedule execution.
It includes applying any requested commands as well as synchronizing execution details for existing schedule and making sure all is loaded correctly.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 90 def replay # Ensure replaying happens only once return unless @replaying @replaying = false # When there is nothing to replay and synchronize, we can just save the state and # proceed if @catchup_commands.empty? && @catchup_schedule.nil? snapshot return end # If the schedule version we have in Kafka is higher than ours, we cannot proceed # This prevents us from applying older changes to a new schedule if @catchup_schedule[:schedule_version] > schedule.version @incompatible = true return end # Now we can synchronize the in-memory state based on the last state stored in Kafka schedule.each do |task| stored_task = @catchup_schedule[:tasks][task.id.to_sym] next unless stored_task stored_previous_time = stored_task[:previous_time] task.previous_time = stored_previous_time.zero? ? 0 : Time.at(stored_previous_time) stored_task[:enabled] ? task.enable : task.disable end @catchup_commands.each do |cmd| apply_command(cmd) end # We make sure to save in Kafka once more once everything is up to date snapshot @catchup_schedule = nil @catchup_commands = [] end |
#replaying? ⇒ Boolean
Returns are we in the replaying phase or not (false means, regular operations).
55 56 57 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 55 def @replaying end |
#update_state(schedule_hash) ⇒ Object
Updates the catchup state
81 82 83 |
# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 81 def update_state(schedule_hash) @catchup_schedule = schedule_hash end |