Class: SidekiqUniqueJobs::Fetch::Reliable

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Component, JSON, Logging, Reflectable, Script::Caller
Defined in:
lib/sidekiq_unique_jobs/fetch/reliable.rb

Overview

Reliable fetch strategy using LMOVE to atomically move jobs from queues to a per-process working list. Jobs are tracked in the working list until acknowledged, providing crash recovery.

Features:

  • Atomic LMOVE from queue to working list (no job loss on crash)

  • Lock validation at fetch time via Lua script

  • Per-process heartbeat for dead process detection

  • Startup recovery of orphaned working lists

  • Compatible with all Sidekiq queue ordering modes

Author:

  • Mikael Henriksson <mikael@mhenrixon.com>

Defined Under Namespace

Classes: UnitOfWork

Constant Summary collapse

TIMEOUT =

Polling timeout — same as Sidekiq::BasicFetch

2
HEARTBEAT_TTL =

Heartbeat TTL — must be longer than TIMEOUT to avoid false death detection

60
HEARTBEAT_INTERVAL =

How often to refresh heartbeat

20

Instance Method Summary collapse

Methods included from JSON

dump_json, load_json, safe_load_json

Methods included from Reflectable

#reflect

Methods included from Logging

#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version

Constructor Details

#initialize(capsule) ⇒ Reliable

Returns a new instance of Reliable.

Parameters:

  • capsule (Sidekiq::Capsule)


36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 36

def initialize(capsule)
  @config = capsule
  @strictly_ordered_queues = capsule.mode == :strict
  @queues = capsule.queues.map { |q| "queue:#{q}" }
  @queues.uniq! if @strictly_ordered_queues
  @identity = "#{Socket.gethostname}:#{Process.pid}:#{SecureRandom.hex(6)}"
  @working_key = Key.working(@identity)
  @done = false

  start_heartbeat
  recover_orphans
end

Instance Method Details

#bulk_requeue(inprogress) ⇒ Object

Called during shutdown to return in-progress jobs to their queues.

Parameters:



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 78

def bulk_requeue(inprogress)
  @done = true
  @heartbeat_thread&.join(1)
  return if inprogress.empty?

  logger.debug { "Re-queueing #{inprogress.size} jobs" }

  redis do |conn|
    conn.pipelined do |pipeline|
      inprogress.each do |uow|
        pipeline.call("RPUSH", uow.queue, uow.job)
        pipeline.call("LREM", @working_key, 1, uow.job)
      end
    end
  end

  logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue StandardError => ex
  logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

#retrieve_workUnitOfWork?

Fetch the next job from any configured queue.

Uses non-blocking LMOVE + lock validation (via Lua) for all queues except the last, where it uses blocking BLMOVE to avoid CPU spin.

Returns:



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 55

def retrieve_work
  return nil if @done

  qs = queues_cmd
  return nil if qs.empty?

  redis do |conn|
    # Non-blocking: try each queue except the last
    if qs.size > 1
      qs[0..-2].each do |queue|
        work = fetch_nonblocking(conn, queue)
        return work if work
      end
    end

    # Blocking: wait on the last queue
    fetch_blocking(conn, qs.last)
  end
end