Module: Sidekiq::LimitFetch::Queues

Extended by:
Queues
Included in:
Queues
Defined in:
lib/sidekiq/limit_fetch/queues.rb

Constant Summary collapse

THREAD_KEY =
:acquired_queues

Instance Method Summary collapse

Instance Method Details

#acquire ⇒ Object

rubocop:enable Metrics/AbcSize rubocop:enable Metrics/CyclomaticComplexity rubocop:enable Metrics/MethodLength rubocop:enable Metrics/PerceivedComplexity



43
44
45
46
47
48
49
50
# File 'lib/sidekiq/limit_fetch/queues.rb', line 43

# Returns the names of the acquired queues, each prefixed with "queue:".
#
# The list returned by +saved+ is reused when it is present; otherwise
# +selector.acquire+ is invoked with +ordered_queues+ and +namespace+ inside
# +Sidekiq::LimitFetch.redis_retryable+. Either way the list is handed to
# +save+ before the prefixed names are built.
#
# @return [Array<String>] acquired queue names in "queue:<name>" form
def acquire
  acquired = saved || Sidekiq::LimitFetch.redis_retryable { selector.acquire(ordered_queues, namespace) }
  save(acquired)
  acquired.map { |name| "queue:#{name}" }
end

#add(queues) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/sidekiq/limit_fetch/queues.rb', line 72

# Appends every queue from +queues+ that is not already in +@queues+.
#
# Membership is re-checked for each element, so a name repeated inside
# +queues+ is appended only once. Queues for which +startup_queue?+ is true
# get their process limit and limit applied before being appended.
#
# @param queues [Array, nil] queue names to add; a falsy value is a no-op
# @return [Array, nil] +queues+ itself, or nil when +queues+ is falsy
def add(queues)
  return unless queues

  queues.each do |name|
    unless @queues.include?(name)
      if startup_queue?(name)
        apply_process_limit_to_queue(name)
        apply_limit_to_queue(name)
      end

      @queues << name
    end
  end
end

#dynamic? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/sidekiq/limit_fetch/queues.rb', line 60

# Reports the dynamic-mode flag.
#
# @return [Object] the raw value of +@dynamic+; +start+ sets it to +true+
#   when +config[:dynamic]+ is a Hash and to +config[:dynamic]+ otherwise,
#   so the result is truthy/falsy rather than a strict boolean
def dynamic?
  @dynamic
end

#dynamic_exclude ⇒ Object



68
69
70
# File 'lib/sidekiq/limit_fetch/queues.rb', line 68

# Returns the value of +@dynamic_exclude+.
#
# @return [Object] whatever +start+ stored: +config[:dynamic][:exclude]+
#   (or +[]+ when that is missing) if +config[:dynamic]+ is a Hash,
#   otherwise +[]+
def dynamic_exclude
  @dynamic_exclude
end

#handle(queues) ⇒ Object



99
100
101
102
# File 'lib/sidekiq/limit_fetch/queues.rb', line 99

# Brings +@queues+ in line with +queues+: names present in +queues+ but not
# in +@queues+ are passed to +add+, then names present in +@queues+ but not
# in +queues+ are passed to +remove+.
#
# @param queues [Array] the desired set of queue names
# @return [Object] the result of the +remove+ call
def handle(queues)
  missing = queues - @queues
  add(missing)

  # Computed after +add+ on purpose, matching the order of the two steps.
  stale = @queues - queues
  remove(stale)
end

#namespace ⇒ Object

rubocop:enable Lint/NestedMethodDefinition



119
120
121
122
123
124
125
126
127
# File 'lib/sidekiq/limit_fetch/queues.rb', line 119

# Returns the key prefix derived from the object yielded by +Sidekiq.redis+.
#
# The prefix is that object's +namespace+ followed by ":" when the object
# responds to +namespace+ and the value is truthy; otherwise it is the empty
# string. The result is memoized in +@namespace+.
#
# @return [String] "<namespace>:" or ""
def namespace
  return @namespace if @namespace

  @namespace = Sidekiq.redis do |conn|
    namespaced = conn.respond_to?(:namespace) && conn.namespace
    namespaced ? "#{conn.namespace}:" : ''
  end
end

#ordered_queues ⇒ Object



107
108
109
# File 'lib/sidekiq/limit_fetch/queues.rb', line 107

# Returns +@queues+ unchanged (the same object, not a copy).
#
# NOTE(review): +strict_order!+ and +weighted_order!+ each define a method
# with this name; this body matches the one defined by +strict_order!+.
def ordered_queues
  @queues
end

#release_except(full_name) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/sidekiq/limit_fetch/queues.rb', line 52

