Class: Wurk::Fetcher::Reliable
- Inherits:
-
Wurk::Fetcher
- Object
- Wurk::Fetcher
- Wurk::Fetcher::Reliable
- Includes:
- Component
- Defined in:
- lib/wurk/fetcher/reliable.rb
Overview
Default fetcher. Each public queue is paired with a per-process private list (‘queue:<name>|<host>|<pid>|<idx>`); a job is moved atomically from the public tail to the private head via LMOVE, and stays there until the Processor explicitly ACKs (LREM). SIGKILL between fetch and ack leaves the job in the private list, where the next boot of this process reclaims it via bulk_requeue.
Priority handling: iterate queues_cmd in order with non-blocking LMOVE, then fall back to a 2s BLMOVE on the first queue so an empty poll doesn’t spin Redis. BLMOVE has no multi-key form, so blocking on a single queue is the best Redis gives us.
Spec: docs/target/sidekiq-pro.md §3 (super_fetch), docs/target/sidekiq-free.md §15 (TIMEOUT=2).
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- TIMEOUT =
2
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Component
Class Method Summary collapse
-
.private_queue_name(public_queue, index = 0) ⇒ Object
Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher.
Instance Method Summary collapse
-
#bulk_requeue(in_progress) ⇒ Object
Called on shutdown for jobs the Processor couldn’t finish in time.
-
#initialize(capsule) ⇒ Reliable
constructor
A new instance of Reliable.
-
#queues_cmd ⇒ Object
Prefixed queue keys (‘queue:<name>`) in fetch order.
- #retrieve_work ⇒ Object
- #terminate ⇒ Object
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(capsule) ⇒ Reliable
Returns a new instance of Reliable.
59 60 61 62 63 |
# File 'lib/wurk/fetcher/reliable.rb', line 59 def initialize(capsule) super() @config = capsule @done = false end |
Class Method Details
.private_queue_name(public_queue, index = 0) ⇒ Object
Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher. Index defaults to 0 — we run one fetcher per capsule today. Multi-processor topology (one private list per processor slot) is a future Manager concern.
54 55 56 57 |
# File 'lib/wurk/fetcher/reliable.rb', line 54 def self.private_queue_name(public_queue, index = 0) host = ENV['DYNO'] || Socket.gethostname "#{public_queue}|#{host}|#{::Process.pid}|#{index}" end |
Instance Method Details
#bulk_requeue(in_progress) ⇒ Object
Called on shutdown for jobs the Processor couldn’t finish in time. One pipelined RPUSH per public queue (head insert) so on next boot they’re picked again ahead of fresh enqueues.
81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/wurk/fetcher/reliable.rb', line 81 def bulk_requeue(in_progress) return if in_progress.nil? || in_progress.empty? grouped = in_progress.group_by(&:queue) config.redis do |conn| conn.pipelined do |pipe| grouped.each do |public_q, uows| pipe.call('RPUSH', public_q, *uows.map(&:job)) end end end end |
#queues_cmd ⇒ Object
Prefixed queue keys (‘queue:<name>`) in fetch order. Strict mode preserves declaration order. Random/weighted shuffle each call —shuffle yields weighted fairness; .uniq trims duplicates. Paused queues are filtered after shuffle so the membership test runs on the smallest possible set.
100 101 102 103 104 105 |
# File 'lib/wurk/fetcher/reliable.rb', line 100 def queues_cmd names = config.mode == :strict ? config.queues : config.queues.shuffle.uniq paused = paused_names names = names.reject { |q| paused.include?(q) } unless paused.empty? names.map { |q| "#{Keys::QUEUE_PREFIX}#{q}" } end |
#retrieve_work ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/wurk/fetcher/reliable.rb', line 65 def retrieve_work return nil if @done queues = queues_cmd return nil if queues.empty? queues.each do |public_q| uow = lmove(public_q) return uow if uow end blmove(queues.first) end |
#terminate ⇒ Object
107 108 109 |
# File 'lib/wurk/fetcher/reliable.rb', line 107 def terminate @done = true end |