Class: Megatest::RedisQueue

Inherits:
AbstractQueue show all
Defined in:
lib/megatest/redis_queue.rb

Overview

Data structures

Note: All keys are prefixed by `build:<@build_id>:`.

  • “leader-status”: String, either the `leader-setup` marker of the elected leader (written while it populates the queue) or `ready`.

  • “queue”: List, contains the test ids that haven’t yet been popped.

  • “running”: SortedSet, members are the test ids currently being processed.

    Scores are the time the test's lease was last renewed (when it was reserved, or by a heartbeat).
    If the score is older than the lease timeout (twice the heartbeat frequency), the test was lost and is re-queued.
    
  • “processed”: Set, members are the ids of tests that were fully processed.

  • “owners”: Hash, contains a mapping of currently being processed tests and the worker they are assigned to.

    Keys are test ids, values are the id of the worker the test is currently assigned to.
    
  • “worker:<@worker_id>:running”: Set, test ids currently held by a worker.

  • “worker:<@worker_id>:failures”: List, all the ids of failed tests processed by a worker.

    Used as the base for a new queue when retrying a job. May contain duplicates.
    
  • “results”: List, inside are serialized TestCaseResult instances. Append only.

  • “requeues-count”: Hash, keys are test ids, values are the number of times that particular test
    was retried. There is also the special "___total___" key.
    

Defined Under Namespace

Classes: ExternalHeartbeatMonitor, RetryQueue

Constant Summary collapse

# Lua script renewing the lease of every test currently held by a worker.
#
# KEYS: running (sorted set), processed (set), owners (hash), and the
#   worker's own running set.
# ARGV: the worker id, then the current time, which becomes the new score
#   of each renewed test in the running set.
#
# A test's score is only bumped when it has not been processed yet and the
# owners hash still assigns it to this worker (another worker may have
# reclaimed it). Returns the number of tests whose lease was renewed.
HEARTBEAT =
<<~'LUA'
  local running_key = KEYS[1]
  local processed_key = KEYS[2]
  local owners_key = KEYS[3]
  local worker_running_key = KEYS[4]

  local worker_id = ARGV[1]
  local current_time = ARGV[2]

  local count = 0

  local tests = redis.call('smembers', worker_running_key)
  for index = 1, #tests do
    local test = tests[index]

    -- # already processed, we do not need to bump the timestamp
    if redis.call('sismember', processed_key, test) == 0 then
      -- # we're still the owner of the test, we can bump the timestamp
      local owner_id = redis.call('hget', owners_key, test)
      if owner_id == worker_id then
        redis.call('zadd', running_key, current_time, test)
        count = count + 1
      end
    end
  end

  return count
LUA
# Lua script reserving the next test for a worker.
#
# KEYS: queue (list), running (sorted set), processed (set), owners (hash),
#   and the worker's own running set.
# ARGV: the worker id, the current time, and the lease timeout.
#
# Every running test whose score is at or below current_time - timeout is
# considered lost. It is removed from the running set and, unless it was
# already processed, pushed back onto the queue. Removing it from the
# running set ensures a lost test is requeued exactly once. Otherwise every
# later reserve would push it again until it got popped, and the duplicate
# queue entries would make it run several times.
#
# Then one test id is popped, recorded as running and owned by this worker,
# and returned. The script returns nil when the queue is empty.
RESERVE =
<<~'LUA'
  local queue_key = KEYS[1]
  local running_key = KEYS[2]
  local processed_key = KEYS[3]
  local owners_key = KEYS[4]
  local worker_running_key = KEYS[5]

  local worker_id = ARGV[1]
  local current_time = ARGV[2]
  local timeout = ARGV[3]

  -- # First we requeue all timed out tests. A lost test is dropped from the
  -- # running set so later calls do not push it onto the queue again.
  local lost_tests = redis.call('zrangebyscore', running_key, 0, current_time - timeout)
  for _, test in ipairs(lost_tests) do
    redis.call('zrem', running_key, test)
    if redis.call('sismember', processed_key, test) == 0 then
      redis.call('rpush', queue_key, test)
    end
  end

  local test = redis.call('rpop', queue_key)
  if test then
    redis.call('zadd', running_key, current_time, test)
    redis.call('sadd', worker_running_key, test)
    redis.call('hset', owners_key, test, worker_id)
    return test
  end

  return nil
