Class: Rage::Deferred::Backends::Disk

Inherits: Object
Defined in:
lib/rage/deferred/backends/disk.rb

Overview

The Rage::Deferred::Backends namespace provides storage layers that persist deferred tasks. Each storage should implement the following instance methods:

  • add - called when a task has to be added to the storage;
  • remove - called when a task has to be removed from the storage;
  • pending_tasks - called to iterate over the underlying storage and return the list of tasks to replay.

Constant Summary

STORAGE_VERSION = "0"
STORAGE_SIZE_INCREASE_RATIO = 1.5
DEFAULT_PUBLISH_AT = "0"
DEFAULT_STORAGE_SIZE_LIMIT = 2_000_000

Instance Method Summary

Constructor Details

#initialize(path:, prefix:, fsync_frequency:) ⇒ Disk

Returns a new instance of Disk.



(source: lines 20–90 of lib/rage/deferred/backends/disk.rb)
# File 'lib/rage/deferred/backends/disk.rb', line 20

# Open (or create) the on-disk task log and prepare the in-memory bookkeeping.
#
# Every file in `path` named "#{prefix}#{STORAGE_VERSION}-*" is opened and an exclusive,
# non-blocking `flock` is attempted; files that are already locked elsewhere are closed and
# skipped. The first locked file becomes the main storage and any further locked files are kept
# in `@recovered_storages` to be merged by `pending_tasks`. If no file could be locked, a new one
# is created via `create_storage`.
#
# Task ids are seeded from max(current time, highest timestamp already in the storage files) + 1,
# so new ids stay ahead of previously written ones even if the wall clock went backwards.
#
# @param path [Pathname] storage directory (presumably a Pathname - `#mkpath` and `#glob` are used); created if missing
# @param prefix [String] storage file name prefix; STORAGE_VERSION is appended to it
# @param fsync_frequency [Integer] interval handed to `Iodine.run_every` for the periodic fsync
#   (NOTE(review): presumably milliseconds, like the 1_000 used for the task id timer - confirm)
def initialize(path:, prefix:, fsync_frequency:)
  @storage_path = path
  @storage_prefix = "#{prefix}#{STORAGE_VERSION}"
  @fsync_frequency = fsync_frequency

  @storage_path.mkpath

  # try to open and take ownership of all storage files in the storage directory;
  # files that are already locked elsewhere are closed and dropped from the list
  storage_files = @storage_path.glob("#{@storage_prefix}-*").filter_map do |file_path|
    file = file_path.open("a+b")
    if file.flock(File::LOCK_EX | File::LOCK_NB)
      sleep 0.01 # reduce contention between workers
      file
    else
      file.close
      nil
    end
  end

  # if there are no storage files - create one;
  # otherwise the first one is used as the main storage; the rest will be merged into the main storage
  if storage_files.empty?
    @storage = create_storage
  else
    @storage = storage_files[0]
    @recovered_storages = storage_files[1..] if storage_files.length > 1
  end

  # include recovered storages from crashed/previous workers
  all_storages = [@storage, *@recovered_storages].compact

  # find the highest task timestamp across all storage files;
  # in "add" lines the task id starts at offset 13 and its first "-"-separated part is the timestamp
  storage_file_max_timestamp = all_storages.map do |storage|
    max_timestamp = 0
    storage.tap(&:rewind).each_line(chomp: true) do |entry|
      next unless entry[9...12] == "add"
      # `to_s` guards against a line cut off right after "add" (e.g. a partial write before a
      # crash): `entry[13..]` is nil for such a 12-character line
      timestamp = entry[13..].to_s.split("-").first.to_i
      max_timestamp = timestamp if timestamp > max_timestamp
    end
    max_timestamp
  end.max.to_i

  # apply Lamport IR2(b) From time, clocks and the ordering of
  # events in a distributed system to guard against clock skew
  task_id_seed = [Time.now.to_i, storage_file_max_timestamp].max + 1

  # the id base advances by one on every timer tick, and the per-base counter is reset
  @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  Iodine.run_every(1_000) do
    task_id_seed += 1
    @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  end

  @storage_size_limit = DEFAULT_STORAGE_SIZE_LIMIT
  @storage_size = @storage.size
  @fsync_scheduled = false
  @should_rotate = false

  # we use different counters for different tasks:
  # delayed tasks are stored in the hash; for regular tasks we only maintain a counter;
  # this information is only used during storage rotation
  @immediate_tasks_in_queue = 0
  @delayed_tasks = {}

  # ensure data is written to disk: fsync periodically, and only if something was written since the last fsync
  @storage_has_changes = false
  Iodine.run_every(@fsync_frequency) do
    if @storage_has_changes
      @storage_has_changes = false
      @storage.fsync
    end
  end
end

Instance Method Details

#add(task, publish_at: nil, task_id: nil) ⇒ String

Add a record to the log representing a new task.

Parameters:

  • task (Rage::Deferred::Task)
  • publish_at (Integer, nil) (defaults to: nil)
  • task_id (String, nil) (defaults to: nil)

Returns:

  • (String)


(source: lines 97–112 of lib/rage/deferred/backends/disk.rb)
# File 'lib/rage/deferred/backends/disk.rb', line 97

