Class: Megatest::RedisQueue
- Inherits: AbstractQueue
  - Object
    - AbstractQueue
      - Megatest::RedisQueue
- Defined in: lib/megatest/redis_queue.rb
Overview
Data structures
Note: All keys are prefixed by `build:<@build_id>:`.
- “leader-status”: String, either the setup marker written by the leader while it populates the queue, or `ready`.
- “queue”: List, contains the test ids that haven’t yet been popped.
- “running”: SortedSet, members are the test ids currently being processed.
  Scores are the timestamp of each test's last reservation or heartbeat. If a score is at or below the current time minus the lease timeout (twice the heartbeat frequency), the test is considered lost and is re-assigned.
- “processed”: Set, members are the ids of tests that were fully processed.
- “owners”: Hash, maps each test currently being processed to the worker it is assigned to.
  Keys are test ids, values are worker ids.
- “worker:<@worker_id>:running”: Set, test ids currently held by a worker.
- “worker:<@worker_id>:failures”: List, all the ids of failed tests processed by a worker.
  Used as the base for a new queue when retrying a job. May contain duplicates.
- “results”: List, contains serialized TestCaseResult instances. Append only.
- “requeues-count”: Hash, keys are test ids, values are the number of times that particular test was retried. There is also the special "___total___" key.
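The key layout above can be illustrated with a small helper. The class's own `key` method is not shown on this page, so `build_key` below is a hypothetical stand-in that assumes the parts are joined with `:` under the `build:<@build_id>:` prefix.

```ruby
# Hypothetical stand-in for the class's private `key` helper: joins the parts
# with ":" under the "build:<build_id>:" prefix described in the note above.
def build_key(build_id, *parts)
  ["build", build_id, *parts].join(":")
end

build_id = "1234"
worker_id = "w7"

build_key(build_id, "queue")                         # => "build:1234:queue"
build_key(build_id, "worker", worker_id, "failures") # => "build:1234:worker:w7:failures"
```

Under this assumption, `key("worker", worker_id, "running")` in the methods below resolves to the “worker:<@worker_id>:running” entry of the list.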
Defined Under Namespace
Classes: ExternalHeartbeatMonitor, RetryQueue
Constant Summary
- HEARTBEAT =
  <<~'LUA'
    local running_key = KEYS[1]
    local processed_key = KEYS[2]
    local owners_key = KEYS[3]
    local worker_running_key = KEYS[4]
    local worker_id = ARGV[1]
    local current_time = ARGV[2]

    local count = 0
    local tests = redis.call('smembers', worker_running_key)
    for index = 1, #tests do
      local test = tests[index]
      -- # already processed, we do not need to bump the timestamp
      if redis.call('sismember', processed_key, test) == 0 then
        -- # we're still the owner of the test, we can bump the timestamp
        local owner_id = redis.call('hget', owners_key, test)
        if owner_id == worker_id then
          redis.call('zadd', running_key, current_time, test)
          count = count + 1
        end
      end
    end

    return count
  LUA
- RESERVE =
  <<~'LUA'
    local queue_key = KEYS[1]
    local running_key = KEYS[2]
    local processed_key = KEYS[3]
    local owners_key = KEYS[4]
    local worker_running_key = KEYS[5]
    local worker_id = ARGV[1]
    local current_time = ARGV[2]
    local timeout = ARGV[3]

    -- # First we requeue all timed out tests
    local lost_tests = redis.call('zrangebyscore', running_key, 0, current_time - timeout)
    for _, test in ipairs(lost_tests) do
      if redis.call('sismember', processed_key, test) == 0 then
        redis.call('rpush', queue_key, test)
      end
    end

    local test = redis.call('rpop', queue_key)
    if test then
      redis.call('zadd', running_key, current_time, test)
      redis.call('sadd', worker_running_key, test)
      redis.call('hset', owners_key, test, worker_id)
      return test
    end

    return nil
  LUA
- ACKNOWLEDGE =
  <<~'LUA'
    local running_key = KEYS[1]
    local processed_key = KEYS[2]
    local owners_key = KEYS[3]
    local worker_running_key = KEYS[4]
    local test = ARGV[1]

    redis.call('zrem', running_key, test)
    redis.call('srem', worker_running_key, test)
    redis.call('hdel', owners_key, test) -- # Doesn't matter if it was reclaimed by another worker

    return redis.call('sadd', processed_key, test)
  LUA
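The three scripts above can be modeled with plain Ruby collections to show how a lease moves between workers. This is an illustrative sketch, not the library's code: `FakeBuildState` is hypothetical, integer timestamps stand in for `Megatest.now`, and the atomicity Redis gives each script is simply assumed by running single-threaded.

```ruby
require "set"

# In-memory model of RESERVE, HEARTBEAT and ACKNOWLEDGE. Plain collections
# stand in for the Redis keys; illustrative only.
class FakeBuildState
  attr_reader :queue, :running, :processed, :owners

  def initialize(test_ids)
    # populate LPUSHes every id, so the list tail (popped by RPOP) is the first id.
    @queue = test_ids.reverse
    @running = {}                                        # "running": test id => score
    @processed = Set.new                                 # "processed"
    @owners = {}                                         # "owners": test id => worker id
    @worker_running = Hash.new { |h, k| h[k] = Set.new } # "worker:<id>:running"
  end

  # RESERVE: requeue unprocessed tests whose score is <= now - timeout,
  # then pop one test and lease it to worker_id.
  def reserve(worker_id, now, timeout)
    lost = @running.select { |_, score| score <= now - timeout }
                   .sort_by { |test, score| [score, test] }
                   .map(&:first)
    lost.each { |test| @queue.push(test) unless @processed.include?(test) }

    test = @queue.pop
    return nil unless test

    @running[test] = now
    @worker_running[worker_id] << test
    @owners[test] = worker_id
    test
  end

  # HEARTBEAT: refresh the score of every unprocessed test this worker still owns.
  def heartbeat(worker_id, now)
    count = 0
    @worker_running[worker_id].each do |test|
      next if @processed.include?(test)
      next unless @owners[test] == worker_id

      @running[test] = now
      count += 1
    end
    count
  end

  # ACKNOWLEDGE: drop the lease and mark the test processed. Returns true when
  # the id was newly added to "processed" (SADD returning 1).
  def acknowledge(worker_id, test)
    @running.delete(test)
    @worker_running[worker_id].delete(test)
    @owners.delete(test)
    !@processed.add?(test).nil?
  end
end

state = FakeBuildState.new(%w[a b c])
state.reserve("w1", 0, 10)  # => "a"
state.reserve("w2", 0, 10)  # => "b"
state.heartbeat("w1", 15)   # => 1, "a" stays leased to w1
state.reserve("w3", 15, 10) # => "b", w2 went silent so its lease expired
state.heartbeat("w2", 16)   # => 0, w2 no longer owns "b"
```

The last call shows why HEARTBEAT checks “owners”: a worker that comes back after losing a lease cannot keep refreshing a test that has been handed to someone else.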
Instance Attribute Summary
- #summary ⇒ Object (readonly)
  Returns the value of attribute summary.
Attributes inherited from AbstractQueue
Class Method Summary
- .build(config) ⇒ Object
Instance Method Summary
- #cleanup ⇒ Object
- #distributed? ⇒ Boolean
- #empty? ⇒ Boolean
- #failed_test_ids ⇒ Object
- #global_summary ⇒ Object
- #heartbeat ⇒ Object
- #initialize(config, ttl: 24 * 60 * 60) ⇒ RedisQueue (constructor)
  A new instance of RedisQueue.
- #leader? ⇒ Boolean
- #pop_test ⇒ Object
- #populate(test_cases) ⇒ Object
- #populated? ⇒ Boolean
- #record_result(original_result) ⇒ Object
- #remaining_size ⇒ Object
- #reserve ⇒ Object
- #retrying? ⇒ Boolean
- #success? ⇒ Boolean
Methods inherited from AbstractQueue
Constructor Details
#initialize(config, ttl: 24 * 60 * 60) ⇒ RedisQueue
Returns a new instance of RedisQueue.
# File 'lib/megatest/redis_queue.rb', line 55
def initialize(config, ttl: 24 * 60 * 60)
  super(config)
  @summary = Queue::Summary.new
  @redis = RedisClient.new(
    url: config.queue_url,
    # We retry quite aggressively in case the network
    # is spotty: we'd rather wait a bit than crash
    # a worker.
    reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5],
  )
  @ttl = ttl
  @load_timeout = 30 # TODO: configurable
  @worker_id = config.worker_id
  @build_id = config.build_id
  @success = true
  @leader = nil
  @script_cache = {}
end
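The constructor passes `reconnect_attempts` an array of delays. As the redis-client README describes it, an array means one retry per entry, each preceded by a sleep of that many seconds, so this schedule adds up to roughly 9.6 seconds of waiting before a worker gives up. The loop below only illustrates that idea; `with_reconnects` and `FakeConnectionError` are hypothetical, and this is not redis-client's implementation.

```ruby
# Hypothetical error class standing in for RedisClient::ConnectionError.
class FakeConnectionError < StandardError; end

# Illustrative retry loop: each entry in `delays` grants one extra attempt,
# preceded by sleeping that many seconds. Not redis-client's actual code.
def with_reconnects(delays, sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield
  rescue FakeConnectionError
    raise if attempt >= delays.size # schedule exhausted: surface the error

    sleeper.call(delays[attempt])
    attempt += 1
    retry
  end
end

slept = []
calls = 0
result = with_reconnects([0, 0, 0.1, 0.5, 1, 3, 5], sleeper: ->(s) { slept << s }) do
  calls += 1
  raise FakeConnectionError if calls < 4 # fail the first three attempts
  :ok
end
# result => :ok, calls => 4, slept => [0, 0, 0.1]
```

Front-loading zero delays makes brief network blips almost free, while the growing tail keeps a worker alive through a longer outage instead of crashing it.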
Instance Attribute Details
#summary ⇒ Object (readonly)
Returns the value of attribute summary.
# File 'lib/megatest/redis_queue.rb', line 53
def summary
  @summary
end
Class Method Details
.build(config) ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 44
def build(config)
  queue = new(config)
  if queue.
    queue = RetryQueue.build(config, queue)
  end
  queue
end
Instance Method Details
#cleanup ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 86
def cleanup
  if @success
    if @worker_id
      @redis.call(
        "del",
        key("worker", worker_id, "running"),
        key("worker", worker_id, "failures"),
      )
    else
      @redis.call(
        "del",
        key("leader-status"),
        key("queue"),
        key("running"),
        key("processed"),
        key("owners"),
        key("results"),
        key("requeue-counts"),
      )
    end
  end
rescue RedisClient::ConnectionError
  false # Cleanup is best effort
end
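The two branches of `#cleanup` amount to a mapping from the caller's role to the keys it deletes. This sketch returns key names instead of calling Redis; `cleanup_keys` is hypothetical and reuses the assumed `build:<build_id>:` join format from the data structure section.

```ruby
# Illustrative only: mirrors the branches of #cleanup with a hypothetical key
# helper that joins parts under "build:<build_id>:".
def cleanup_keys(build_id, worker_id, success:)
  key = ->(*parts) { ["build", build_id, *parts].join(":") }
  # Nothing is deleted after a failed run, so the failures list survives
  # for a retry.
  return [] unless success

  if worker_id
    # A worker only removes its own per-worker keys.
    [key.("worker", worker_id, "running"), key.("worker", worker_id, "failures")]
  else
    # Without a worker id, the shared build keys are removed.
    %w[leader-status queue running processed owners results requeue-counts].map { |name| key.(name) }
  end
end

cleanup_keys("1", "w", success: true)
# => ["build:1:worker:w:running", "build:1:worker:w:failures"]
```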
#distributed? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 159
def distributed?
  true
end
#empty? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 178
def empty?
  remaining_size.zero?
end
#failed_test_ids ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 80
def failed_test_ids
  test_ids = (@redis.call("lrange", key("worker", worker_id, "failures"), 0, -1) || []).uniq
  test_ids.reverse!
  test_ids
end
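`#failed_test_ids` deduplicates and reverses the worker's failures list. Failures are appended with RPUSH, so the list is oldest first; `uniq` keeps each id's first occurrence and the reversal then puts the most recent of those first. A minimal sketch on a plain array, with a hypothetical `ordered_failures` name:

```ruby
# Same transformation as #failed_test_ids, applied to an in-memory list.
# A nil reply is treated as an empty list.
def ordered_failures(raw)
  (raw || []).uniq.reverse
end

ordered_failures(["t1", "t2", "t1", "t3"]) # => ["t3", "t2", "t1"]
```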
#global_summary ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 329
def global_summary
  if payloads = @redis.call("lrange", key("results"), 0, -1)
    Queue::Summary.new(payloads.map { |p| TestCaseResult.load(p) })
  else
    Queue::Summary.new
  end
end
#heartbeat ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 140
def heartbeat
  eval_script(
    HEARTBEAT,
    keys: [
      key("running"),
      key("processed"),
      key("owners"),
      key("worker", worker_id, "running"),
    ],
    argv: [
      worker_id,
      Megatest.now,
    ],
  )
  true
rescue RedisClient::ConnectionError
  false # Heartbeat is best effort
end
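`#reserve` passes `@config.heartbeat_frequency * 2` as the RESERVE timeout, so a worker that keeps calling `#heartbeat` never loses its tests, while one that goes quiet for two periods does. A sketch of that check, assuming `Megatest.now` and the heartbeat frequency use the same unit; `lease_lost?` is hypothetical.

```ruby
# Mirrors the comparison in RESERVE: ZRANGEBYSCORE 0 (now - timeout) selects
# leases whose score is at or below now - timeout, with
# timeout = heartbeat_frequency * 2.
def lease_lost?(last_heartbeat, now, heartbeat_frequency)
  timeout = heartbeat_frequency * 2
  last_heartbeat <= now - timeout
end

frequency = 5
lease_lost?(100, 104, frequency) # => false
lease_lost?(100, 109, frequency) # => false, still within two periods
lease_lost?(100, 110, frequency) # => true, the test can be handed to another worker
```

The factor of two tolerates one late heartbeat before a test is reassigned.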
#leader? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 167
def leader?
  @leader
end
#pop_test ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 265
def pop_test
  if test_id = reserve
    test_cases_index.fetch(test_id)
  end
end
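`#pop_test` turns a reserved id back into a test case through `test_cases_index`, which is not shown on this page. The sketch below assumes it is a Hash from id to test case; `FakeTestCase` and `pop_test_from` are hypothetical. Because the lookup uses `Hash#fetch`, an id this worker does not know raises `KeyError` instead of being skipped silently.

```ruby
# Hypothetical stand-in for a test case; only #id matters for the lookup.
FakeTestCase = Struct.new(:id, :name)

test_cases = [FakeTestCase.new("a", "test_a"), FakeTestCase.new("b", "test_b")]
# Assumption: test_cases_index maps each id back to its test case object.
test_cases_index = test_cases.to_h { |tc| [tc.id, tc] }

# Returns nil when nothing was reserved (queue drained), the test case otherwise.
def pop_test_from(reserved_id, index)
  index.fetch(reserved_id) if reserved_id
end

pop_test_from("b", test_cases_index).name # => "test_b"
pop_test_from(nil, test_cases_index)      # => nil
```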
#populate(test_cases) ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 232
def populate(test_cases)
  super

  value = key("leader-setup", worker_id)
  # NB: If we assumed redis 7+ this could be a single command: `SET <key> <value> NX GET EX <@ttl>`.
  _, _, leader = @redis.multi do |pipeline|
    pipeline.call("setnx", key("leader-status"), value)
    pipeline.call("expire", key("leader-status"), @ttl)
    pipeline.call("get", key("leader-status"))
  end

  @leader = leader == value
  if @leader
    @redis.multi do |transaction|
      transaction.call("lpush", key("queue"), test_cases.map(&:id)) unless test_cases.empty?
      transaction.call("expire", key("queue"), @ttl)
      transaction.call("set", key("leader-status"), "ready")
    end
  else
    (@load_timeout * 10).times do
      if populated?
        break
      else
        sleep 0.1
      end
    end
  end
end
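The leader election in `#populate` relies on SETNX: only the first worker's write to “leader-status” sticks, and reading the key back tells each worker whether it won. In the real code SETNX, EXPIRE and GET run inside one MULTI so they execute together. The model below swaps Redis for a Hash; `FakeRedis`, `elect`, `wait_until_ready` and the marker format are all hypothetical.

```ruby
# In-memory stand-in for the few Redis commands the election needs.
class FakeRedis
  def initialize
    @data = {}
  end

  # Like Redis SETNX: only writes when the key is absent.
  def setnx(key, value)
    return false if @data.key?(key)

    @data[key] = value
    true
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end
end

# Every worker tries to claim "leader-status"; only the first write sticks.
def elect(redis, worker_id)
  marker = "leader-setup:#{worker_id}" # hypothetical marker value
  redis.setnx("leader-status", marker)
  redis.get("leader-status") == marker
end

# Followers poll until the leader writes "ready". #populate does
# @load_timeout * 10 checks 0.1s apart (about 30 seconds); unlike the real
# loop, this sketch also reports whether the wait succeeded.
def wait_until_ready(redis, attempts:, interval: 0.1)
  attempts.times do
    return true if redis.get("leader-status") == "ready"

    sleep interval
  end
  false
end

redis = FakeRedis.new
elect(redis, "w1") # => true
elect(redis, "w2") # => false
redis.set("leader-status", "ready")
wait_until_ready(redis, attempts: 3, interval: 0) # => true
```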
#populated? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 163
def populated?
  @redis.call("get", key("leader-status")) == "ready"
end
#record_result(original_result) ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 285
def record_result(original_result)
  result = original_result
  if result.failed? && !result.skipped?
    if attempt_to_retry?(result)
      result = result.retry
    else
      @success = false
    end
  end
  @summary.record_result(result)

  if result.retried?
    @redis.pipelined do |pipeline|
      pipeline.call("rpush", key("results"), result.dump)
      pipeline.call("expire", key("results"), @ttl)
    end
  else
    load_script(ACKNOWLEDGE)
    @redis.pipelined do |pipeline|
      eval_script(
        ACKNOWLEDGE,
        keys: [
          key("running"),
          key("processed"),
          key("owners"),
          key("worker", worker_id, "running"),
        ],
        argv: [result.test_id],
        redis: pipeline,
      )
      if result.failed?
        pipeline.call("rpush", key("worker", worker_id, "failures"), result.test_id)
        pipeline.call("expire", key("worker", worker_id, "failures"), @ttl)
      elsif result.success?
        pipeline.call("lrem", key("worker", worker_id, "failures"), 0, result.test_id)
      end

      pipeline.call("rpush", key("results"), result.dump)
      pipeline.call("expire", key("results"), @ttl)
    end
  end

  result
end
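The bookkeeping in `#record_result` can be modeled with plain collections. In this sketch outcomes are just `:success` or `:failure`, skipped results are left out, and a fixed retry budget stands in for `attempt_to_retry?`, whose logic is not shown here; `FakeRecorder` is hypothetical.

```ruby
require "set"

# Illustrative model of #record_result's bookkeeping, under the assumptions
# stated above. Not the library's implementation.
class FakeRecorder
  attr_reader :results, :failures, :processed, :success

  def initialize(retry_budget:)
    @retry_budget = retry_budget
    @results = []         # "results": every recorded outcome, append only
    @failures = []        # "worker:<id>:failures"
    @processed = Set.new  # "processed", filled by ACKNOWLEDGE
    @success = true
  end

  def record(test_id, outcome)
    if outcome == :failure
      if @retry_budget > 0
        @retry_budget -= 1
        outcome = :retried
      else
        @success = false
      end
    end

    @results << [test_id, outcome]
    # Retried results are only appended; the test is not acknowledged.
    return outcome if outcome == :retried

    @processed << test_id
    if outcome == :failure
      @failures << test_id      # RPUSH onto the worker's failures list
    else
      @failures.delete(test_id) # LREM <failures> 0 <test_id>: drop every copy
    end
    outcome
  end
end

recorder = FakeRecorder.new(retry_budget: 1)
recorder.record("a", :failure) # => :retried
recorder.record("a", :failure) # => :failure
recorder.record("a", :success) # => :success
recorder.failures              # => []
recorder.success               # => false
```

As in the original, the success flag is never reset: once a failure is recorded without a retry, a later pass of the same test clears it from the failures list but does not make the run successful again.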
#remaining_size ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 171
def remaining_size
  @redis.multi do |transaction|
    transaction.call("llen", key("queue"))
    transaction.call("zcard", key("running"))
  end.inject(:+)
end
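`#remaining_size` sums the two replies of a MULTI block: the queued ids (LLEN on “queue”) and the leased ids (ZCARD on “running”). Running both inside one transaction means the two counts are read together. A minimal sketch with made-up reply values and a hypothetical `remaining_from` helper:

```ruby
# A MULTI block returns one reply per command, here [LLEN queue, ZCARD running].
def remaining_from(replies)
  replies.inject(:+)
end

remaining_from([3, 2])       # => 5: three ids still queued, two leased
remaining_from([0, 0]).zero? # => true, which is when #empty? returns true
```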
#reserve ⇒ Object
# File 'lib/megatest/redis_queue.rb', line 212
def reserve
  load_script(RESERVE)
  test_id, = eval_script(
    RESERVE,
    keys: [
      key("queue"),
      key("running"),
      key("processed"),
      key("owners"),
      key("worker", worker_id, "running"),
    ],
    argv: [
      worker_id,
      Megatest.now,
      @config.heartbeat_frequency * 2,
    ],
  )
  test_id
end
#retrying? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 76
def retrying?
  @worker_id && !@redis.call("llen", key("worker", worker_id, "failures")).zero?
end
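`#retrying?` is true only for a worker that already has entries in its failures list, which the data structure section describes as the base for a new queue on retry. The sketch below illustrates that choice; `ids_to_run` is hypothetical, and RetryQueue's real behavior is not shown on this page.

```ruby
# Same condition as #retrying?, on in-memory values.
def retrying?(worker_id, failures)
  !worker_id.nil? && !failures.empty?
end

# Hypothetical: a retrying worker replays its deduplicated failures
# (most recent first, as #failed_test_ids orders them) instead of the full list.
def ids_to_run(worker_id, failures, all_ids)
  retrying?(worker_id, failures) ? failures.uniq.reverse : all_ids
end

ids_to_run("w1", ["a", "b", "a"], %w[a b c d]) # => ["b", "a"]
ids_to_run("w1", [], %w[a b])                  # => ["a", "b"]
```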
#success? ⇒ Boolean
# File 'lib/megatest/redis_queue.rb', line 261
def success?
  @success
end