LUA
# Lua script marking a test as fully processed.
#
# KEYS: running (sorted set), processed (set), owners (hash), and the
#   worker's own running set.
# ARGV: the test id.
#
# Removes the test from the running set, from the worker's running set and
# from the owners hash (even if another worker reclaimed it meanwhile), then
# adds it to the processed set. Returns the SADD reply: 1 if the test was
# not already in the processed set, 0 otherwise.
ACKNOWLEDGE =
<<~'LUA'
  local running_key = KEYS[1]
  local processed_key = KEYS[2]
  local owners_key = KEYS[3]
  local worker_running_key = KEYS[4]

  local test = ARGV[1]

  redis.call('zrem', running_key, test)
  redis.call('srem', worker_running_key, test)
  redis.call('hdel', owners_key, test) -- # Doesn't matter if it was reclaimed by another workers
  return redis.call('sadd', processed_key, test)
LUA

Instance Attribute Summary collapse

Attributes inherited from AbstractQueue

#size, #test_cases_index

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractQueue

#record_lost_test, #sharded?

Constructor Details

#initialize(config, ttl: 24 * 60 * 60) ⇒ RedisQueue

Returns a new instance of RedisQueue.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/megatest/redis_queue.rb', line 55

# Creates a Redis-backed queue for the build described by +config+.
#
# @param config the queue configuration. This method reads +queue_url+,
#   +worker_id+ and +build_id+ from it and also passes it to the parent
#   class.
# @param ttl [Integer] expiration in seconds applied (via EXPIRE) to the
#   Redis keys this queue writes. Defaults to one day.
def initialize(config, ttl: 24 * 60 * 60)
  super(config)

  @summary = Queue::Summary.new
  @redis = RedisClient.new(
    url: config.queue_url,
    # We retry quite aggressively in case the network
    # is spotty, we'd rather wait a bit than to crash
    # a worker.
    reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5],
  )
  @ttl = ttl
  @load_timeout = 30 # TODO: configurable
  @worker_id = config.worker_id
  @build_id = config.build_id
  @success = true
  @leader = nil # unknown until #populate has run the leader election
  @script_cache = {}
end

Instance Attribute Details

#summaryObject (readonly)

Returns the value of attribute summary.



53
54
55
# File 'lib/megatest/redis_queue.rb', line 53

# The Queue::Summary collecting every result recorded through this queue instance.
def summary; @summary; end

Class Method Details

.build(config) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/megatest/redis_queue.rb', line 44

# Instantiates a queue for +config+. If that queue reports #retrying?,
# it is wrapped in a RetryQueue built from the same config.
def build(config)
  queue = new(config)
  return queue unless queue.retrying?

  RetryQueue.build(config, queue)
end

Instance Method Details

#cleanupObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/megatest/redis_queue.rb', line 86

# Best-effort removal of this build's Redis keys, only after a successful run.
#
# A queue with a worker id deletes only its own per-worker keys. A queue
# without one deletes the shared build-level keys. After a failed run nothing
# is deleted, so the worker's failures list (read by #retrying?) survives.
#
# @return the DEL reply, nil when the run failed (nothing attempted), or
#   false when Redis could not be reached.
def cleanup
  return unless @success

  if @worker_id
    @redis.call(
      "del",
      key("worker", worker_id, "running"),
      key("worker", worker_id, "failures"),
    )
  else
    @redis.call(
      "del",
      key("leader-status"),
      key("queue"),
      key("running"),
      key("processed"),
      key("owners"),
      key("results"),
      # The requeue counter hash is documented as "requeues-count".
      key("requeues-count"),
      # NOTE(review): spelling previously deleted here; kept so nothing that
      # used to be cleaned up is left behind. DEL ignores missing keys.
      key("requeue-counts"),
    )
  end
rescue RedisClient::ConnectionError
  false # Cleanup is best effort
end

#distributed?Boolean

