Class: Wurk::Fetcher::Reliable

Inherits:
Wurk::Fetcher
Includes:
Component
Defined in:
lib/wurk/fetcher/reliable.rb

Overview

Default fetcher. Each public queue is paired with a per-process private list (`queue:<name>|<host>|<pid>|<idx>`). A job is moved atomically from the public tail to the private head via LMOVE and stays there until the Processor explicitly ACKs it (LREM). A SIGKILL between fetch and ack leaves the job in the private list, where the next boot of this process reclaims it via bulk_requeue.
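The fetch/ack lifecycle described above can be modeled without Redis. The sketch below is an illustration only, not Wurk's implementation: `FakeRedis`, its method names, and the host/pid values in the private list name are hypothetical, and plain Ruby arrays stand in for Redis lists (index 0 is the head, the last element is the tail). Enqueueing at the head is an assumption inferred from the fetch direction.

```ruby
# Hypothetical in-memory stand-in for the two Redis list commands involved.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  def list(key)
    @lists[key]
  end

  # Mimics LMOVE src dst RIGHT LEFT: pop the tail of src, push onto the head of dst.
  def lmove_right_left(src, dst)
    job = @lists[src].pop
    @lists[dst].unshift(job) if job
    job
  end

  # Mimics LREM key 1 value: remove the first occurrence; returns the count removed.
  def lrem(key, value)
    idx = @lists[key].index(value)
    return 0 unless idx

    @lists[key].delete_at(idx)
    1
  end
end

redis = FakeRedis.new
public_q = 'queue:default'
private_q = 'queue:default|host-a|1234|0' # name shape from the overview; values made up

redis.list(public_q).unshift('job-1') # assumed: producers push at the head
redis.list(public_q).unshift('job-2')

fetched = redis.lmove_right_left(public_q, private_q)
puts "fetched=#{fetched} private=#{redis.list(private_q).inspect}"

# If the process died here, the job would still sit in the private list.
redis.lrem(private_q, fetched) # ACK
puts "after ack private=#{redis.list(private_q).inspect}"
```

The point of the model is that a job is always in exactly one list: it leaves the public queue and enters the private list in a single step, and only the ACK removes it.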

Priority handling: iterate queues_cmd in order with non-blocking LMOVE, then fall back to a blocking BLMOVE on the first queue so an empty poll does not busy-loop against Redis. BLMOVE has no multi-key form, so blocking on a single queue is the best Redis gives us. The block timeout defaults to TIMEOUT (2s) and can be overridden through the `config.fetch_poll_interval` knob (Pro super_fetch, §3.3).
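To make the command sequence of an empty poll concrete, here is a minimal sketch under stated assumptions. `RecordingConn`, `poll_once`, `BLOCK_TIMEOUT`, and the `private_for` lambda are hypothetical names introduced for illustration; the recorder pretends every list is empty and only logs the commands it receives. The argument order follows the Redis LMOVE and BLMOVE commands (source, destination, wherefrom, whereto, plus a timeout for BLMOVE).

```ruby
# Hypothetical recorder standing in for a Redis connection: every list is
# empty, so each call is logged and returns nil.
class RecordingConn
  attr_reader :calls

  def initialize
    @calls = []
  end

  def call(*args)
    @calls << args
    nil
  end
end

BLOCK_TIMEOUT = 2 # mirrors TIMEOUT (2s); a poll-interval override would replace it

# One poll: a non-blocking pass over every queue in priority order, then a
# single blocking wait on the first queue only.
def poll_once(conn, queues, private_for)
  return nil if queues.empty?

  queues.each do |public_q|
    job = conn.call('LMOVE', public_q, private_for.call(public_q), 'RIGHT', 'LEFT')
    return job if job
  end

  first = queues.first
  conn.call('BLMOVE', first, private_for.call(first), 'RIGHT', 'LEFT', BLOCK_TIMEOUT)
end

conn = RecordingConn.new
poll_once(conn, %w[queue:critical queue:default], ->(q) { "#{q}|host-a|1234|0" })
conn.calls.each { |cmd| puts cmd.join(' ') }
```

With two queues and nothing to fetch, the recorder sees two LMOVE calls followed by one BLMOVE on the first queue, which is where the poll would park for up to the timeout.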

Spec: docs/target/sidekiq-pro.md §3 (super_fetch, §3.3 poll interval), docs/target/sidekiq-free.md §15 (TIMEOUT=2).

Defined Under Namespace

Classes: UnitOfWork

Constant Summary

TIMEOUT = 2

Default BLMOVE block timeout, in seconds; overridable via config.fetch_poll_interval.

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Class Method Summary

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(capsule) ⇒ Reliable

Returns a new instance of Reliable.



# File 'lib/wurk/fetcher/reliable.rb', line 62

def initialize(capsule)
  super()
  @config = capsule
  @done = false
end

Class Method Details

.private_queue_name(public_queue, index = 0) ⇒ Object

Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher. Index defaults to 0 — we run one fetcher per capsule today. Multi-processor topology (one private list per processor slot) is a future Manager concern.



# File 'lib/wurk/fetcher/reliable.rb', line 57

def self.private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{::Process.pid}|#{index}"
end
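
As a usage illustration, the naming logic can be exercised on its own. The snippet below is a standalone copy of the method body shown above, written as a top-level function (an assumption made so it runs outside the class); the only addition is the `require 'socket'` that `Socket.gethostname` needs.

```ruby
require 'socket'

# Standalone copy of the naming logic, for illustration only.
def private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{Process.pid}|#{index}"
end

name = private_queue_name('queue:default')
puts name

# The trailing two fields are always the pid and the slot index.
*_prefix, pid, index = name.split('|')
puts "pid=#{pid} index=#{index}"
```

Because the pid is part of the name, two worker processes on the same host never share a private list.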

Instance Method Details

#bulk_requeue(in_progress) ⇒ Object

Called on shutdown for jobs the Processor couldn’t finish in time. Issues one pipelined RPUSH per public queue. RPUSH appends at the tail, the end that LMOVE fetches from, so on next boot these jobs are picked up again ahead of fresh enqueues.



# File 'lib/wurk/fetcher/reliable.rb', line 84

def bulk_requeue(in_progress)
  return if in_progress.nil? || in_progress.empty?

  grouped = in_progress.group_by(&:queue)
  config.redis do |conn|
    conn.pipelined do |pipe|
      grouped.each do |public_q, uows|
        pipe.call('RPUSH', public_q, *uows.map(&:job))
      end
    end
  end
end
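
The grouping step can be checked without a Redis connection. In this sketch, `FakeUnit` and `requeue_commands` are hypothetical: the struct exposes only the two readers (`queue`, `job`) that bulk_requeue uses, and the helper returns the commands that the pipelined block would send instead of sending them.

```ruby
# Hypothetical stand-in for UnitOfWork: only the readers bulk_requeue relies on.
FakeUnit = Struct.new(:queue, :job)

# Builds one RPUSH command per public queue, carrying all of that queue's jobs.
def requeue_commands(in_progress)
  return [] if in_progress.nil? || in_progress.empty?

  in_progress.group_by(&:queue).map do |public_q, uows|
    ['RPUSH', public_q, *uows.map(&:job)]
  end
end

units = [
  FakeUnit.new('queue:default', '{"jid":"a"}'),
  FakeUnit.new('queue:mail', '{"jid":"b"}'),
  FakeUnit.new('queue:default', '{"jid":"c"}')
]
requeue_commands(units).each { |cmd| p cmd }
```

Three in-progress jobs across two queues collapse into two commands, so the number of round trips scales with the number of queues rather than the number of jobs.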

#queues_cmd ⇒ Object

Prefixed queue keys (`queue:<name>`) in fetch order. Strict mode preserves declaration order. Random/weighted modes shuffle on each call: the shuffle yields weighted fairness, and .uniq trims duplicates. Paused queues are filtered after the shuffle so the membership test runs on the smallest possible set.



# File 'lib/wurk/fetcher/reliable.rb', line 103

def queues_cmd
  names = config.mode == :strict ? config.queues : config.queues.shuffle.uniq
  paused = paused_names
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "#{Keys::QUEUE_PREFIX}#{q}" }
end
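
The effect of shuffle followed by uniq is easier to see with numbers. This sketch assumes that a weighted queue is represented by repeating its name in the queue list (the convention the `.uniq` call suggests, not something this page confirms). `fetch_order`, `QUEUE_PREFIX`, and the `random:` parameter are hypothetical names added so the logic runs standalone and can be seeded.

```ruby
QUEUE_PREFIX = 'queue:' # assumed value of Keys::QUEUE_PREFIX, inferred from the key shape

# Same steps as queues_cmd, with the queue list and paused names passed in.
def fetch_order(queues, mode:, paused: [], random: Random.new)
  names = mode == :strict ? queues : queues.shuffle(random: random).uniq
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "#{QUEUE_PREFIX}#{q}" }
end

weighted = %w[critical critical critical default] # critical weight 3, default weight 1

# Strict mode keeps declaration order as given.
p fetch_order(%w[critical default], mode: :strict)

# Weighted mode: count which queue lands first over many calls.
rng = Random.new(42)
firsts = Array.new(1000) { fetch_order(weighted, mode: :weighted, random: rng).first }
p firsts.tally

# A paused queue drops out of the result entirely.
p fetch_order(weighted, mode: :weighted, paused: ['critical'])
```

Since uniq keeps the first occurrence of each name, a queue listed three times out of four entries should come first in roughly three quarters of the calls.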

#retrieve_work ⇒ Object



# File 'lib/wurk/fetcher/reliable.rb', line 68

def retrieve_work
  return nil if @done

  queues = queues_cmd
  return nil if queues.empty?

  queues.each do |public_q|
    uow = lmove(public_q)
    return uow if uow
  end
  blmove(queues.first)
end

#terminate ⇒ Object



# File 'lib/wurk/fetcher/reliable.rb', line 110

def terminate
  @done = true
end