Module: Wurk::Lua
- Defined in:
- lib/wurk/lua.rb,
lib/wurk/lua/loader.rb
Overview
EVALSHA-cached Lua scripts, loaded once per pool and never re-uploaded. They cover bulk enqueue, multi-pop, atomic schedule promotion, and batch operations.
Source strings are intentionally bare: the SHA1 of each is computed at load time and is the same value Redis reports from `SCRIPT LOAD`. Whitespace edits change the SHA, which forces a re-upload at runtime.
`:zpopbyscore` is reproduced verbatim from sidekiq-free.md §1.8 and MUST NOT diverge; parity tests will fail on a single-byte change.
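The whitespace sensitivity described above can be illustrated with a minimal sketch that uses only Ruby's standard `digest` library. The script text is a stand-in chosen for the example, not one of this module's constants.

```ruby
require 'digest'

# Stand-in script source (hypothetical, not a Wurk::Lua constant).
original = "return redis.call(\"llen\", KEYS[1])\n"
# Same logic, with one extra space before the newline.
edited   = "return redis.call(\"llen\", KEYS[1]) \n"

sha_original = Digest::SHA1.hexdigest(original)
sha_edited   = Digest::SHA1.hexdigest(edited)

# Redis identifies a cached script by the SHA1 of its exact bytes, so the
# edited source counts as a different script for EVALSHA.
puts sha_original
puts sha_edited
puts sha_original == sha_edited # false
```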
Defined Under Namespace
Classes: Loader
Constant Summary
- ZPOPBYSCORE =
<<~LUA
  local key, now = KEYS[1], ARGV[1]
  local jobs = redis.call("zrange", key, "-inf", now, "byscore", "limit", 0, 1)
  if jobs[1] then
    redis.call("zrem", key, jobs[1])
    return jobs[1]
  end
LUA
- BULK_PUSH =
Bulk enqueue to a single queue.
KEYS = [queue_list, queues_set]
ARGV = [queue_name, job_json, …]
Returns the number of jobs pushed.
<<~LUA
  redis.call("sadd", KEYS[2], ARGV[1])
  for i = 2, #ARGV do
    redis.call("lpush", KEYS[1], ARGV[i])
  end
  return #ARGV - 1
LUA
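As a rough illustration of the calling convention, the sketch below builds the KEYS and ARGV arrays for BULK_PUSH. The helper `bulk_push_args` is hypothetical. The `queue:<name>` key form follows the Fast API entries in this module, while the `queues` set name is an assumption.

```ruby
require 'json'

# Hypothetical helper: assembles KEYS and ARGV in the order BULK_PUSH expects.
# The "queues" set name is an assumption made for this example.
def bulk_push_args(queue_name, jobs)
  keys = ["queue:#{queue_name}", 'queues']
  argv = [queue_name, *jobs.map { |job| JSON.generate(job) }]
  [keys, argv]
end

keys, argv = bulk_push_args('default', [{ 'jid' => 'a1' }, { 'jid' => 'b2' }])

# The script returns #ARGV - 1 because ARGV[1] is the queue name, not a job.
expected_return = argv.length - 1
puts expected_return # 2
```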
- RELIABLE_SCHEDULE_PROMOTE =
Pro reliable scheduler: atomically promote all due jobs in a sorted set to their target queues. Pure-Ruby promotion does ZRANGE → ZREM → LPUSH non-atomically and can lose jobs on a mid-step crash.
KEYS = [sorted_set, queues_set]
ARGV = [now, queue_prefix]
Returns the number of jobs promoted.
Order matters: decode + push BEFORE zrem. Redis Lua has no rollback, so a failed cjson.decode after a zrem would lose the job. Decode first; push first; only then remove from the sorted set. Worst case is a crash between lpush and zrem → at-least-once redelivery, never loss.
<<~LUA
  local jobs = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])
  for i = 1, #jobs do
    local job = jobs[i]
    local q = cjson.decode(job)["queue"]
    redis.call("sadd", KEYS[2], q)
    redis.call("lpush", ARGV[2] .. q, job)
    redis.call("zrem", KEYS[1], job)
  end
  return #jobs
LUA
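The decode, push, then remove ordering can be modeled in plain Ruby. This is only an in-memory sketch: the hash and set stand in for the Redis sorted set, lists, and queues set, and `promote_due` and the `queue:` prefix are names assumed for the example.

```ruby
require 'json'
require 'set'

# In-memory stand-ins for the Redis structures (illustration only):
# schedule maps payload => score, queues maps list key => payloads.
def promote_due(schedule, queues, queue_names, now, prefix)
  due = schedule.select { |_job, score| score <= now }
                .sort_by { |_job, score| score }
                .map(&:first)
  due.each do |job|
    queue = JSON.parse(job).fetch('queue')       # decode first; may raise
    queue_names << queue                         # like SADD on the queues set
    (queues[prefix + queue] ||= []).unshift(job) # like LPUSH
    schedule.delete(job)                         # like ZREM, last on purpose
  end
  due.length
end

schedule = {
  JSON.generate({ 'jid' => 'a1', 'queue' => 'default' }) => 100.0,
  'not json' => 150.0,
  JSON.generate({ 'jid' => 'c3', 'queue' => 'mail' }) => 900.0
}
queues = {}
queue_names = Set.new

begin
  promote_due(schedule, queues, queue_names, 200.0, 'queue:')
rescue JSON::ParserError
  # The corrupt payload raised during decode, before its removal step.
end

puts schedule.key?('not json')      # true: still scheduled, not lost
puts queues['queue:default'].length # 1: the earlier job was promoted
```

Because removal is the last step for each job, a payload that fails to decode is never taken out of the schedule.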
- BATCH_PUSH =
Pro Batch: register a job into a batch and push it to its queue atomically. Keeps total/pending in sync with the jids set.
KEYS = [b-<bid>, b-<bid>-jids, queue_list, queues_set]
ARGV = [queue_name, jid, job_json]
Returns 1.
<<~LUA
  redis.call("hincrby", KEYS[1], "total", 1)
  redis.call("hincrby", KEYS[1], "pending", 1)
  redis.call("sadd", KEYS[2], ARGV[2])
  redis.call("sadd", KEYS[4], ARGV[1])
  redis.call("lpush", KEYS[3], ARGV[3])
  return 1
LUA
- BATCH_ACK_SUCCESS =
Pro Batch: ACK a job that completed successfully. SREM from the live jids set and decrement pending iff the jid was a member (idempotent against double-success on a flaky retry).
KEYS = [b-<bid>, b-<bid>-jids]
ARGV = [jid]
Returns [new_pending, live_jids_remaining], or [-1, -1] when the jid was not a member (treat as already acked).
<<~LUA
  local removed = redis.call("srem", KEYS[2], ARGV[1])
  if removed == 1 then
    local pending = redis.call("hincrby", KEYS[1], "pending", -1)
    return { pending, redis.call("scard", KEYS[2]) }
  end
  return { -1, -1 }
LUA
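The idempotency guarantee can be sketched with an in-memory model. No Redis is involved: `batch` stands in for the `b-<bid>` hash, `live` for the `b-<bid>-jids` set, and `ack_success` is a name assumed for the example.

```ruby
require 'set'

# In-memory model of BATCH_ACK_SUCCESS (illustration only).
def ack_success(batch, live_jids, jid)
  # Set#delete? returns nil when the element was absent, like SREM returning 0.
  return [-1, -1] unless live_jids.delete?(jid)

  batch['pending'] -= 1
  [batch['pending'], live_jids.size]
end

batch = { 'total' => 2, 'pending' => 2 }
live  = Set['a1', 'b2']

first  = ack_success(batch, live, 'a1') # => [1, 1]
second = ack_success(batch, live, 'a1') # => [-1, -1], pending is untouched
p first, second
```

The second call finds the jid already gone, so the pending counter is decremented only once.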
- BATCH_ACK_COMPLETE =
Pro Batch: ACK a job that exhausted retries and died. Records death, bumps failures, and SREMs from live jids so the batch can fire `:complete` even with terminally failed jobs.
KEYS = [b-<bid>, b-<bid>-jids, b-<bid>-died, b-<bid>-failed]
ARGV = [jid]
Returns [live_jids_remaining, died_count, first_death]. `first_death` is 1 the first time any jid is SADDed into the died set, 0 thereafter; the caller uses it to fire `:death` exactly once per batch.
<<~LUA
  local was_pre_existing_death = redis.call("scard", KEYS[3])
  redis.call("srem", KEYS[2], ARGV[1])
  redis.call("sadd", KEYS[4], ARGV[1])
  local died_added = redis.call("sadd", KEYS[3], ARGV[1])
  redis.call("hincrby", KEYS[1], "failures", 1)
  local first_death = 0
  if was_pre_existing_death == 0 and died_added == 1 then
    first_death = 1
  end
  return { redis.call("scard", KEYS[2]), redis.call("scard", KEYS[3]), first_death }
LUA
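How `first_death` behaves across two deaths in the same batch can be sketched with an in-memory model. The sets stand in for the Redis keys, and `ack_complete` is a name assumed for the example.

```ruby
require 'set'

# In-memory model of BATCH_ACK_COMPLETE (illustration only).
def ack_complete(batch, live, died, failed, jid)
  died_before = died.size            # SCARD taken before any mutation
  live.delete(jid)
  failed.add(jid)
  newly_died = !died.add?(jid).nil?  # Set#add? returns nil if already present
  batch['failures'] += 1
  first_death = died_before.zero? && newly_died ? 1 : 0
  [live.size, died.size, first_death]
end

batch  = { 'failures' => 0 }
live   = Set['a1', 'b2']
died   = Set.new
failed = Set.new

r1 = ack_complete(batch, live, died, failed, 'a1') # => [1, 1, 1]
r2 = ack_complete(batch, live, died, failed, 'b2') # => [0, 2, 0]
p r1, r2
```

Only the first call reports `first_death` as 1, which is the signal a caller would use to fire `:death` once.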
- BATCH_INVALIDATE =
Pro Batch: invalidate all pending jobs. The jobs themselves stay in their queues (the server middleware short-circuits when it sees the invalidated flag), but the jids set is cleared so the batch can no longer accept completion callbacks.
KEYS = [b-<bid>, b-<bid>-jids]
ARGV = []
Returns 1.
<<~LUA
  redis.call("del", KEYS[2])
  redis.call("hset", KEYS[1], "invalidated", "1")
  return 1
LUA
- FAST_DELETE_JOB =
Pro Fast API (§11): server-side LRANGE+LREM to delete a single job by jid from a queue list. Pure-Ruby Queue#find_job + JobRecord#delete takes O(N) round-trips; this takes a single round-trip with O(N) Lua work.
KEYS = [queue:<name>]
ARGV = [jid]
Returns the number of payloads removed (0 or 1; can be >1 in pathological duplicate-jid corruption, so the caller doesn't rely on the value).
<<~LUA
  local items = redis.call("lrange", KEYS[1], 0, -1)
  local removed = 0
  for i = 1, #items do
    if string.find(items[i], '"jid":"' .. ARGV[1] .. '"', 1, true) then
      removed = removed + redis.call("lrem", KEYS[1], 1, items[i])
    end
  end
  return removed
LUA
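The match is a literal substring test, so it only succeeds on payloads serialized without whitespace around the colon. A minimal Ruby equivalent of that test is sketched below. `payload_has_jid?` and the sample payloads are hypothetical.

```ruby
require 'json'

# Ruby equivalent of the plain-text test the script runs on each payload
# (string.find with plain = true is a literal substring search).
def payload_has_jid?(payload, jid)
  payload.include?(%("jid":"#{jid}"))
end

compact = JSON.generate({ 'class' => 'MailJob', 'jid' => 'a1', 'args' => [] })
spaced  = '{"class": "MailJob", "jid": "a1", "args": []}'

hit     = payload_has_jid?(compact, 'a1') # true
miss    = payload_has_jid?(spaced, 'a1')  # false: the space breaks the needle
partial = payload_has_jid?(compact, 'a')  # false: closing quote blocks prefixes
p hit, miss, partial
```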
- FAST_DELETE_BY_CLASS =
Pro Fast API (§11): server-side LRANGE+LREM removing every payload whose `"class":"<klass>"` field matches. Plain-text scan (no JSON parse) so it tolerates partial corruption; the caller drops only well-formed matches.
KEYS = [queue:<name>]
ARGV = [klass]
Returns the number of payloads removed.
<<~LUA
  local items = redis.call("lrange", KEYS[1], 0, -1)
  local removed = 0
  local needle = '"class":"' .. ARGV[1] .. '"'
  for i = 1, #items do
    if string.find(items[i], needle, 1, true) then
      removed = removed + redis.call("lrem", KEYS[1], 1, items[i])
    end
  end
  return removed
LUA
- LUA_DIR =
Limiter scripts live in `lib/wurk/lua/limiter_*.lua`, one file per type. They are loaded at boot, and each file's basename (minus `.lua`) becomes the SCRIPTS key as a symbol. Keeping them as separate files makes diffing individual rate-limiter changes painless and keeps each script self-contained for the `redis-cli --eval` debug workflow.
File.expand_path('lua', __dir__)
- FILE_SCRIPTS =
Dir.glob(File.join(LUA_DIR, '*.lua')).each_with_object({}) do |path, h|
  h[File.basename(path, '.lua').to_sym] = File.read(path)
end.freeze
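The basename-to-symbol mapping can be tried against a throwaway directory. This sketch uses the same glob expression; the file names and contents are invented for the demonstration.

```ruby
require 'tmpdir'

scripts = Dir.mktmpdir do |dir|
  # Hypothetical files, created only for this demonstration.
  File.write(File.join(dir, 'limiter_bucket.lua'), "return 1\n")
  File.write(File.join(dir, 'README.md'), "ignored\n")

  # Same shape as FILE_SCRIPTS: only *.lua files, keyed by basename symbol.
  Dir.glob(File.join(dir, '*.lua')).each_with_object({}) do |path, h|
    h[File.basename(path, '.lua').to_sym] = File.read(path)
  end.freeze
end

p scripts.keys # [:limiter_bucket]
```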
- SCRIPTS =
{
  zpopbyscore: ZPOPBYSCORE,
  bulk_push: BULK_PUSH,
  reliable_schedule_promote: RELIABLE_SCHEDULE_PROMOTE,
  batch_push: BATCH_PUSH,
  batch_ack_success: BATCH_ACK_SUCCESS,
  batch_ack_complete: BATCH_ACK_COMPLETE,
  batch_invalidate: BATCH_INVALIDATE,
  fast_delete_job: FAST_DELETE_JOB,
  fast_delete_by_class: FAST_DELETE_BY_CLASS
}.merge(FILE_SCRIPTS).freeze
- SHAS =
SHA1 of each script source, which matches what `SCRIPT LOAD` returns. Precomputing keeps `eval_cached` allocation-free in the hot path.
SCRIPTS.transform_values { |src| Digest::SHA1.hexdigest(src) }.freeze
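One way a caller might use precomputed SHAs is sketched below. It is not the module's actual `eval_cached`, whose implementation is not shown here. `FakeRedis`, `DEMO_SCRIPTS`, `DEMO_SHAS`, and the load-on-NOSCRIPT fallback are all assumptions made for the example.

```ruby
require 'digest'

# Hypothetical stand-in for a connection; models only SCRIPT LOAD and EVALSHA.
class FakeRedis
  attr_reader :loads

  def initialize
    @cache = {}
    @loads = 0
  end

  def script_load(source)
    @loads += 1
    sha = Digest::SHA1.hexdigest(source)
    @cache[sha] = source
    sha
  end

  def evalsha(sha, keys, argv)
    raise 'NOSCRIPT No matching script' unless @cache.key?(sha)

    "ran #{sha[0, 8]} with #{keys.length} keys, #{argv.length} args"
  end
end

DEMO_SCRIPTS = { ping: "return 1\n" }.freeze
DEMO_SHAS = DEMO_SCRIPTS.transform_values { |src| Digest::SHA1.hexdigest(src) }.freeze

# Assumed strategy: try the precomputed SHA, upload once if the server lacks it.
def eval_cached(conn, name, keys: [], argv: [])
  conn.evalsha(DEMO_SHAS.fetch(name), keys, argv)
rescue RuntimeError => e
  raise unless e.message.start_with?('NOSCRIPT')

  conn.script_load(DEMO_SCRIPTS.fetch(name))
  conn.evalsha(DEMO_SHAS.fetch(name), keys, argv)
end

conn = FakeRedis.new
eval_cached(conn, :ping, keys: ['k'])
eval_cached(conn, :ping, keys: ['k'])
puts conn.loads # 1: the second call hit the cached script
```

Because the SHA is a frozen hash lookup, the common path computes no digest per call.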