Class: Rage::Deferred::Backends::Disk
- Inherits: Object
  - Object
    - Rage::Deferred::Backends::Disk
- Defined in: lib/rage/deferred/backends/disk.rb
Overview
Rage::Deferred::Backends implements a storage layer to persist deferred tasks.
A storage should implement the following instance methods:
- add - called when a task has to be added to the storage;
- remove - called when a task has to be removed from the storage;
- pending_tasks - the method should iterate over the underlying storage and return a list of tasks to replay.
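As an illustration of this contract, the sketch below implements the three methods on top of a plain Hash. MemoryBackend is a hypothetical name, not a class that ships with Rage. It follows the return shapes used by the Disk backend documented on this page (#add returns the task id, #pending_tasks returns [task_id, task, publish_at] entries), but unlike Disk it keeps nothing across restarts, so it only demonstrates the interface.

```ruby
require "securerandom"

# Hypothetical in-memory backend illustrating the add/remove/pending_tasks
# contract; unlike Disk, nothing survives a process restart.
class MemoryBackend
  def initialize
    @tasks = {} # task_id => [task, publish_at]
  end

  # Store a task and return its id (a fresh id is generated unless one is given).
  def add(task, publish_at: nil, task_id: nil)
    task_id ||= SecureRandom.uuid
    @tasks[task_id] = [task, publish_at]
    task_id
  end

  # Forget a task once it has been processed.
  def remove(task_id)
    @tasks.delete(task_id)
  end

  # List the tasks that should be replayed, as [task_id, task, publish_at] triples.
  def pending_tasks
    @tasks.map { |task_id, (task, publish_at)| [task_id, task, publish_at] }
  end
end

backend = MemoryBackend.new
id = backend.add("send_email", publish_at: 1_700_000_000)
backend.add("resize_image", task_id: "custom-id")
backend.remove(id)
p backend.pending_tasks # => [["custom-id", "resize_image", nil]]
```

A real backend would also need to survive crashes, which is what the append-only log in Disk provides.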
Constant Summary
- STORAGE_VERSION = "0"
- STORAGE_SIZE_INCREASE_RATIO = 1.5
- DEFAULT_PUBLISH_AT = "0"
- DEFAULT_STORAGE_SIZE_LIMIT = 2_000_000
Instance Method Summary
- #add(task, publish_at: nil, task_id: nil) ⇒ String
  Add a record to the log representing a new task.
- #initialize(path:, prefix:, fsync_frequency:) ⇒ Disk (constructor)
  A new instance of Disk.
- #pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer, Integer)>
  Return a list of pending tasks in the storage.
- #remove(task_id) ⇒ Object
  Add a record to the log representing a task removal.
Constructor Details
#initialize(path:, prefix:, fsync_frequency:) ⇒ Disk
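Returns a new instance of Disk. path is the storage directory (a Pathname, created with mkpath if it does not exist); the storage files inside it are matched with the glob "#{prefix}#{STORAGE_VERSION}-*". fsync_frequency is the interval, in milliseconds, at which Iodine.run_every checks for unflushed writes and calls fsync on the main storage file.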
# File 'lib/rage/deferred/backends/disk.rb', line 20

def initialize(path:, prefix:, fsync_frequency:)
  @storage_path = path
  @storage_prefix = "#{prefix}#{STORAGE_VERSION}"
  @fsync_frequency = fsync_frequency

  @storage_path.mkpath

  # try to open and take ownership of all storage files in the storage directory;
  # File#close returns nil, so filter_map drops files already locked by another worker
  storage_files = @storage_path.glob("#{@storage_prefix}-*").filter_map do |file_path|
    file = file_path.open("a+b")
    if file.flock(File::LOCK_EX | File::LOCK_NB)
      sleep 0.01 # reduce contention between workers
      file
    else
      file.close
    end
  end

  # if there are no storage files - create one;
  # otherwise the first one is used as the main storage; the rest will be merged into the main storage
  if storage_files.empty?
    @storage = create_storage
  else
    @storage = storage_files[0]
    @recovered_storages = storage_files[1..] if storage_files.length > 1
  end

  # include recovered storages from crashed/previous workers
  all_storages = [@storage, *@recovered_storages].compact

  # find the highest task timestamp across all storage files
  max_timestamp = all_storages.map do |storage|
    storage_max_timestamp = 0

    storage.tap(&:rewind).each_line(chomp: true) do |entry|
      next unless entry[9...12] == "add"
      timestamp = entry[13..].split("-").first.to_i
      storage_max_timestamp = timestamp if timestamp > storage_max_timestamp
    end

    storage_max_timestamp
  end.max.to_i

  # apply Lamport's IR2(b) from "Time, Clocks, and the Ordering of Events
  # in a Distributed System" to guard against clock skew
  task_id_seed = [Time.now.to_i, max_timestamp].max + 1

  @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  Iodine.run_every(1_000) do
    task_id_seed += 1
    @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  end

  @storage_size_limit = DEFAULT_STORAGE_SIZE_LIMIT
  @storage_size = @storage.size
  @fsync_scheduled = false
  @should_rotate = false

  # we use different counters for different tasks:
  # delayed tasks are stored in the hash; for regular tasks we only maintain a counter;
  # this information is only used during storage rotation
  @immediate_tasks_in_queue = 0
  @delayed_tasks = {}

  # ensure data is written to disk
  @storage_has_changes = false
  Iodine.run_every(@fsync_frequency) do
    if @storage_has_changes
      @storage_has_changes = false
      @storage.fsync
    end
  end
end
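The task id seeding in the constructor can be tried in isolation. The helper next_task_id_seed below is hypothetical and simplified: it assumes log lines shaped like those #pending_tasks parses (an 8-character signature, ":", the operation add or rem, ":", then the task id) and task ids that start with a timestamp followed by "-", as in the "#{task_id_seed}-#{Process.pid}" base built above. Seeding from the larger of the wall clock and the highest persisted timestamp, plus one, keeps new ids above every id already in the log even if the clock has moved backwards since the previous run.

```ruby
# Hypothetical helper mirroring the constructor's seeding logic: scan "add"
# entries, take the timestamp prefix of each task id, and return a seed that
# is strictly greater than both the clock and every persisted timestamp.
def next_task_id_seed(log_lines, now: Time.now.to_i)
  max_timestamp = log_lines.filter_map do |entry|
    # the operation sits at columns 9...12, the task id starts at column 13
    next unless entry[9...12] == "add"
    entry[13..].split("-").first.to_i
  end.max.to_i # nil (no "add" entries) becomes 0

  [now, max_timestamp].max + 1
end

log = [
  "0badc0de:add:1700000100-4242-0:0:payload",
  "0badc0de:rem:1700000100-4242-0",
  "0badc0de:add:1700000250-4242-1:0:payload"
]

# The clock is behind the log (e.g. after clock skew): the log wins.
puts next_task_id_seed(log, now: 1_700_000_000) # => 1700000251
# The clock is ahead of the log: the clock wins.
puts next_task_id_seed(log, now: 1_700_001_000) # => 1700001001
# An empty log falls back to the clock.
puts next_task_id_seed([], now: 1_700_001_000)  # => 1700001001
```

The signatures in this sample are placeholders; the scan does not verify them, just as the constructor's scan does not.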
Instance Method Details
#add(task, publish_at: nil, task_id: nil) ⇒ String
Add a record to the log representing a new task.
# File 'lib/rage/deferred/backends/disk.rb', line 97

def add(task, publish_at: nil, task_id: nil)
  serialized_task = Marshal.dump(task).dump

  persisted_task_id = task_id || generate_task_id
  entry = build_add_entry(persisted_task_id, serialized_task, publish_at)
  write_to_storage(entry)

  if publish_at
    @delayed_tasks[persisted_task_id] = [serialized_task, publish_at]
  else
    @immediate_tasks_in_queue += 1
  end

  persisted_task_id
end
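#add stores the task as Marshal.dump(task).dump, and #pending_tasks reverses this with Marshal.load(serialized_task.undump). Marshal.dump returns a binary string that can contain raw newlines; String#dump turns it into a printable, double-quoted ASCII literal, so each task fits on one line of the line-oriented log. Colons are printable and survive the escaping, which is consistent with #pending_tasks splitting each entry with a limit of 5 so the serialized task stays in one field. ExampleTask below is a hypothetical stand-in for a deferred task object.

```ruby
# Hypothetical task object standing in for a Rage deferred task.
ExampleTask = Struct.new(:name, :args)

task = ExampleTask.new("send_email", { to: "user@example.com", body: "Hi\nthere" })

binary = Marshal.dump(task)   # binary String, may contain "\n" and other raw bytes
line_safe = binary.dump       # printable, double-quoted literal with escapes

puts binary.include?("\n")    # => true (the body contains a newline)
puts line_safe.include?("\n") # => false
puts line_safe.ascii_only?    # => true
puts line_safe.include?(":")  # => true (Marshal writes symbols with a ":" marker)

restored = Marshal.load(line_safe.undump)
puts restored == task         # => true
```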
#pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer, Integer)>
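Return a list of pending tasks in the storage. In the implementation below, each element is a [task_id, task, publish_at] array, where publish_at is nil for tasks that should run immediately; entries whose signature does not match are counted and reported as corrupted, and tasks that cannot be deserialized are logged and skipped.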
# File 'lib/rage/deferred/backends/disk.rb', line 131

def pending_tasks
  if @recovered_storages
    # `@recovered_storages` will only be present if the server has previously crashed and left
    # some storage files behind, or if the new cluster is started with fewer workers than before;
    # TLDR: this code is expected to execute very rarely
    @recovered_storages.each { |storage| recover_tasks(storage.tap(&:rewind)) }
  end

  tasks = {}
  corrupted_tasks_count = 0

  # find pending tasks in the storage
  @storage.tap(&:rewind).each_line(chomp: true) do |entry|
    signature, op, payload = entry[0...8], entry[9...12], entry[9..]
    next if signature&.empty? || payload&.empty? || op&.empty?

    unless signature == Zlib.crc32(payload).to_s(16).rjust(8, "0")
      corrupted_tasks_count += 1
      next
    end

    if op == "add"
      task_id = entry[13...entry.index(":", 13).to_i]
      tasks[task_id] = entry
    elsif op == "rem"
      task_id = entry[13..]
      tasks.delete(task_id)
    end
  end

  if corrupted_tasks_count != 0
    puts "WARNING: Detected #{corrupted_tasks_count} corrupted deferred task(s)"
  end

  tasks.filter_map do |task_id, entry|
    _, _, _, serialized_publish_at, serialized_task = entry.split(":", 5)

    task = Marshal.load(serialized_task.undump)
    publish_at = (serialized_publish_at == DEFAULT_PUBLISH_AT ? nil : serialized_publish_at.to_i)

    if publish_at
      @delayed_tasks[task_id] = [serialized_task, publish_at]
    else
      @immediate_tasks_in_queue += 1
    end

    [task_id, task, publish_at]
  rescue ArgumentError, NameError => e
    puts "ERROR: Can't deserialize the task with id #{task_id}: (#{e.class}) #{e.message}"
    nil
  end
end
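The log layout can be read off the parsing code above: each line is an 8-character, zero-padded hex CRC32 signature, a ":", and a payload of either add:task_id:publish_at:serialized_task or rem:task_id, with publish_at set to DEFAULT_PUBLISH_AT ("0") when the task is not delayed. The signature covers everything after the first ":". The writer side (build_add_entry, build_remove_entry) is not shown on this page, so the sign helper below is an assumption reconstructed from the reader, and replay is a hypothetical, simplified version of the loop: the last add per id is kept, ids removed later are dropped, and lines whose signature does not match are counted.

```ruby
require "zlib"

# Assumed writer: prefix the payload with its zero-padded hex CRC32.
def sign(payload)
  "#{Zlib.crc32(payload).to_s(16).rjust(8, "0")}:#{payload}"
end

# Simplified replay mirroring #pending_tasks; returns the surviving "add"
# entries keyed by task id, plus the number of corrupted lines.
def replay(lines)
  tasks = {}
  corrupted = 0

  lines.each do |entry|
    signature, op, payload = entry[0...8], entry[9...12], entry[9..]
    # skip blank or truncated lines
    next if signature.nil? || op.nil? || payload.nil? || payload.empty?

    unless signature == Zlib.crc32(payload).to_s(16).rjust(8, "0")
      corrupted += 1
      next
    end

    case op
    when "add"
      tasks[entry[13...entry.index(":", 13).to_i]] = entry
    when "rem"
      tasks.delete(entry[13..])
    end
  end

  [tasks, corrupted]
end

log = [
  sign("add:1700000100-4242-0:0:payload-a"),
  sign("add:1700000100-4242-1:1700000500:payload-b"),
  sign("rem:1700000100-4242-0"),
  sign("add:1700000100-4242-2:0:payload-c").sub("payload-c", "payload-X") # tampered line
]

tasks, corrupted = replay(log)
p tasks.keys # => ["1700000100-4242-1"]
p corrupted  # => 1

_, _, task_id, publish_at, serialized = tasks.values.first.split(":", 5)
p [task_id, publish_at.to_i, serialized] # => ["1700000100-4242-1", 1700000500, "payload-b"]
```

Because the log is append-only, a removal never rewrites the earlier add line; replaying both in order is what makes the task disappear.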
#remove(task_id) ⇒ Object
Add a record to the log representing a task removal.
# File 'lib/rage/deferred/backends/disk.rb', line 116

def remove(task_id)
  write_to_storage(build_remove_entry(task_id))

  if @delayed_tasks.has_key?(task_id)
    @delayed_tasks.delete(task_id)
  else
    @immediate_tasks_in_queue -= 1
  end

  # rotate the storage once the size is over the limit and all non-delayed tasks are processed
  rotate_storage if @should_rotate && @immediate_tasks_in_queue == 0
end
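#add, #pending_tasks and #remove share two pieces of bookkeeping: delayed tasks (those with a publish_at) are kept in a hash together with their serialized payload, while immediate tasks only bump a counter. #remove decrements the counter for any id not found in the hash, and rotation runs only once it has been requested and no immediate tasks remain; presumably this is because only delayed tasks keep their payload in memory and could be carried over to a new file. The class below is a hypothetical model of that logic only. What rotate_storage does and when @should_rotate is set are not shown on this page, so rotation_requested! and the rotations counter are stand-ins.

```ruby
# Hypothetical model of the Disk backend's in-memory bookkeeping.
class RotationBookkeeping
  attr_reader :immediate_tasks_in_queue, :delayed_tasks, :rotations

  def initialize
    @immediate_tasks_in_queue = 0
    @delayed_tasks = {} # task_id => [serialized_task, publish_at]
    @should_rotate = false
    @rotations = 0      # stand-in for calls to rotate_storage
  end

  def added(task_id, serialized_task, publish_at)
    if publish_at
      @delayed_tasks[task_id] = [serialized_task, publish_at]
    else
      @immediate_tasks_in_queue += 1
    end
  end

  # Stand-in for whatever sets @should_rotate (e.g. the size limit being exceeded).
  def rotation_requested!
    @should_rotate = true
  end

  def removed(task_id)
    if @delayed_tasks.has_key?(task_id)
      @delayed_tasks.delete(task_id)
    else
      @immediate_tasks_in_queue -= 1
    end

    # Same condition as Disk#remove: rotate only when no immediate tasks remain.
    if @should_rotate && @immediate_tasks_in_queue == 0
      @rotations += 1
      @should_rotate = false # assumed: a real rotation would clear the flag
    end
  end
end

book = RotationBookkeeping.new
book.added("t1", "payload-1", nil)
book.added("t2", "payload-2", nil)
book.added("t3", "payload-3", 1_700_000_500)
book.rotation_requested!

book.removed("t1")        # one immediate task still queued: no rotation yet
puts book.rotations       # => 0
book.removed("t2")        # queue drained: rotation happens
puts book.rotations       # => 1
p book.delayed_tasks.keys # => ["t3"]
```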