# Releases the queues returned by +restore+, optionally keeping one of them.
#
# When +full_name+ is given, the text captured after "queue:" is deleted
# from the restored list before +selector.release+ is called with the list
# and +namespace+ inside +Sidekiq::LimitFetch.redis_retryable+.
#
# @param full_name [String, nil] a "queue:<name>" string to exclude, or nil
# @return [Object] the result of +Sidekiq::LimitFetch.redis_retryable+
def release_except(full_name)
  held = restore

  if full_name
    kept = full_name[/queue:(.*)/, 1]
    held.delete(kept)
  end

  Sidekiq::LimitFetch.redis_retryable { selector.release(held, namespace) }
end

#remove(queues) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/sidekiq/limit_fetch/queues.rb', line 87

# Removes every queue in +queues+ that is currently in +@queues+.
#
# For each such queue its limits are cleared via +clear_limits_for_queue+,
# it is deleted from +@queues+, and +Sidekiq::Queue.delete_instance+ is
# called for it. Names not present in +@queues+ are skipped.
#
# @param queues [Array, nil] queue names to remove; a falsy value is a no-op
# @return [Array, nil] +queues+ itself, or nil when +queues+ is falsy
def remove(queues)
  return unless queues

  # Iterate over a snapshot: +queues+ may be the very array held in
  # +@queues+ (+ordered_queues+ can return it directly), and deleting from
  # an array while iterating it skips elements.
  queues.dup.each do |queue|
    next unless @queues.include?(queue)

    clear_limits_for_queue(queue)
    @queues.delete(queue)
    Sidekiq::Queue.delete_instance(queue)
  end

  queues
end

#start(capsule) ⇒ Object

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength rubocop:disable Metrics/PerceivedComplexity



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/sidekiq/limit_fetch/queues.rb', line 14

# Initializes queue state from a capsule and applies the configured limits.
#
# Reads +capsule.queues+ (taking the first element of any Array entry and
# dropping duplicates) into +@queues+, keeps a copy in +@startup_queues+,
# and loads the +:dynamic+, +:limits+, +:process_limits+ and +:blocking+
# settings from +capsule.config+. It then selects strict or weighted
# ordering based on +config[:strict]+ and applies process limits, limits and
# blocks to the queues.
#
# @param capsule [#config, #queues] source of the queue list and settings
# @return [Object] the result of +apply_blocks_to_queues+
def start(capsule)
  settings = capsule.config

  @queues = capsule.queues.map { |entry| entry.is_a?(Array) ? entry.first : entry }.uniq
  @startup_queues = @queues.dup

  # A Hash value both enables dynamic mode and may carry an exclusion list.
  dynamic = settings[:dynamic]
  case dynamic
  when Hash
    @dynamic = true
    @dynamic_exclude = dynamic[:exclude] || []
  else
    @dynamic = dynamic
    @dynamic_exclude = []
  end

  @limits = settings[:limits] || {}
  @process_limits = settings[:process_limits] || {}
  @blocks = settings[:blocking] || []

  if settings[:strict]
    strict_order!
  else
    weighted_order!
  end

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end

#startup_queue?(queue) ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/sidekiq/limit_fetch/queues.rb', line 64

# Tells whether +queue+ is in +@startup_queues+, the copy of the queue list
# taken by +start+.
#
# @param queue [Object] queue name to look up
# @return [Boolean] result of +@startup_queues.include?+
def startup_queue?(queue)
  @startup_queues.include?(queue)
end

#strict_order! ⇒ Object

rubocop:disable Lint/NestedMethodDefinition



105
106
107
108
109
110
# File 'lib/sidekiq/limit_fetch/queues.rb', line 105

# Switches to strict ordering: removes duplicate entries from +@queues+ in
# place, then (re)defines +ordered_queues+ so that it returns +@queues+
# as-is.
#
# @return [Symbol] +:ordered_queues+, the value of the nested +def+
def strict_order!
  @queues.uniq!
  # Nested definition: running this method installs/replaces ordered_queues.
  def ordered_queues
    @queues
  end
end

#weighted_order! ⇒ Object



112
113
114
115
116
# File 'lib/sidekiq/limit_fetch/queues.rb', line 112

# Switches to weighted ordering: (re)defines +ordered_queues+ so that each
# call returns a freshly shuffled copy of +@queues+ with duplicates removed.
#
# @return [Symbol] +:ordered_queues+, the value of the nested +def+
def weighted_order!
  # Nested definition: running this method installs/replaces ordered_queues.
  def ordered_queues
    @queues.shuffle.uniq
  end
end