Class: SidekiqUniqueJobs::Fetch::Reliable::UnitOfWork
- Inherits:
-
Object
- Object
- SidekiqUniqueJobs::Fetch::Reliable::UnitOfWork
- Includes:
- JSON, Logging, Script::Caller
- Defined in:
- lib/sidekiq_unique_jobs/fetch/reliable.rb
Overview
UnitOfWork holds a fetched job with lock-aware acknowledge and requeue
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#job ⇒ Object
readonly
Returns the value of attribute job.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#acknowledge ⇒ Object
Called after successful job completion.
-
#initialize(queue, job, config, working_key) ⇒ UnitOfWork
constructor
A new instance of UnitOfWork.
- #queue_name ⇒ Object
-
#requeue ⇒ Object
Called to return a job to the queue during shutdown.
Methods included from Logging
#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context
Methods included from JSON
dump_json, load_json, safe_load_json
Methods included from Script::Caller
call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version
Constructor Details
#initialize(queue, job, config, working_key) ⇒ UnitOfWork
Returns a new instance of UnitOfWork.
223 224 225 226 227 228 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 223 def initialize(queue, job, config, working_key) @queue = queue @job = job @config = config @working_key = working_key end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
221 222 223 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221 def config @config end |
#job ⇒ Object (readonly)
Returns the value of attribute job.
221 222 223 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221 def job @job end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
221 222 223 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 221 def queue @queue end |
Instance Method Details
#acknowledge ⇒ Object
Called after successful job completion. Atomically removes from working list and unlocks via Lua.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 236 def acknowledge parsed = safe_load_json(job) digest = parsed.is_a?(Hash) ? parsed[LOCK_DIGEST] : nil jid = parsed.is_a?(Hash) ? parsed[JID] : nil lock_type = parsed.is_a?(Hash) ? parsed["lock"] : nil if digest && jid call_script( :ack, [@working_key, "#{digest}:LOCKED", SidekiqUniqueJobs::DIGESTS], [job, jid, digest, lock_type.to_s], ) else # Not a unique job — just remove from working list Sidekiq.redis { |conn| conn.call("LREM", @working_key, 1, job) } end rescue StandardError => ex # Safety net: never let ack failure prevent Sidekiq from continuing log_warn("Acknowledge failed: #{ex.}") Sidekiq.redis { |conn| conn.call("LREM", @working_key, 1, job) } end |
#queue_name ⇒ Object
230 231 232 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 230 def queue_name queue.delete_prefix("queue:") end |
#requeue ⇒ Object
Called to return a job to the queue during shutdown. Preserves locks — the job is going back, not being abandoned.
260 261 262 263 264 265 266 267 |
# File 'lib/sidekiq_unique_jobs/fetch/reliable.rb', line 260 def requeue Sidekiq.redis do |conn| conn.pipelined do |pipeline| pipeline.call("RPUSH", queue, job) pipeline.call("LREM", @working_key, 1, job) end end end |