Module: Wurk::Lua

Defined in:
lib/wurk/lua.rb,
lib/wurk/lua/loader.rb

Overview

EVALSHA-cached Lua scripts, loaded once per pool and never re-uploaded. They cover bulk enqueue, multi-pop, atomic schedule promotion, and batch operations.

Source strings are intentionally bare: the SHA1 of each is computed at load time and is the same value Redis reports from `SCRIPT LOAD`. Whitespace edits change the SHA, which forces a re-upload at runtime.
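To illustrate why whitespace matters, here is a minimal sketch using Ruby's standard `Digest::SHA1`. The script text is an arbitrary stand-in chosen for the example, not one of this module's constants.

```ruby
require 'digest'

# Arbitrary stand-in script, not one of the Wurk::Lua constants.
original = %(return redis.call("ping")\n)
# Same logic, with one extra space after "return".
edited   = %(return  redis.call("ping")\n)

sha_original = Digest::SHA1.hexdigest(original)
sha_edited   = Digest::SHA1.hexdigest(edited)

puts sha_original                 # 40 hex characters
puts sha_original == sha_edited   # prints false
```

Because the digest covers every byte, even a cosmetic reformat yields a SHA that Redis has not seen before.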

`:zpopbyscore` is reproduced verbatim from sidekiq-free.md §1.8 and MUST NOT diverge; parity tests will fail on a single byte change.

Defined Under Namespace

Classes: Loader

Constant Summary

ZPOPBYSCORE =
<<~LUA
  local key, now = KEYS[1], ARGV[1]
  local jobs = redis.call("zrange", key, "-inf", now, "byscore", "limit", 0, 1)
  if jobs[1] then
    redis.call("zrem", key, jobs[1])
    return jobs[1]
  end
LUA
BULK_PUSH =

Bulk enqueue to a single queue.

KEYS = [queue_list, queues_set]
ARGV = [queue_name, job_json, …]

Returns the number of jobs pushed.

<<~LUA
  redis.call("sadd", KEYS[2], ARGV[1])
  for i = 2, #ARGV do
    redis.call("lpush", KEYS[1], ARGV[i])
  end
  return #ARGV - 1
LUA
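As a rough illustration of the KEYS/ARGV layout described for BULK_PUSH, the sketch below builds both arrays in plain Ruby. The helper name `bulk_push_args`, the `queue:` key prefix, and the `queues` set name are assumptions made for this example; the real caller may use different names.

```ruby
require 'json'

# Hypothetical helper: builds KEYS and ARGV in the layout BULK_PUSH documents.
# The "queue:" prefix and the "queues" set name are assumptions for this sketch.
def bulk_push_args(queue_name, jobs)
  keys = ["queue:#{queue_name}", 'queues']
  argv = [queue_name, *jobs.map { |job| JSON.generate(job) }]
  [keys, argv]
end

keys, argv = bulk_push_args('default', [
  { 'jid' => 'a1', 'class' => 'HardJob', 'args' => [1] },
  { 'jid' => 'b2', 'class' => 'HardJob', 'args' => [2] }
])

# The script returns #ARGV - 1: every ARGV entry except the queue name.
expected_pushed = argv.length - 1
```

The queue name travels in ARGV[1] so the script can register it in the queues set, which is why the return value subtracts one.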
RELIABLE_SCHEDULE_PROMOTE =

Pro reliable scheduler: atomically promote all due jobs in a sorted set to their target queues. Pure-Ruby promotion does ZRANGE → ZREM → LPUSH non-atomically and can lose jobs on a mid-step crash.

KEYS = [sorted_set, queues_set]
ARGV = [now, queue_prefix]

Returns the number of jobs promoted.

Order matters: decode + push BEFORE zrem. Redis Lua has no rollback, so a failed cjson.decode after a zrem would lose the job. Decode first; push first; only then remove from the sorted set. The worst case is a crash between lpush and zrem, which yields at-least-once redelivery, never loss.

<<~LUA
  local jobs = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])
  for i = 1, #jobs do
    local job = jobs[i]
    local q = cjson.decode(job)["queue"]
    redis.call("sadd", KEYS[2], q)
    redis.call("lpush", ARGV[2] .. q, job)
    redis.call("zrem", KEYS[1], job)
  end
  return #jobs
LUA
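The decode, push, then remove ordering can be modelled in plain Ruby with in-memory stand-ins. This is only an illustration of the ordering argument, not the module's code: the array and hash below are assumptions that replace the Redis sorted set and lists, and `promote` is a hypothetical name.

```ruby
require 'json'

# In-memory stand-ins for the Redis structures (illustrative only).
scheduled = ['{"queue":"default","jid":"a1"}', 'not-json']
queues    = Hash.new { |h, k| h[k] = [] }

# Hypothetical model of the script's loop body.
def promote(scheduled, queues)
  scheduled.dup.each do |job|
    queue = JSON.parse(job)['queue']  # 1. decode; raises on a bad payload
    queues[queue].unshift(job)        # 2. push, like LPUSH
    scheduled.delete(job)             # 3. only now remove, like ZREM
  end
end

begin
  promote(scheduled, queues)
rescue JSON::ParserError
  # The undecodable payload was never removed from the scheduled set.
end
```

After the run, the valid job sits in its queue and the malformed payload is still in `scheduled`, so nothing was dropped by the failed decode.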
BATCH_PUSH =

Pro Batch: register a job into a batch and push it to its queue atomically. Keeps total/pending in sync with the jids set.

KEYS = [b-<bid>, b-<bid>-jids, queue_list, queues_set]
ARGV = [queue_name, jid, job_json]

Returns 1.

<<~LUA
  redis.call("hincrby", KEYS[1], "total", 1)
  redis.call("hincrby", KEYS[1], "pending", 1)
  redis.call("sadd", KEYS[2], ARGV[2])
  redis.call("sadd", KEYS[4], ARGV[1])
  redis.call("lpush", KEYS[3], ARGV[3])
  return 1
LUA
BATCH_ACK_SUCCESS =

Pro Batch: ACK a job that completed successfully. SREM from the live jids set and decrement pending iff the jid was a member (idempotent against double-success on a flaky retry).

KEYS = [b-<bid>, b-<bid>-jids]
ARGV = [jid]

Returns [new_pending, live_jids_remaining], or [-1, -1] when the jid was not a member (treat as already acked).

<<~LUA
  local removed = redis.call("srem", KEYS[2], ARGV[1])
  if removed == 1 then
    local pending = redis.call("hincrby", KEYS[1], "pending", -1)
    return { pending, redis.call("scard", KEYS[2]) }
  end
  return { -1, -1 }
LUA
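A caller might interpret the two-element reply along the following lines. The helper name `interpret_ack` and its result symbols are hypothetical, and treating zero remaining live jids as "drained" is an assumption of this sketch rather than documented behaviour.

```ruby
# Hypothetical helper interpreting the reply documented for BATCH_ACK_SUCCESS.
def interpret_ack(reply)
  pending, live = reply
  # [-1, -1] means the jid was not a member: treat as already acked.
  return :already_acked if pending == -1 && live == -1
  # Assumption for this sketch: no live jids left means the batch drained.
  return :drained if live.zero?

  :in_progress
end

first_ack  = interpret_ack([2, 2])
last_ack   = interpret_ack([0, 0])
double_ack = interpret_ack([-1, -1])
```

Checking the sentinel first keeps a duplicate ACK from being mistaken for real progress.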
BATCH_ACK_COMPLETE =

Pro Batch: ACK a job that exhausted retries and died. Records death, bumps failures, and SREMs from live jids so the batch can fire `:complete` even with terminally failed jobs.

KEYS = [b-<bid>, b-<bid>-jids, b-<bid>-died, b-<bid>-failed]
ARGV = [jid]

Returns [live_jids_remaining, died_count, first_death]. `first_death` is 1 the first time any jid is SADDed into the died set, 0 thereafter; the caller uses it to fire `:death` exactly once per batch.

<<~LUA
  local was_pre_existing_death = redis.call("scard", KEYS[3])
  redis.call("srem", KEYS[2], ARGV[1])
  redis.call("sadd", KEYS[4], ARGV[1])
  local died_added = redis.call("sadd", KEYS[3], ARGV[1])
  redis.call("hincrby", KEYS[1], "failures", 1)
  local first_death = 0
  if was_pre_existing_death == 0 and died_added == 1 then
    first_death = 1
  end
  return { redis.call("scard", KEYS[2]), redis.call("scard", KEYS[3]), first_death }
LUA
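The `first_death` rule can be modelled with a Ruby `Set` standing in for the died set. This is a sketch of the flag logic only; `record_death` is a hypothetical name and nothing here talks to Redis.

```ruby
require 'set'

# In-memory model of the died set. Set#add? returns nil when the member is
# already present, much like SADD returning 0.
def record_death(died, jid)
  was_empty = died.empty?            # mirrors the SCARD taken before SADD
  added     = !died.add?(jid).nil?   # mirrors the SADD result
  was_empty && added ? 1 : 0
end

died  = Set.new
flags = %w[j1 j2 j1].map { |jid| record_death(died, jid) }
# flags is [1, 0, 0]: only the first death in the batch raises the flag
```

Reading the cardinality before adding is what lets the flag distinguish the first death in the batch from a later death of a different jid.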
BATCH_INVALIDATE =

Pro Batch: invalidate all pending jobs. The jobs themselves stay in their queues (the server middleware short-circuits when it sees the invalidated flag), but the jids set is cleared so the batch can no longer accept completion callbacks.

KEYS = [b-<bid>, b-<bid>-jids]
ARGV = []

Returns 1.

<<~LUA
  redis.call("del", KEYS[2])
  redis.call("hset", KEYS[1], "invalidated", "1")
  return 1
LUA
FAST_DELETE_JOB =

Pro Fast API (§11): server-side LRANGE+LREM to delete a single job by jid from a queue list. Pure-Ruby Queue#find_job + JobRecord#delete is O(N) round-trips; this is O(1) round-trips with O(N) Lua work.

KEYS = [queue:<name>]
ARGV = [jid]

Returns the number of payloads removed (0 or 1; can be >1 in pathological duplicate-jid corruption, so the caller doesn't rely on the value).

<<~LUA
  local items = redis.call("lrange", KEYS[1], 0, -1)
  local removed = 0
  for i = 1, #items do
    if string.find(items[i], '"jid":"' .. ARGV[1] .. '"', 1, true) then
      removed = removed + redis.call("lrem", KEYS[1], 1, items[i])
    end
  end
  return removed
LUA
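The plain-text match in the script looks for the exact substring `"jid":"<jid>"`, so it depends on payloads being serialized without whitespace around the colon. A small Ruby sketch of that dependency follows; `jid_needle` is a hypothetical helper and the payload is made up for the example.

```ruby
require 'json'

# Hypothetical helper mirroring the needle the Lua script builds.
def jid_needle(jid)
  %("jid":"#{jid}")
end

payload = { 'class' => 'HardJob', 'jid' => 'abc123' }
compact = JSON.generate(payload)         # no spaces around ':'
pretty  = JSON.pretty_generate(payload)  # emits '"jid": "abc123"'

compact_match = compact.include?(jid_needle('abc123'))  # true
pretty_match  = pretty.include?(jid_needle('abc123'))   # false
prefix_match  = compact.include?(jid_needle('abc'))     # false: closing quote must match
```

The closing quote in the needle prevents one jid from matching another that merely starts with the same characters.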
FAST_DELETE_BY_CLASS =

Pro Fast API (§11): server-side LRANGE+LREM removing every payload whose `"class":"<klass>"` field matches. Plain-text scan (no JSON parse) so it tolerates partial corruption; the caller drops only well-formed matches.

KEYS = [queue:<name>]
ARGV = [klass]

Returns the number of payloads removed.

<<~LUA
  local items = redis.call("lrange", KEYS[1], 0, -1)
  local removed = 0
  local needle = '"class":"' .. ARGV[1] .. '"'
  for i = 1, #items do
    if string.find(items[i], needle, 1, true) then
      removed = removed + redis.call("lrem", KEYS[1], 1, items[i])
    end
  end
  return removed
LUA
LUA_DIR =

Limiter scripts live in `lib/wurk/lua/limiter_*.lua`, one file per type. Loaded at boot, the file's basename (minus `.lua`) becomes the SCRIPTS key as a symbol. Keeping them as separate files makes diffing individual rate-limiter changes painless and keeps each script self-contained for the `redis-cli --eval` debug workflow.

File.expand_path('lua', __dir__)
FILE_SCRIPTS =
Dir.glob(File.join(LUA_DIR, '*.lua')).each_with_object({}) do |path, h|
  h[File.basename(path, '.lua').to_sym] = File.read(path)
end.freeze
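The basename-to-symbol mapping can be tried in isolation with a temporary directory. The file name `limiter_example.lua` and its one-line body are hypothetical, created only for this sketch.

```ruby
require 'tmpdir'

scripts = Dir.mktmpdir do |dir|
  # Hypothetical file name and body, created only for this sketch.
  File.write(File.join(dir, 'limiter_example.lua'), "return 1\n")

  # Same shape as the FILE_SCRIPTS expression: basename minus ".lua" as a symbol.
  Dir.glob(File.join(dir, '*.lua')).each_with_object({}) do |path, h|
    h[File.basename(path, '.lua').to_sym] = File.read(path)
  end
end
# scripts is { limiter_example: "return 1\n" }
```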
SCRIPTS =
{
  zpopbyscore: ZPOPBYSCORE,
  bulk_push: BULK_PUSH,
  reliable_schedule_promote: RELIABLE_SCHEDULE_PROMOTE,
  batch_push: BATCH_PUSH,
  batch_ack_success: BATCH_ACK_SUCCESS,
  batch_ack_complete: BATCH_ACK_COMPLETE,
  batch_invalidate: BATCH_INVALIDATE,
  fast_delete_job: FAST_DELETE_JOB,
  fast_delete_by_class: FAST_DELETE_BY_CLASS
}.merge(FILE_SCRIPTS).freeze
SHAS =

SHA1 of each script source, matching what `SCRIPT LOAD` returns. Precomputing keeps `eval_cached` allocation-free in the hot path.

SCRIPTS.transform_values { |src| Digest::SHA1.hexdigest(src) }.freeze
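As a sketch of how a precomputed digest table might be used, the snippet below assembles an `EVALSHA` command from a name lookup. It does not reproduce `eval_cached`, whose implementation is not shown here; the local `scripts` table, the `build_evalsha` lambda, and the key name are all assumptions made for illustration.

```ruby
require 'digest'

# Stand-in table with one arbitrary script, not the module's SCRIPTS.
scripts = { ping: %(return redis.call("ping")\n) }.freeze
shas    = scripts.transform_values { |src| Digest::SHA1.hexdigest(src) }.freeze

# Hypothetical builder: EVALSHA takes the sha, the key count, then keys and args.
build_evalsha = lambda do |name, keys, argv|
  ['EVALSHA', shas.fetch(name), keys.length, *keys, *argv]
end

command = build_evalsha.call(:ping, ['some:key'], ['1'])
```

With the digests frozen at load time, each call needs only a hash lookup by symbol instead of hashing the script source again.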