Returns:

  • (Boolean)


159
160
161
# File 'lib/megatest/redis_queue.rb', line 159

# A RedisQueue always reports itself as distributed.
def distributed?; true; end

#empty?Boolean

Returns:

  • (Boolean)


178
179
180
# File 'lib/megatest/redis_queue.rb', line 178

# True once #remaining_size reports no queued and no running tests.
def empty?
  left = remaining_size
  left.zero?
end

#failed_test_idsObject



80
81
82
83
84
# File 'lib/megatest/redis_queue.rb', line 80

# Ids of the tests this worker recorded as failed, without duplicates.
#
# The failures list may contain the same id several times. Only the first
# occurrence of each id is kept, and the resulting order is then reversed.
#
# @return [Array<String>] a fresh array; empty when there are no failures.
def failed_test_ids
  test_ids = @redis.call("lrange", key("worker", worker_id, "failures"), 0, -1)
  # LRANGE replies with an empty array for a missing key. Coerce anyway so a
  # nil reply yields [] instead of raising NoMethodError.
  Array(test_ids).uniq.reverse
end

#global_summaryObject



329
330
331
332
333
334
335
# File 'lib/megatest/redis_queue.rb', line 329

# Rebuilds a summary from every serialized result appended to the shared
# "results" list. The list is not worker-specific, so this covers results
# from all workers of the build.
def global_summary
  payloads = @redis.call("lrange", key("results"), 0, -1)
  return Queue::Summary.new unless payloads

  results = payloads.map { |payload| TestCaseResult.load(payload) }
  Queue::Summary.new(results)
end

#heartbeatObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/megatest/redis_queue.rb', line 140

# Renews the lease on every test this worker currently holds by running
# the HEARTBEAT script.
#
# @return [Boolean] true once the script ran, false if Redis could not be
#   reached. Heartbeats are best effort, so connection errors never raise.
def heartbeat
  lease_keys = [
    key("running"),
    key("processed"),
    key("owners"),
    key("worker", worker_id, "running"),
  ]
  eval_script(HEARTBEAT, keys: lease_keys, argv: [worker_id, Megatest.now])
  true
rescue RedisClient::ConnectionError
  false # Heartbeat is best effort
end

#leader?Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/megatest/redis_queue.rb', line 167

# Whether this process won the leader election performed in #populate
# (nil until #populate has run).
def leader?; @leader; end

#pop_testObject



265
266
267
268
269
# File 'lib/megatest/redis_queue.rb', line 265

# Reserves the next test id and resolves it through test_cases_index.
#
# @return the matching test case, or nil when #reserve yields no id.
# NOTE(review): an id missing from test_cases_index raises from #fetch
#   (KeyError if it is a Hash).
def pop_test
  test_id = reserve
  return unless test_id

  test_cases_index.fetch(test_id)
end

#populate(test_cases) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/megatest/redis_queue.rb', line 232

# Loads +test_cases+ into the shared Redis queue, electing a single leader
# to do so.
#
# Every process SETNXes "leader-status" to its own marker and reads the key
# back. The process whose marker was stored becomes the leader. The leader
# pushes every test id onto "queue" and flips "leader-status" to "ready".
# Every other process polls #populated? every 0.1s, at most
# @load_timeout * 10 times, and returns whether or not the queue became ready.
#
# @param test_cases [Array] test cases responding to #id.
def populate(test_cases)
  super

  value = key("leader-setup", worker_id)
  # NB: If we assumed redis 7+ this could be a single command: `SET <key> NX GET EX @ttl <value>`.
  _, _, leader = @redis.multi do |pipeline|
    pipeline.call("setnx", key("leader-status"), value)
    pipeline.call("expire", key("leader-status"), @ttl)
    pipeline.call("get", key("leader-status"))
  end
  @leader = leader == value

  if @leader
    @redis.multi do |transaction|
      transaction.call("lpush", key("queue"), test_cases.map(&:id)) unless test_cases.empty?
      transaction.call("expire", key("queue"), @ttl)
      # A plain SET discards the TTL applied above, which would leave
      # "leader-status" without an expiration. Set it again with EX.
      transaction.call("set", key("leader-status"), "ready", "ex", @ttl)
    end
  else
    (@load_timeout * 10).times do
      break if populated?

      sleep 0.1
    end
  end
