Class: Wurk::Fetcher::Reliable

Inherits:
Wurk::Fetcher
Includes:
Component
Defined in:
lib/wurk/fetcher/reliable.rb

Overview

Default fetcher. Each public queue is paired with a per-process private list (`queue:<name>|<host>|<pid>|<idx>`). A job is moved atomically from the public tail to the private head via LMOVE and stays there until the Processor explicitly ACKs it (LREM). A SIGKILL between fetch and ack leaves the job in the private list, where the next boot of this process reclaims it via bulk_requeue.
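The fetch/ack cycle described above can be sketched without a Redis server. This is a minimal in-memory model, not the Wurk implementation: ToyRedis, its method names, and the host and pid in the private key are hypothetical stand-ins, and each method only imitates the list command named in its comment.

```ruby
# In-memory stand-in for Redis lists (hypothetical, for illustration only).
class ToyRedis
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
  end

  # Like LPUSH: insert at the head (index 0).
  def lpush(key, value)
    @lists[key].unshift(value)
  end

  # Like LMOVE src dst RIGHT LEFT: pop the tail of src, push onto the head of dst.
  def lmove_right_left(src, dst)
    value = @lists[src].pop
    @lists[dst].unshift(value) if value
    value
  end

  # Like LREM key 1 value: remove the first occurrence of value.
  def lrem(key, value)
    idx = @lists[key].index(value)
    @lists[key].delete_at(idx) if idx
  end

  def list(key)
    @lists[key].dup
  end
end

redis = ToyRedis.new
pub  = 'queue:default'
priv = 'queue:default|host-a|1234|0' # hypothetical host and pid

redis.lpush(pub, 'job-1')
redis.lpush(pub, 'job-2')

first  = redis.lmove_right_left(pub, priv) # fetch: oldest job moves to the private list
second = redis.lmove_right_left(pub, priv) # fetch the next job
redis.lrem(priv, first)                    # ack the first job only

# If the process were killed here, the unacked job would still sit in `priv`.
p redis.list(pub)
p redis.list(priv)
```

Because the job is never absent from both lists at once, a crash at any point leaves it recoverable from one of them.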

Priority handling: iterate queues_cmd in order with a non-blocking LMOVE on each queue, then fall back to a 2-second (TIMEOUT) BLMOVE on the first queue so an empty poll doesn't busy-loop against Redis. BLMOVE takes a single source key, so blocking on one queue is the best Redis offers.
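The polling order can be modelled roughly as follows. This sketch assumes nothing about Wurk internals: poll, the queues hash, and the blocking_pop lambda are hypothetical names, with the lambda standing in for the timed BLMOVE and returning nil on timeout.

```ruby
# Hypothetical model of the two-phase poll:
#   queue_keys   - prefixed keys in priority order
#   queues       - key => array of pending jobs (tail is the fetch end)
#   blocking_pop - callable standing in for the timed BLMOVE
def poll(queue_keys, queues, blocking_pop)
  return nil if queue_keys.empty?

  # Phase 1: one non-blocking attempt per queue, in priority order.
  queue_keys.each do |key|
    job = queues.fetch(key, []).pop
    return [key, job] if job
  end

  # Phase 2: everything was empty, so block on the first queue only.
  job = blocking_pop.call(queue_keys.first)
  job && [queue_keys.first, job]
end

queues    = { 'queue:critical' => [], 'queue:default' => ['job-9'] }
timed_out = ->(_key) { nil } # pretend the blocking call always times out

hit  = poll(%w[queue:critical queue:default], queues, timed_out)
miss = poll(%w[queue:critical queue:default], queues, timed_out)

p hit
p miss
```

One consequence of blocking on a single key: a job that arrives on a lower-priority queue during the blocking window is only seen on the next poll, so the timeout bounds the added latency.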

Spec: docs/target/sidekiq-pro.md §3 (super_fetch), docs/target/sidekiq-free.md §15 (TIMEOUT=2).

Defined Under Namespace

Classes: UnitOfWork

Constant Summary

TIMEOUT =
2

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Class Method Summary

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(capsule) ⇒ Reliable

Returns a new instance of Reliable.



# File 'lib/wurk/fetcher/reliable.rb', line 59

def initialize(capsule)
  super()
  @config = capsule
  @done = false
end

Class Method Details

.private_queue_name(public_queue, index = 0) ⇒ Object

Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher. Index defaults to 0 — we run one fetcher per capsule today. Multi-processor topology (one private list per processor slot) is a future Manager concern.



# File 'lib/wurk/fetcher/reliable.rb', line 54

def self.private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{::Process.pid}|#{index}"
end
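To see the shape of the resulting key, the naming rule can be tried in isolation. The function below is a standalone copy made for illustration (it is not a call into Wurk), and it assumes the public_queue argument already carries the queue: prefix, as the Overview's key format suggests.

```ruby
require 'socket'

# Standalone copy of the naming rule, for illustration only.
def private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{Process.pid}|#{index}"
end

name = private_queue_name('queue:default')
puts name # public key, then host, pid, and index, separated by "|"
```

Since the pid is part of the key, two worker processes on the same host never share a private list.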

Instance Method Details

#bulk_requeue(in_progress) ⇒ Object

Called on shutdown for jobs the Processor couldn’t finish in time. Issues one RPUSH per public queue, all in a single pipeline. RPUSH appends at the tail, which is the end fetches pop from, so on next boot these jobs are picked up again ahead of fresh enqueues.



# File 'lib/wurk/fetcher/reliable.rb', line 81

def bulk_requeue(in_progress)
  return if in_progress.nil? || in_progress.empty?

  grouped = in_progress.group_by(&:queue)
  config.redis do |conn|
    conn.pipelined do |pipe|
      grouped.each do |public_q, uows|
        pipe.call('RPUSH', public_q, *uows.map(&:job))
      end
    end
  end
end
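The grouping step can be checked without a Redis connection by building the commands instead of sending them. In this sketch Uow is a hypothetical stand-in for UnitOfWork that exposes only the queue and job readers used above, and the payloads are made up.

```ruby
# Hypothetical stand-in for UnitOfWork: just the two readers bulk_requeue uses.
Uow = Struct.new(:queue, :job)

in_progress = [
  Uow.new('queue:default', '{"jid":"a"}'),
  Uow.new('queue:mail',    '{"jid":"b"}'),
  Uow.new('queue:default', '{"jid":"c"}')
]

# Build the commands the pipeline would send: one RPUSH per public queue,
# carrying every unfinished job for that queue as a variadic argument list.
commands = in_progress.group_by(&:queue).map do |public_q, uows|
  ['RPUSH', public_q, *uows.map(&:job)]
end

commands.each { |cmd| p cmd }
```

Three jobs across two queues collapse into two commands, so the shutdown path costs one round trip regardless of how many jobs are in flight.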

#queues_cmd ⇒ Object

Prefixed queue keys (`queue:<name>`) in fetch order. Strict mode preserves declaration order. Random and weighted modes reshuffle on every call: a queue listed more than once is more likely to land early in the shuffle, which yields weighted fairness, and .uniq then trims the duplicates. Paused queues are filtered after the shuffle so the membership test runs on the smallest possible set.



# File 'lib/wurk/fetcher/reliable.rb', line 100

def queues_cmd
  names = config.mode == :strict ? config.queues : config.queues.shuffle.uniq
  paused = paused_names
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "#{Keys::QUEUE_PREFIX}#{q}" }
end
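The effect of shuffle followed by uniq is easiest to see empirically. The sketch below assumes that a weight of N is expressed by listing the queue name N times in config.queues; that representation is inferred from the .uniq call rather than stated in this page, and the queue names are made up.

```ruby
# Assumption: weight 3 for "critical" is expressed as three list entries.
weighted = %w[critical critical critical default]

rng = Random.new(42) # seeded only so repeated runs behave the same

# One call's worth of ordering: shuffle, then drop duplicate names.
order = weighted.shuffle(random: rng).uniq
p order

# Count which queue ends up first across many shuffles.
counts = Hash.new(0)
10_000.times { counts[weighted.shuffle(random: rng).uniq.first] += 1 }
p counts
```

Each call still returns every queue exactly once; the weights only change how often a queue is polled first.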

#retrieve_work ⇒ Object



# File 'lib/wurk/fetcher/reliable.rb', line 65

def retrieve_work
  return nil if @done

  queues = queues_cmd
  return nil if queues.empty?

  queues.each do |public_q|
    uow = lmove(public_q)
    return uow if uow
  end
  blmove(queues.first)
end

#terminate ⇒ Object



# File 'lib/wurk/fetcher/reliable.rb', line 107

def terminate
  @done = true
end