Class: Sidekiq::LimitFetch

Inherits:
BasicFetch
  • Object
Defined in:
lib/sidekiq/limit_fetch.rb,
lib/sidekiq/limit_fetch/global.rb,
lib/sidekiq/limit_fetch/heartbeat.rb,
lib/sidekiq/limit_fetch/unit_of_work.rb,
lib/sidekiq/limit_fetch/global/selector.rb,
lib/sidekiq/limit_fetch/global/semaphore.rb

Overview

A Sidekiq fetch strategy that enforces global and per-process concurrency limits on queues. It replaces BasicFetch as the fetch strategy and uses a Lua script to check the limits and pop a job from Redis in one atomic step.

Defined Under Namespace

Classes: Global, Heartbeat, Shutdown, UnitOfWork

Constructor Details

#initialize(capsule) ⇒ LimitFetch

Returns a new instance of LimitFetch.

Parameters:

  • capsule (Sidekiq::Capsule)

# File 'lib/sidekiq/limit_fetch.rb', line 69

def initialize(capsule)
  @capsule = capsule
  super
end

Instance Attribute Details

#capsule ⇒ Object (readonly)

Returns the value of attribute capsule.

# File 'lib/sidekiq/limit_fetch.rb', line 66

def capsule
  @capsule
end

Class Method Details

.configuration ⇒ Object

Configurable values for Sidekiq::LimitFetch.

# File 'lib/sidekiq/limit_fetch.rb', line 22

def self.configuration
  @configuration ||= {
    # Workers will sleep for a random number in this range when no jobs are found.
    # This means when there is no queue backlog the maximum time to pick up a job will be within this range.
    # Queues with a backlog will continue to process jobs immediately.
    poll_range: Range.new(0.400, 0.500),

    # Time between heartbeats.
    heartbeat_period: 15,
  }
end
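
Because `.configuration` memoizes a plain Hash, its values can presumably be changed by mutating that Hash before fetching starts. The sketch below is a minimal illustration under that assumption. `FakeLimitFetch` is a hypothetical stand-in that copies the method body above so the example runs without Sidekiq or Redis; whether the gem intends to be configured this way is not confirmed by this page.

```ruby
# Hypothetical stand-in mirroring the memoized configuration hash shown above.
# FakeLimitFetch is not part of the gem.
class FakeLimitFetch
  def self.configuration
    @configuration ||= {
      poll_range: Range.new(0.400, 0.500),
      heartbeat_period: 15
    }
  end
end

# The hash is memoized, so a mutation is visible to every later reader.
FakeLimitFetch.configuration[:poll_range] = 0.100..0.200
FakeLimitFetch.configuration[:heartbeat_period] = 5

p FakeLimitFetch.configuration
```

A narrower poll range shortens the idle sleep at the cost of more frequent Redis round trips when queues are empty.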

.setup(capsule) ⇒ Object

Parameters:

  • capsule (Sidekiq::Capsule)

# File 'lib/sidekiq/limit_fetch.rb', line 42

def self.setup(capsule)
  capsule_meta = Global.init_capsule(capsule)

  limits         = capsule[:limits] || {}
  process_limits = capsule[:process_limits] || {}

  capsule_meta.queue_set.each do |queue_name|
    queue = Global::QueueSemaphore.new(capsule, queue_name)
    queue_sym = queue_name.to_sym

    # Apply process limit
    if queue.process_limit.nil?
      queue.process_limit = process_limits[queue_sym]
    end

    # Apply global limit
    if queue.limit.nil?
      queue.limit = limits[queue_sym]
    end
  end

  LimitFetch::Heartbeat.new(capsule).start
end
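
The precedence rule in `.setup` (a configured limit is applied only when the queue has no limit yet) can be sketched without Redis. `FakeSemaphore` and `apply_limits` are hypothetical names for illustration; the real semaphore presumably reads and writes its limits in Redis, which this sketch does not model.

```ruby
# Hypothetical stand-in for a queue semaphore; it only holds values in memory.
FakeSemaphore = Struct.new(:name, :limit, :process_limit)

# Apply configured limits only where no limit is set yet, as .setup does.
def apply_limits(queues, limits, process_limits)
  queues.each do |queue|
    key = queue.name.to_sym
    queue.process_limit = process_limits[key] if queue.process_limit.nil?
    queue.limit = limits[key] if queue.limit.nil?
  end
end

queues = [
  FakeSemaphore.new("default", nil, nil), # no limits stored yet
  FakeSemaphore.new("mailers", 10, nil)   # global limit already stored
]

apply_limits(queues, { default: 5, mailers: 2 }, { mailers: 1 })

queues.each do |q|
  puts "#{q.name}: limit=#{q.limit.inspect} process_limit=#{q.process_limit.inspect}"
end
```

In this sketch the existing limit of 10 on `mailers` survives, while `default` picks up the configured value. A queue without an entry in either hash is left with `nil`.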

Instance Method Details

#poll_interval ⇒ Float

Returns:

  • (Float)

# File 'lib/sidekiq/limit_fetch.rb', line 92

def poll_interval
  Random.rand(self.class.configuration[:poll_range])
end
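
`Random.rand` accepts a Range of Floats and returns a Float inside it, which is why `poll_range` can be passed through directly. A small sketch using the default range from `.configuration` (the sample count is arbitrary):

```ruby
poll_range = Range.new(0.400, 0.500) # same default as .configuration

# Draw many intervals to show that each one falls inside the range.
samples = Array.new(1_000) { Random.rand(poll_range) }

puts "min=#{samples.min.round(3)} max=#{samples.max.round(3)}"
```

Randomizing the sleep spreads idle processes out so they do not all poll Redis at the same instant.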

#retrieve_work ⇒ UnitOfWork?

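Fetches the next job by running a Lua script in Redis:

* If limits permit this process to take a job, try to pop one (RPOP) from the Sidekiq queue
* If a job is found:
  * Add the current process UUID to the internal "busy" list for that queue
  * Return the Sidekiq job string to the client

If no job is returned, the method sleeps for #poll_interval seconds and returns nil.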

Returns:

  • (UnitOfWork, nil)

# File 'lib/sidekiq/limit_fetch.rb', line 81

def retrieve_work
  ordered_queues = queues_cmd # BasicFetch method handles randomization or strict ordering

  job_queue, job_str = selector.limit_fetch(ordered_queues)
  return UnitOfWork.new(job_queue, job_str, capsule) if job_str

  Kernel.sleep(poll_interval)
  nil
end
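
The steps the Lua script is described as performing can be modelled in plain Ruby. This is a single-threaded, in-memory sketch and not the gem's script. `FakeQueueState` and its fields are hypothetical, and comparing the size of the busy list against the limit is an assumption about how "limits permit" is evaluated. The real script gets its atomicity from running inside Redis, which this sketch does not reproduce.

```ruby
# In-memory stand-in for the Redis state the Lua script works on.
# All names here are hypothetical.
class FakeQueueState
  attr_reader :jobs, :busy

  def initialize(jobs:, limit: nil)
    @jobs = jobs   # pending job payloads; the tail is the next job to run
    @limit = limit # nil means unlimited
    @busy = []     # process IDs currently holding a job from this queue
  end

  def fetch(process_id)
    return nil if @limit && @busy.size >= @limit # limit reached: take nothing
    job = @jobs.pop                               # like RPOP: take from the tail
    return nil if job.nil?                        # queue is empty
    @busy << process_id                           # record the in-flight job
    job
  end
end

queue = FakeQueueState.new(jobs: ["job-c", "job-b", "job-a"], limit: 2)
first  = queue.fetch("proc-1")
second = queue.fetch("proc-2")
third  = queue.fetch("proc-3") # refused: two jobs are already in flight

p [first, second, third]
```

In this model the third fetch returns `nil` even though a job is still queued, which corresponds to the case where `retrieve_work` sleeps for `poll_interval` and returns `nil`.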

#selector ⇒ Global::Selector

Returns:

  • (Global::Selector)

# File 'lib/sidekiq/limit_fetch.rb', line 97

def selector
  @selector ||= Global::Selector.new(capsule)
end