end

#populated?Boolean

Returns:

  • (Boolean)


163
164
165
# File 'lib/megatest/redis_queue.rb', line 163

# True once the leader has flipped "leader-status" to "ready".
def populated?
  status = @redis.call("get", key("leader-status"))
  status == "ready"
end

#record_result(original_result) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/megatest/redis_queue.rb', line 285

# Records the outcome of a test run locally and in Redis.
#
# A failed, non-skipped result is either turned into its retried form (when
# #attempt_to_retry? allows it) or flags the whole run as unsuccessful.
# A retried result is only appended to the shared "results" list. Any other
# result is acknowledged with the ACKNOWLEDGE script, the worker's failures
# list is updated, and the result is appended to "results".
#
# @param original_result the result reported by the runner.
# @return the result actually recorded (possibly the retried one).
def record_result(original_result)
  result = original_result
  retry_candidate = result.failed? && !result.skipped?
  if retry_candidate && attempt_to_retry?(result)
    result = result.retry
  elsif retry_candidate
    @success = false
  end

  @summary.record_result(result)

  if result.retried?
    # Not acknowledged: the test is not marked processed.
    @redis.pipelined do |pipeline|
      pipeline.call("rpush", key("results"), result.dump)
      pipeline.call("expire", key("results"), @ttl)
    end
    return result
  end

  load_script(ACKNOWLEDGE)
  @redis.pipelined do |pipeline|
    acknowledge_keys = [
      key("running"),
      key("processed"),
      key("owners"),
      key("worker", worker_id, "running"),
    ]
    eval_script(ACKNOWLEDGE, keys: acknowledge_keys, argv: [result.test_id], redis: pipeline)

    if result.failed?
      # Remember the failure; this list seeds the queue of a retried job.
      pipeline.call("rpush", key("worker", worker_id, "failures"), result.test_id)
      pipeline.call("expire", key("worker", worker_id, "failures"), @ttl)
    elsif result.success?
      # A passing result removes every earlier failure entry for this test.
      pipeline.call("lrem", key("worker", worker_id, "failures"), 0, result.test_id)
    end

    pipeline.call("rpush", key("results"), result.dump)
    pipeline.call("expire", key("results"), @ttl)
  end

  result
end

#remaining_sizeObject



171
172
173
174
175
176
# File 'lib/megatest/redis_queue.rb', line 171

# Number of tests still queued plus tests currently running, read in a
# single MULTI so both counts come from the same moment.
def remaining_size
  queued, running = @redis.multi do |transaction|
    transaction.call("llen", key("queue"))
    transaction.call("zcard", key("running"))
  end
  queued + running
end

#reserveObject



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/megatest/redis_queue.rb', line 212

# Runs the RESERVE script: requeues lost tests, then pops the next test id
# and assigns it to this worker.
#
# A running test counts as lost once its score is older than twice the
# configured heartbeat frequency.
#
# @return the reserved test id, or nil when nothing could be popped.
def reserve
  load_script(RESERVE)
  keys = [
    key("queue"),
    key("running"),
    key("processed"),
    key("owners"),
    key("worker", worker_id, "running"),
  ]
  argv = [worker_id, Megatest.now, @config.heartbeat_frequency * 2]
  reply, = eval_script(RESERVE, keys: keys, argv: argv)
  reply
end

#retrying?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/megatest/redis_queue.rb', line 76

# Whether this worker's failures list in Redis is non-empty.
#
# @return [Boolean, nil] the falsy worker id (nil/false) when the queue has
#   no worker id, otherwise whether any failure is recorded.
def retrying?
  return @worker_id unless @worker_id

  failure_count = @redis.call("llen", key("worker", worker_id, "failures"))
  !failure_count.zero?
end

#success?Boolean

Returns:

  • (Boolean)


261
262
263
# File 'lib/megatest/redis_queue.rb', line 261

# False once a failed, non-skipped result could not be retried; true otherwise.
def success?; @success; end