Class: SidekiqUniqueJobs::Fetch::Reliable::UnitOfWork

Inherits:
Object
  • Object
show all
Includes:
JSON, Logging, Script::Caller
Defined in:
lib/sidekiq_unique_jobs/fetch/reliable.rb

Overview

UnitOfWork holds a fetched job and provides lock-aware acknowledge and requeue operations.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Methods included from JSON

dump_json, load_json, safe_load_json

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version

Constructor Details

#initialize(queue, job, config, working_key) ⇒ UnitOfWork

Returns a new instance of UnitOfWork.



223
224
225
226
227
228
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 223

# Builds a unit of work around a single fetched job.
#
# @param queue [String] key of the queue the job came from (presumably
#   carrying the "queue:" prefix — confirm against the fetcher)
# @param job [String] the job payload exactly as it was fetched
# @param config [Object] configuration object, stored as-is
# @param working_key [String] key of the working list that currently holds the job
def initialize(queue, job, config, working_key)
  @queue, @job, @config, @working_key = queue, job, config, working_key
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



221
222
223
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221

# Reader for the configuration object handed to the constructor.
#
# @return [Object] the value stored in @config
def config
  @config
end

#job ⇒ Object (readonly)

Returns the value of attribute job.



221
222
223
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221

# Reader for the fetched job payload handed to the constructor.
#
# @return [Object] the value stored in @job
def job
  @job
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



221
222
223
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221

# Reader for the queue key handed to the constructor.
#
# @return [Object] the value stored in @queue
def queue
  @queue
end

Instance Method Details

#acknowledge ⇒ Object

Called after successful job completion. Atomically removes from working list and unlocks via Lua.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 236

# Acknowledges the job after it has been processed.
#
# The raw payload is parsed with safe_load_json. When the parsed payload is a
# Hash carrying both a lock digest and a jid, the :ack script is invoked with
# the working-list key, the "<digest>:LOCKED" key and the digests key, so the
# working-list removal and the unlock happen in one script call. Any other
# payload is simply removed from the working list with LREM.
#
# This method never raises StandardError: a failure in the primary path is
# logged as a warning and a plain LREM is attempted instead; if that fallback
# fails as well (for example because Redis is unreachable) the failure is
# logged as an error and swallowed, leaving the entry in the working list.
#
# @return [Object, nil] the result of the script or Redis call, or nil when
#   even the fallback removal failed
def acknowledge
  parsed = safe_load_json(job)
  # Anything that is not a Hash is treated as a payload without lock metadata.
  payload = parsed.is_a?(Hash) ? parsed : {}
  digest, jid, lock_type = payload.values_at(LOCK_DIGEST, JID, "lock")

  if digest && jid
    call_script(
      :ack,
      [@working_key, "#{digest}:LOCKED", SidekiqUniqueJobs::DIGESTS],
      [job, jid, digest, lock_type.to_s],
    )
  else
    # Not a unique job — just remove from working list
    Sidekiq.redis { |conn| conn.call("LREM", @working_key, 1, job) }
  end
rescue StandardError => ex
  # Safety net: never let ack failure prevent Sidekiq from continuing
  log_warn("Acknowledge failed: #{ex.message}")
  begin
    Sidekiq.redis { |conn| conn.call("LREM", @working_key, 1, job) }
  rescue StandardError => fallback_ex
    # The safety net itself must not raise, otherwise the guarantee above is void.
    log_error("Acknowledge fallback failed: #{fallback_ex.message}")
    nil
  end
end

#queue_name ⇒ Object



230
231
232
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 230

# Name of the queue without the leading "queue:" namespace.
#
# @return [String] a new string: the queue key minus its "queue:" prefix, or
#   a copy of the key when it does not start with that prefix
def queue_name
  prefix = "queue:"
  key = queue
  return key.dup unless key.start_with?(prefix)

  key[prefix.length..]
end

#requeue ⇒ Object

Called to return a job to the queue during shutdown. Preserves locks — the job is going back, not being abandoned.



260
261
262
263
264
265
266
267
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 260

# Puts the job back on its queue and drops it from the working list.
#
# Both commands are sent through one pipeline: first an RPUSH of the payload
# onto the queue key, then an LREM of one matching entry from the working list.
# No lock-related keys are touched here.
#
# @return [Object] whatever the pipelined Redis call returns
def requeue
  push_back = ["RPUSH", queue, job]
  drop_working = ["LREM", @working_key, 1, job]

  Sidekiq.redis do |redis|
    redis.pipelined do |batch|
      [push_back, drop_working].each { |command| batch.call(*command) }
    end
  end
end