Class: Karafka::Pro::RecurringTasks::Serializer
- Inherits: Object
  - Object
  - Karafka::Pro::RecurringTasks::Serializer
- Defined in: lib/karafka/pro/recurring_tasks/serializer.rb
Overview
Converts schedule command and log details into data we can dispatch to Kafka.
Constant Summary
- SCHEMA_VERSION = "1.0"
  Current recurring tasks related schema structure
Instance Method Summary
- #command(command_name, task_id) ⇒ String
  Serialized and compressed command data.
- #log(event) ⇒ String
  Serialized and compressed event log data.
- #schedule(schedule) ⇒ String
  Serializes and compresses the schedule with all its tasks and their execution state.
Instance Method Details
#command(command_name, task_id) ⇒ String
Returns serialized and compressed command data.
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 72
def command(command_name, task_id)
  data = {
    schema_version: SCHEMA_VERSION,
    schedule_version: Karafka::Pro::RecurringTasks.schedule.version,
    dispatched_at: Time.now.to_f,
    type: "command",
    command: {
      name: command_name
    },
    task: {
      id: task_id
    }
  }

  compress(
    serialize(data)
  )
end
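The private `serialize` and `compress` helpers are not shown on this page. The following is a minimal sketch of how a command payload could be produced and read back, assuming JSON serialization and Zlib deflate compression. Both choices, the `build_command` helper, the fixed `schedule_version`, and the example command and task names are assumptions made for illustration.

```ruby
require "json"
require "zlib"

# Hypothetical stand-ins for the private helpers.
# JSON and Zlib are assumptions, not confirmed by this page.
def serialize(data)
  JSON.generate(data)
end

def compress(raw)
  Zlib::Deflate.deflate(raw)
end

# Builds the same hash shape as #command, using a fixed
# schedule version instead of the live schedule (assumption).
def build_command(command_name, task_id, schedule_version: "1.0.0")
  data = {
    schema_version: "1.0",
    schedule_version: schedule_version,
    dispatched_at: Time.now.to_f,
    type: "command",
    command: { name: command_name },
    task: { id: task_id }
  }

  compress(serialize(data))
end

payload = build_command("trigger", "cleanup")

# A reader reverses the two steps: inflate first, then parse.
decoded = JSON.parse(Zlib::Inflate.inflate(payload), symbolize_names: true)
puts decoded[:type]
puts decoded[:command][:name]
```

Because `dispatched_at` comes from `Time.now.to_f`, two payloads built from the same arguments will generally differ byte for byte.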
#log(event) ⇒ String
Returns serialized and compressed event log data.
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 93
def log(event)
  task = event[:task]

  data = {
    schema_version: SCHEMA_VERSION,
    schedule_version: Karafka::Pro::RecurringTasks.schedule.version,
    dispatched_at: Time.now.to_f,
    type: "log",
    task: {
      id: task.id,
      time_taken: event.payload[:time] || -1,
      result: event.payload.key?(:error) ? "failure" : "success"
    }
  }

  compress(
    serialize(data)
  )
end
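The `task` section of the log entry derives two fields from the event payload: `time_taken` falls back to `-1` when no `:time` is present, and `result` is `"failure"` whenever the payload has an `:error` key. A small sketch of that logic in isolation, where `task_outcome` is a hypothetical helper and the payload is a plain Hash standing in for the real event payload:

```ruby
# Hypothetical helper mirroring the task section of #log.
# The payload here is a plain Hash (assumption for illustration).
def task_outcome(task_id, payload)
  {
    id: task_id,
    # -1 marks an execution for which no duration was reported
    time_taken: payload[:time] || -1,
    # Only the presence of the :error key matters, not its value
    result: payload.key?(:error) ? "failure" : "success"
  }
end

ok = task_outcome("cleanup", { time: 12.5 })
failed = task_outcome("cleanup", { error: StandardError.new("boom") })

puts ok[:result]         # success
puts failed[:result]     # failure
puts failed[:time_taken] # -1
```

Since the check uses `key?`, a payload with `error: nil` would still be reported as a failure.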
#schedule(schedule) ⇒ String
Serializes and compresses the schedule with all its tasks and their execution state.
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 43
def schedule(schedule)
  tasks = {}

  schedule.each do |task|
    tasks[task.id] = {
      id: task.id,
      cron: task.cron.original,
      previous_time: task.previous_time.to_i,
      next_time: task.next_time.to_i,
      enabled: task.enabled?
    }
  end

  data = {
    schema_version: SCHEMA_VERSION,
    schedule_version: schedule.version,
    dispatched_at: Time.now.to_f,
    type: "schedule",
    tasks: tasks
  }

  compress(
    serialize(data)
  )
end
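To make the shape of the `tasks` map concrete, here is a sketch that builds it from stand-in objects. `FakeCron`, `FakeTask`, and `tasks_map` are hypothetical names introduced for this example. They expose only the methods that `#schedule` calls (`id`, `cron.original`, `previous_time`, `next_time`, `enabled?`) and are not the real Karafka classes.

```ruby
# Hypothetical stand-ins exposing only what #schedule reads.
FakeCron = Struct.new(:original)
FakeTask = Struct.new(:id, :cron, :previous_time, :next_time, :enabled) do
  def enabled?
    enabled
  end
end

# Builds the per-task state map, keyed by task id.
def tasks_map(tasks)
  tasks.each_with_object({}) do |task, acc|
    acc[task.id] = {
      id: task.id,
      cron: task.cron.original,
      # Times are reduced to whole-second epoch integers
      previous_time: task.previous_time.to_i,
      next_time: task.next_time.to_i,
      enabled: task.enabled?
    }
  end
end

task = FakeTask.new(
  "cleanup",
  FakeCron.new("0 * * * *"),
  Time.at(1_700_000_000),
  Time.at(1_700_003_600),
  true
)

state = tasks_map([task])
puts state["cleanup"][:cron]
puts state["cleanup"][:next_time]
```

Each entry repeats the task `id` inside its value, so a consumer can work with the values alone without losing the identifier.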