Class: Wurk::Fetcher::Reliable
- Inherits:
- Wurk::Fetcher
- Object
- Wurk::Fetcher
- Wurk::Fetcher::Reliable
- Includes:
- Component
- Defined in:
- lib/wurk/fetcher/reliable.rb
Overview
Default fetcher. Each public queue is paired with a per-process private list (`queue:<name>|<host>|<pid>|<idx>`). A job is moved atomically from the public tail to the private head via LMOVE and stays there until the Processor explicitly ACKs it (LREM). A SIGKILL between fetch and ack leaves the job in the private list, where the next boot of this process reclaims it via bulk_requeue.
Priority handling: iterate queues_cmd in order with non-blocking LMOVE, then fall back to a blocking BLMOVE on the first queue so that an empty poll doesn’t spin Redis. BLMOVE has no multi-key form, so blocking on a single queue is the best Redis offers. The block timeout defaults to TIMEOUT (2s) and can be overridden with the `config.fetch_poll_interval` knob (Pro super_fetch §3.3).
Spec: docs/target/sidekiq-pro.md §3 (super_fetch, §3.3 poll interval), docs/target/sidekiq-free.md §15 (TIMEOUT=2).
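The fetch/ack cycle described above can be sketched without a Redis server. This is a minimal in-memory model, not the Wurk implementation: FakeLists and its method names are hypothetical, the host and pid in the private list name are made up, and enqueueing with LPUSH semantics is an assumption.

```ruby
# In-memory stand-in for Redis lists (hypothetical helper, not part of Wurk).
class FakeLists
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
  end

  # Assumed enqueue path: push onto the head (LPUSH semantics).
  def lpush(key, value)
    @lists[key].unshift(value)
  end

  # LMOVE src dst RIGHT LEFT: pop the tail of src, push onto the head of dst.
  def lmove_right_left(src, dst)
    value = @lists[src].pop
    @lists[dst].unshift(value) unless value.nil?
    value
  end

  # LREM key 1 value: remove the first occurrence only.
  def lrem_one(key, value)
    idx = @lists[key].index(value)
    @lists[key].delete_at(idx) if idx
  end

  def range(key)
    @lists[key].dup
  end
end

lists = FakeLists.new
public_q  = 'queue:default'
private_q = 'queue:default|host-a|4242|0' # hypothetical host and pid

lists.lpush(public_q, 'job-1')
lists.lpush(public_q, 'job-2')

job = lists.lmove_right_left(public_q, private_q) # fetch: oldest job first
in_flight = lists.range(private_q)                # what a crash here would leave behind
lists.lrem_one(private_q, job)                    # ack once processing has finished
```

Between the fetch and the ack the job exists only in the private list, which is the window the overview describes for a SIGKILL.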
Defined Under Namespace
Classes: UnitOfWork
Constant Summary
- TIMEOUT = 2
Default BLMOVE block timeout, in seconds; overridable via config.fetch_poll_interval.
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Component
Class Method Summary
- .private_queue_name(public_queue, index = 0) ⇒ Object
Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher.
Instance Method Summary
- #bulk_requeue(in_progress) ⇒ Object
Called on shutdown for jobs the Processor couldn’t finish in time.
- #initialize(capsule) ⇒ Reliable (constructor)
A new instance of Reliable.
- #queues_cmd ⇒ Object
Prefixed queue keys (`queue:<name>`) in fetch order.
- #retrieve_work ⇒ Object
- #terminate ⇒ Object
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(capsule) ⇒ Reliable
Returns a new instance of Reliable.
# File 'lib/wurk/fetcher/reliable.rb', line 62
def initialize(capsule)
  super()
  @config = capsule
  @done = false
end
Class Method Details
.private_queue_name(public_queue, index = 0) ⇒ Object
Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher. Index defaults to 0 — we run one fetcher per capsule today. Multi-processor topology (one private list per processor slot) is a future Manager concern.
# File 'lib/wurk/fetcher/reliable.rb', line 57
def self.private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{::Process.pid}|#{index}"
end
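To make the key layout concrete, here is a standalone sketch that repeats the same string interpolation outside the class and splits the result back into its parts. The top-level method is for illustration only, and the queue name is an example value.

```ruby
require 'socket'

# Standalone copy of the naming scheme, for illustration only.
def private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{Process.pid}|#{index}"
end

name = private_queue_name('queue:default')
public_key, host, pid, idx = name.split('|')

puts name
puts "public=#{public_key} host=#{host} pid=#{pid} idx=#{idx}"
```

Because the pid is part of the key, two processes on the same host never share a private list.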
Instance Method Details
#bulk_requeue(in_progress) ⇒ Object
Called on shutdown for jobs the Processor couldn’t finish in time. Issues one RPUSH per public queue, all in a single pipeline. RPUSH appends to the tail, the end the fetcher pops from, so on next boot these jobs are picked up again ahead of fresh enqueues.
# File 'lib/wurk/fetcher/reliable.rb', line 84
def bulk_requeue(in_progress)
  return if in_progress.nil? || in_progress.empty?

  grouped = in_progress.group_by(&:queue)
  config.redis do |conn|
    conn.pipelined do |pipe|
      grouped.each do |public_q, uows|
        pipe.call('RPUSH', public_q, *uows.map(&:job))
      end
    end
  end
end
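The grouping step can be tried in isolation. The sketch below builds the RPUSH argument lists without talking to Redis. DemoUnit and requeue_commands are hypothetical names, and the only thing assumed about a unit of work is that it responds to queue and job, as the method above implies.

```ruby
# Hypothetical stand-in for UnitOfWork: only #queue and #job are assumed.
DemoUnit = Struct.new(:queue, :job)

# Returns one RPUSH command (as an argument array) per public queue.
def requeue_commands(in_progress)
  return [] if in_progress.nil? || in_progress.empty?

  in_progress.group_by(&:queue).map do |public_q, uows|
    ['RPUSH', public_q, *uows.map(&:job)]
  end
end

in_progress = [
  DemoUnit.new('queue:critical', '{"jid":"a"}'),
  DemoUnit.new('queue:default',  '{"jid":"b"}'),
  DemoUnit.new('queue:critical', '{"jid":"c"}')
]

requeue_commands(in_progress).each { |cmd| puts cmd.inspect }
```

Three unfinished jobs across two queues collapse into two commands, which is what keeps the shutdown path to a single pipelined round trip.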
#queues_cmd ⇒ Object
Prefixed queue keys (`queue:<name>`) in fetch order. Strict mode preserves declaration order. Random and weighted modes shuffle on each call: the shuffle yields weighted fairness, and .uniq trims duplicates. Paused queues are filtered after the shuffle so the membership test runs on the smallest possible set.
# File 'lib/wurk/fetcher/reliable.rb', line 103
def queues_cmd
  names = config.mode == :strict ? config.queues : config.queues.shuffle.uniq
  paused = paused_names
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "#{Keys::QUEUE_PREFIX}#{q}" }
end
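A small experiment shows why shuffle followed by uniq gives weighted ordering. This sketch assumes a weight is expressed by repeating the queue name in the list, which the .uniq call suggests but this page does not state. fetch_order is a hypothetical standalone version of the method, with "queue:" standing in for Keys::QUEUE_PREFIX and a seeded Random so runs are repeatable.

```ruby
# Sketch of the ordering logic; "queue:" stands in for Keys::QUEUE_PREFIX.
def fetch_order(queues, mode:, paused: [], rng: Random.new)
  names = mode == :strict ? queues : queues.shuffle(random: rng).uniq
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "queue:#{q}" }
end

# Assumption: a weight of 3 is expressed by listing the queue three times.
weighted = %w[critical critical critical default]
rng = Random.new(42)

first_counts = Hash.new(0)
10_000.times do
  first_counts[fetch_order(weighted, mode: :weighted, rng: rng).first] += 1
end

# "queue:critical" should lead roughly three polls out of four.
puts first_counts.inspect
```

The first element after the shuffle is a uniform pick from the repeated list, and .uniq keeps first occurrences, so a queue listed three times out of four is polled first about 75% of the time.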
#retrieve_work ⇒ Object
# File 'lib/wurk/fetcher/reliable.rb', line 68
def retrieve_work
  return nil if @done

  queues = queues_cmd
  return nil if queues.empty?

  queues.each do |public_q|
    uow = lmove(public_q)
    return uow if uow
  end
  blmove(queues.first)
end
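The two-phase poll can be modelled with plain arrays. In this sketch retrieve is a hypothetical function, a Hash of arrays stands in for the Redis lists, and a callable stands in for the blocking BLMOVE. It only illustrates the order in which queues are tried, not the real lmove/blmove helpers.

```ruby
# lists:    Hash of queue key => Array, standing in for Redis lists.
# block_on: callable standing in for the blocking BLMOVE on one queue.
def retrieve(queues, lists, block_on:)
  return nil if queues.empty?

  queues.each do |q|
    job = lists.fetch(q, []).pop # non-blocking attempt, tail first
    return [q, job] if job
  end

  block_on.call(queues.first)    # nothing anywhere: wait on the first queue only
end

queues = %w[queue:critical queue:default]
lists  = { 'queue:critical' => [], 'queue:default' => ['job-9'] }
waited = []
blocker = ->(q) { waited << q; nil } # records the queue it would block on

first  = retrieve(queues, lists, block_on: blocker) # finds job-9 without blocking
second = retrieve(queues, lists, block_on: blocker) # all empty: blocks on queue:critical

puts first.inspect
puts second.inspect
puts waited.inspect
```

A job that lands on a lower-priority queue during the blocking wait is only seen on the next pass, which is the trade-off the overview accepts for not spinning Redis.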
#terminate ⇒ Object
# File 'lib/wurk/fetcher/reliable.rb', line 110
def terminate
  @done = true
end