Class: Kaal::DelayedJob::RedisEngine
- Defined in:
- lib/kaal/delayed_job/redis_engine.rb
Overview
Redis-backed delayed-job store using a sorted set plus payload hash.
Class Method Summary collapse
Instance Method Summary collapse
- #all_jobs ⇒ Object
- #claim_strategy ⇒ Object
- #enqueue(job_id:, run_at:, job_class:, args:, queue: nil) ⇒ Object
- #find_job(job_id) ⇒ Object
-
#initialize(redis, namespace: 'kaal') ⇒ RedisEngine
constructor
A new instance of RedisEngine.
- #pop_due(now:, limit:) ⇒ Object
Methods inherited from Registry
Constructor Details
#initialize(redis, namespace: 'kaal') ⇒ RedisEngine
Returns a new instance of RedisEngine.
15 16 17 18 19 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 15 def initialize(redis, namespace: 'kaal') super() @redis = redis @namespace = namespace end |
Class Method Details
.deserialize(raw) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 82 def self.deserialize(raw) return nil unless raw parsed = JSON.parse(raw) run_at = parse_time(parsed['run_at']) created_at = parse_time(parsed['created_at']) return nil unless run_at && created_at { job_id: parsed['job_id'], run_at: run_at, job_class: parsed['job_class'], args: parsed['args'] || [], queue: parsed['queue'], created_at: created_at } rescue JSON::ParserError nil end |
.parse_time(value) ⇒ Object
102 103 104 105 106 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 102 def self.parse_time(value) Time.iso8601(value.to_s) rescue ArgumentError nil end |
Instance Method Details
#all_jobs ⇒ Object
74 75 76 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 74 def all_jobs Array(@redis.zrange(schedule_key, 0, -1)).filter_map { |job_id| find_job(job_id) } end |
#claim_strategy ⇒ Object
78 79 80 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 78 def claim_strategy :atomic_pop end |
#enqueue(job_id:, run_at:, job_class:, args:, queue: nil) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 21 def enqueue(job_id:, run_at:, job_class:, args:, queue: nil, **) payload = JSON.generate( job_id: job_id, run_at: run_at.iso8601, job_class: job_class, args: args, queue: queue, created_at: Time.now.utc.iso8601 ) script = <<~LUA if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1 then return 0 end redis.call('HSET', KEYS[1], ARGV[1], ARGV[2]) redis.call('ZADD', KEYS[2], ARGV[3], ARGV[1]) return 1 LUA result = @redis.eval(script, keys: [payloads_key, schedule_key], argv: [job_id, payload, run_at.to_f]) raise DuplicateJobError, "Delayed job #{job_id.inspect} already exists" unless [1, '1', true].include?(result) find_job(job_id) end |
#find_job(job_id) ⇒ Object
70 71 72 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 70 def find_job(job_id) self.class.deserialize(@redis.hget(payloads_key, job_id)) end |
#pop_due(now:, limit:) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/kaal/delayed_job/redis_engine.rb', line 47 def pop_due(now:, limit:) script = <<~LUA local job_ids = redis.call('ZRANGEBYSCORE', KEYS[2], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2]) local payloads = {} for _, job_id in ipairs(job_ids) do if redis.call('ZREM', KEYS[2], job_id) == 1 then local payload = redis.call('HGET', KEYS[1], job_id) if payload then redis.call('HDEL', KEYS[1], job_id) table.insert(payloads, payload) end end end return payloads LUA Array(@redis.eval(script, keys: [payloads_key, schedule_key], argv: [now.to_f, limit])).filter_map do |raw| self.class.deserialize(raw) end end |