Class: SidekiqUniqueJobs::Fetch::Reliable
- Inherits: Object
- Includes:
- Sidekiq::Component, JSON, Logging, Reflectable, Script::Caller
- Defined in:
- lib/sidekiq_unique_jobs/fetch/reliable.rb
Overview
Reliable fetch strategy using LMOVE to atomically move jobs from queues to a per-process working list. Jobs are tracked in the working list until acknowledged, providing crash recovery.
Features:
- Atomic LMOVE from queue to working list (no job loss on crash)
- Lock validation at fetch time via Lua script
- Per-process heartbeat for dead process detection
- Startup recovery of orphaned working lists
- Compatible with all Sidekiq queue ordering modes
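The crash-safety property behind the first feature can be illustrated with a small in-memory simulation. Plain Ruby arrays stand in for the Redis queue and working lists, and `fetch`, `ack`, and `recover` are illustrative helpers, not this gem's API:

```ruby
# In-memory sketch of the reliable-fetch lifecycle.
# Plain arrays stand in for the Redis queue and the working list.
queue   = %w[job1 job2 job3]
working = []

# LMOVE-style fetch: removing the job from the queue and adding it to
# the working list happens as one step, so a crash can never lose the
# job -- it is always present in exactly one of the two lists.
fetch = lambda do
  job = queue.shift
  working << job if job
  job
end

ack     = ->(job) { working.delete(job) }                  # LREM on success
recover = -> { queue.concat(working.shift(working.size)) } # orphan recovery

job = fetch.call
ack.call(job)          # completed normally: gone from both lists
orphan = fetch.call    # a crash before ack leaves it in `working`
recover.call           # startup recovery pushes it back onto the queue
```

With a naive `BRPOP`-style fetch there is a window where the job exists only in the worker's memory; the two-list model closes that window.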
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- TIMEOUT = 2
  Polling timeout — same as Sidekiq::BasicFetch
- HEARTBEAT_TTL = 60
  Heartbeat TTL — must be longer than TIMEOUT to avoid false death detection
- HEARTBEAT_INTERVAL = 20
  How often to refresh heartbeat
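The three constants are related by an ordering invariant implied above: a healthy process must refresh its heartbeat (every HEARTBEAT_INTERVAL seconds, possibly after blocking for up to TIMEOUT seconds on a fetch) well before the heartbeat key expires. A quick arithmetic sanity check:

```ruby
TIMEOUT            = 2  # polling timeout (seconds)
HEARTBEAT_INTERVAL = 20 # refresh cadence (seconds)
HEARTBEAT_TTL      = 60 # heartbeat key expiry (seconds)

# One refresh interval plus a worst-case blocked fetch must fit inside
# the TTL, so a live process never lets its heartbeat key expire.
ok = HEARTBEAT_INTERVAL + TIMEOUT < HEARTBEAT_TTL
```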
Instance Method Summary collapse
- #bulk_requeue(inprogress) ⇒ Object
  Called during shutdown to return in-progress jobs to their queues.
- #initialize(capsule) ⇒ Reliable (constructor)
  A new instance of Reliable.
- #retrieve_work ⇒ UnitOfWork?
  Fetch the next job from any configured queue.
Methods included from JSON
dump_json, load_json, safe_load_json
Methods included from Reflectable
Methods included from Logging
#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context
Methods included from Script::Caller
call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version
Constructor Details
#initialize(capsule) ⇒ Reliable
Returns a new instance of Reliable.
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 36

def initialize(capsule)
  @config = capsule
  @strictly_ordered_queues = capsule.mode == :strict
  @queues = capsule.queues.map { |q| "queue:#{q}" }
  @queues.uniq! if @strictly_ordered_queues
  @identity = "#{Socket.gethostname}:#{Process.pid}:#{SecureRandom.hex(6)}"
  @working_key = Key.working(@identity)
  @done = false

  start_heartbeat
  recover_orphans
end
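Note that `@queues.uniq!` runs only in strict mode; in the default mode duplicates are kept so that repeated queue names act as weights. The private `queues_cmd` helper used later is not shown here, but under the same convention as `Sidekiq::BasicFetch` it would behave roughly like this standalone sketch (an assumption, not the gem's actual private method):

```ruby
# Sketch of the per-fetch queue-ordering step, assuming the
# Sidekiq::BasicFetch convention: strict mode polls queues in a fixed
# priority order, while the default mode shuffles the (possibly
# duplicated, i.e. weighted) list and de-duplicates it for each fetch.
def queues_cmd(queues, strictly_ordered)
  return queues if strictly_ordered

  queues.shuffle.uniq
end

strict  = queues_cmd(["queue:critical", "queue:default"], true)
relaxed = queues_cmd(["queue:critical", "queue:critical", "queue:default"], false)
```

Shuffling before `uniq` makes a queue listed twice twice as likely to be polled first, which is how Sidekiq expresses queue weights.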
Instance Method Details
#bulk_requeue(inprogress) ⇒ Object
Called during shutdown to return in-progress jobs to their queues.
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 78

def bulk_requeue(inprogress)
  @done = true
  @heartbeat_thread&.join(1)
  return if inprogress.empty?

  logger.debug { "Re-queueing #{inprogress.size} jobs" }

  redis do |conn|
    conn.pipelined do |pipeline|
      inprogress.each do |uow|
        pipeline.call("RPUSH", uow.queue, uow.job)
        pipeline.call("LREM", @working_key, 1, uow.job)
      end
    end
  end

  logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue StandardError => ex
  logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
#retrieve_work ⇒ UnitOfWork?
Fetch the next job from any configured queue.
Uses non-blocking LMOVE + lock validation (via Lua) for all queues except the last, where it uses blocking BLMOVE to avoid CPU spin.
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 55

def retrieve_work
  return nil if @done

  qs = queues_cmd
  return nil if qs.empty?

  redis do |conn|
    # Non-blocking: try each queue except the last
    if qs.size > 1
      qs[0..-2].each do |queue|
        work = fetch_nonblocking(conn, queue)
        return work if work
      end
    end

    # Blocking: wait on the last queue
    fetch_blocking(conn, qs.last)
  end
end
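The private helpers `fetch_nonblocking` and `fetch_blocking` are not shown in this page. A hedged sketch of the LMOVE/BLMOVE calls they presumably issue, using a tiny stub in place of a real Redis connection so it runs without a server (the working-list key, the stub class, and the helper bodies are all assumptions; the real non-blocking path additionally validates the job's lock via a Lua script, which is omitted here):

```ruby
WORKING_KEY = "uniquejobs:working:example-identity" # illustrative key
TIMEOUT = 2 # polling timeout in seconds, per the constant summary

# LMOVE source destination RIGHT LEFT: atomically pop from the queue
# tail and push onto the working-list head. Returns nil when empty.
# (The real helper also checks the job's lock via a Lua script.)
def fetch_nonblocking(conn, queue)
  conn.call("LMOVE", queue, WORKING_KEY, "RIGHT", "LEFT")
end

# BLMOVE blocks for up to TIMEOUT seconds instead of busy-polling.
def fetch_blocking(conn, queue)
  conn.call("BLMOVE", queue, WORKING_KEY, "RIGHT", "LEFT", TIMEOUT)
end

# Stub connection implementing just enough LMOVE/BLMOVE semantics
# (move from source tail to destination head) to run the sketch.
class StubConn
  def initialize(lists)
    @lists = lists
  end

  def call(_cmd, src, dst, _from, _to, _timeout = nil)
    job = (@lists[src] ||= []).pop
    (@lists[dst] ||= []).unshift(job) if job
    job
  end
end

conn = StubConn.new("queue:default" => %w[j1 j2])
first = fetch_nonblocking(conn, "queue:default") # pops from the tail
```

Polling all but the last queue non-blockingly and then blocking only on the final one preserves queue priority while still letting an idle worker sleep inside Redis rather than spin.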