# Record a new task in the storage log and return the id it was stored under.
#
# The task is serialized with Marshal and then escaped with String#dump, so the payload
# contains no raw newlines and fits on a single log line.
#
# @param task [Rage::Deferred::Task] the task to persist
# @param publish_at [Integer, nil] when given, the task is tracked as a delayed task
# @param task_id [String, nil] id to reuse; a fresh one is generated when nil
# @return [String] the id the task was recorded under
def add(task, publish_at: nil, task_id: nil)
  dumped_task = Marshal.dump(task).dump

  (task_id || generate_task_id).tap do |id|
    write_to_storage(build_add_entry(id, dumped_task, publish_at))

    # rotation bookkeeping: delayed tasks are remembered individually, immediate ones only counted
    if publish_at
      @delayed_tasks.store(id, [dumped_task, publish_at])
    else
      @immediate_tasks_in_queue += 1
    end
  end
end

#pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer)>

Return a list of pending tasks in the storage.

Returns:

  • (Array<(String, Rage::Deferred::Task, Integer)>)

    Each element is a [task_id, task, publish_at] triple; publish_at is nil for tasks that should run immediately.



(source: lines 131–184 of lib/rage/deferred/backends/disk.rb)
# File 'lib/rage/deferred/backends/disk.rb', line 131

# Replay the storage log and return the tasks that were added but not yet removed.
#
# Storage files inherited from other/crashed workers (`@recovered_storages`) are first passed to
# `recover_tasks`. The main storage is then read from the beginning. Each line starts with an
# 8-character hex CRC32 signature of the rest of the line after the separator at offset 8; lines
# with a mismatching signature are counted, skipped and reported with a single warning. An "add"
# line registers a task under its id, and a later "rem" line for the same id cancels it.
#
# Side effect: the rotation bookkeeping is rebuilt for every returned task - delayed tasks are
# stored in `@delayed_tasks`, and immediate tasks increment `@immediate_tasks_in_queue`.
#
# @return [Array<(String, Rage::Deferred::Task, Integer)>] `[task_id, task, publish_at]` triples;
#   `publish_at` is nil for tasks without a delay. Tasks that cannot be deserialized are
#   reported and omitted instead of aborting the replay.
def pending_tasks
  if @recovered_storages
    # `@recovered_storages` will only be present if the server has previously crashed and left
    # some storage files behind, or if the new cluster is started with fewer workers than before;
    # TLDR: this code is expected to execute very rarely
    @recovered_storages.each { |storage| recover_tasks(storage.tap(&:rewind)) }
  end

  tasks = {}
  corrupted_tasks_count = 0

  # find pending tasks in the storage
  @storage.tap(&:rewind).each_line(chomp: true) do |entry|
    signature, op, payload = entry[0...8], entry[9...12], entry[9..]
    # skip blank lines; on lines too short to hold an op, `op`/`payload` are nil and `&.` keeps
    # this check from raising - such lines then either fail the signature check or match no op
    next if signature&.empty? || payload&.empty? || op&.empty?

    unless signature == Zlib.crc32(payload).to_s(16).rjust(8, "0")
      corrupted_tasks_count += 1
      next
    end

    if op == "add"
      # the task id runs from offset 13 up to the next ":"
      task_id = entry[13...entry.index(":", 13).to_i]
      tasks[task_id] = entry
    elsif op == "rem"
      task_id = entry[13..]
      tasks.delete(task_id)
    end
  end

  if corrupted_tasks_count != 0
    puts "WARNING: Detected #{corrupted_tasks_count} corrupted deferred task(s)"
  end

  tasks.filter_map do |task_id, entry|
    # "add" entries have the layout <signature>:add:<task_id>:<publish_at>:<dumped task>;
    # the split limit keeps any ":" inside the dumped task intact
    _, _, _, serialized_publish_at, serialized_task = entry.split(":", 5)

    # NOTE(review): Marshal.load trusts the storage directory - the CRC check guards against
    # corruption, not against tampering
    task = Marshal.load(serialized_task.undump)

    publish_at = (serialized_publish_at == DEFAULT_PUBLISH_AT ? nil : serialized_publish_at.to_i)

    if publish_at
      @delayed_tasks[task_id] = [serialized_task, publish_at]
    else
      @immediate_tasks_in_queue += 1
    end

    [task_id, task, publish_at]

  # a single undeserializable task must not prevent the others from being replayed:
  # ArgumentError/NameError cover e.g. unknown classes or truncated data, and TypeError is
  # what Marshal raises for an incompatible marshal format
  rescue ArgumentError, NameError, TypeError => e
    puts "ERROR: Can't deserialize the task with id #{task_id}: (#{e.class}) #{e.message}"
    nil
  end
end

#remove(task_id) ⇒ Object

Add a record to the log representing a task removal.

Parameters:

  • task_id (String)


(source: lines 116–127 of lib/rage/deferred/backends/disk.rb)
# File 'lib/rage/deferred/backends/disk.rb', line 116

# Write a removal record for `task_id` (built by `build_remove_entry`) and update the
# in-memory task counters.
#
# Once a rotation has been requested (`@should_rotate`) and no immediate tasks remain
# in the queue, the storage is rotated.
#
# @param task_id [String] id previously returned by #add
# @return [Object, nil] the result of `rotate_storage` when a rotation happened, otherwise nil
def remove(task_id)
  write_to_storage(build_remove_entry(task_id))

  # delayed tasks are tracked by id; anything else is an immediate task that only has a counter
  if @delayed_tasks.key?(task_id)
    @delayed_tasks.delete(task_id)
  else
    @immediate_tasks_in_queue -= 1
  end

  return unless @should_rotate && @immediate_tasks_in_queue.zero?

  rotate_storage
end