Module: Sidekiq::LimitFetch::Queues
Constant Summary collapse
- THREAD_KEY =
:acquired_queues
Instance Method Summary collapse
-
#acquire ⇒ Object
Returns the acquired queue names, each prefixed with `queue:`, reusing the saved set when one exists and otherwise acquiring queues through the selector.
- #add(queues) ⇒ Object
- #dynamic? ⇒ Boolean
- #dynamic_exclude ⇒ Object
- #handle(queues) ⇒ Object
-
#namespace ⇒ Object
Returns the memoized key prefix: the Redis connection's namespace followed by `:` when one is set, otherwise an empty string.
- #ordered_queues ⇒ Object
- #release_except(full_name) ⇒ Object
- #remove(queues) ⇒ Object
-
#start(capsule) ⇒ Object
Initializes queue state from the capsule (queue names, dynamic settings, limits, process limits, blocking rules), selects strict or weighted ordering, and applies the limits and blocks to the queues.
- #startup_queue?(queue) ⇒ Boolean
-
#strict_order! ⇒ Object
Removes duplicate queue names in place and defines `ordered_queues` to return the queues in their stored order.
- #weighted_order! ⇒ Object
Instance Method Details
#acquire ⇒ Object
Returns the acquired queue names, each prefixed with `queue:`, reusing the saved set when one exists and otherwise acquiring queues through the selector.
43 44 45 46 47 48 49 50 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 43

# Returns the queue names to fetch from, each prefixed with "queue:".
#
# The value returned by `saved` is used when it is truthy; otherwise
# `selector.acquire` is called (inside `Sidekiq::LimitFetch.redis_retryable`)
# with `ordered_queues` and `namespace`. Either way the result is handed to
# `save` before it is returned.
#
# @return [Array<String>] names in the form "queue:<name>"
def acquire
  acquired = saved || Sidekiq::LimitFetch.redis_retryable { selector.acquire(ordered_queues, namespace) }
  save(acquired)
  acquired.map { |name| "queue:#{name}" }
end
#add(queues) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 72

# Appends every not-yet-tracked name from +queues+ to @queues.
#
# Names for which `startup_queue?` is true get their process limit and limit
# applied before being appended. A nil/false argument is ignored.
#
# @param queues [Array, nil] queue names to start tracking
# @return [Array, nil] the +queues+ argument, or nil when it was nil/false
def add(queues)
  return unless queues

  queues.each do |name|
    next if @queues.include?(name)

    if startup_queue?(name)
      apply_process_limit_to_queue(name)
      apply_limit_to_queue(name)
    end

    @queues << name
  end
end
#dynamic? ⇒ Boolean
60 61 62 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 60

# Reports the dynamic-queue setting stored in @dynamic.
#
# @return [Object, nil] the stored value as-is (nil while unset); it is not
#   coerced to true/false
def dynamic?
  @dynamic
end
#dynamic_exclude ⇒ Object
68 69 70 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 68

# Reader for the exclusion list stored in @dynamic_exclude.
#
# @return [Object, nil] the stored value as-is (nil while unset)
def dynamic_exclude
  @dynamic_exclude
end
#handle(queues) ⇒ Object
99 100 101 102 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 99

# Brings @queues in line with +queues+: names missing from @queues are passed
# to #add, then names in @queues that are absent from +queues+ are passed to
# #remove.
#
# A nil/false argument is ignored, the same way #add and #remove ignore it,
# instead of raising NoMethodError on `nil - @queues`.
#
# @param queues [Array, nil] the complete list of queue names that should be tracked
# @return [Object, nil] whatever #remove returns, or nil when +queues+ was nil/false
def handle(queues)
  return unless queues

  add(queues - @queues)
  # Evaluated after #add has run, so this difference sees the updated @queues.
  remove(@queues - queues)
end
#namespace ⇒ Object
Returns the memoized key prefix: the Redis connection's namespace followed by `:` when one is set, otherwise an empty string.
119 120 121 122 123 124 125 126 127 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 119

# Returns the key prefix derived from the Redis connection, memoized in
# @namespace.
#
# When the object yielded by `Sidekiq.redis` responds to `namespace` and that
# value is truthy, the prefix is "<namespace>:"; otherwise it is ''.
#
# @return [String] "<namespace>:" or an empty string
def namespace
  @namespace ||= Sidekiq.redis do |conn|
    next '' unless conn.respond_to?(:namespace) && conn.namespace

    "#{conn.namespace}:"
  end
end
#ordered_queues ⇒ Object
107 108 109 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 107

# Returns the tracked queues in their stored order.
#
# @return [Array, nil] the @queues object itself (not a copy)
def ordered_queues
  @queues
end
#release_except(full_name) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 52

# Releases the queues returned by `restore`, except the one named by
# +full_name+.
#
# @param full_name [String, nil] a name containing "queue:<name>"; the part
#   after "queue:" is removed from the list before releasing. Nothing is
#   removed when nil.
# @return [Object] the result of the `Sidekiq::LimitFetch.redis_retryable` block
def release_except(full_name)
  held = restore

  if full_name
    kept = full_name[/queue:(.*)/, 1]
    held.delete(kept)
  end

  Sidekiq::LimitFetch.redis_retryable { selector.release(held, namespace) }
end
#remove(queues) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 87

# Stops tracking every name in +queues+ that is currently in @queues.
#
# For each such name the limits are cleared, the name is deleted from @queues,
# and `Sidekiq::Queue.delete_instance` is called. A nil/false argument is
# ignored.
#
# @param queues [Array, nil] queue names to stop tracking
# @return [Array, nil] the +queues+ argument, or nil when it was nil/false
def remove(queues)
  return unless queues

  queues.each do |name|
    if @queues.include?(name)
      clear_limits_for_queue(name)
      @queues.delete(name)
      Sidekiq::Queue.delete_instance(name)
    end
  end
end
#start(capsule) ⇒ Object
Initializes queue state from the capsule (queue names, dynamic settings, limits, process limits, blocking rules), selects strict or weighted ordering, and applies the limits and blocks to the queues.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 14

# Initializes all queue state from +capsule+ and applies the configured
# limits.
#
# Reads `capsule.queues` and `capsule.config` (keys :dynamic, :limits,
# :process_limits, :blocking, :strict).
#
# @param capsule [#config, #queues] source of the queue list and settings
# @return [Object] the result of `apply_blocks_to_queues`
def start(capsule)
  config = capsule.config

  # Entries may be arrays; only the first element (the name) is kept.
  names = capsule.queues.map { |entry| entry.is_a?(Array) ? entry.first : entry }
  @queues = names.uniq
  @startup_queues = @queues.dup

  # A Hash under :dynamic turns the feature on and may carry an :exclude list;
  # any other value is stored unchanged.
  dynamic = config[:dynamic]
  if dynamic.is_a?(Hash)
    @dynamic = true
    @dynamic_exclude = dynamic[:exclude] || []
  else
    @dynamic = dynamic
    @dynamic_exclude = []
  end

  @limits = config[:limits] || {}
  @process_limits = config[:process_limits] || {}
  @blocks = config[:blocking] || []

  if config[:strict]
    strict_order!
  else
    weighted_order!
  end

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end
#startup_queue?(queue) ⇒ Boolean
64 65 66 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 64

# Tells whether +queue+ is one of the names held in @startup_queues.
#
# @param queue [Object] queue name to look up
# @return [Boolean] true when @startup_queues includes +queue+
def startup_queue?(queue)
  @startup_queues.include? queue
end
#strict_order! ⇒ Object
Removes duplicate queue names in place and defines `ordered_queues` to return the queues in their stored order.
105 106 107 108 109 110 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 105

# Switches to strict ordering: duplicates are dropped from @queues in place
# and `ordered_queues` is (re)defined to return @queues unchanged.
#
# @return [Symbol] :ordered_queues, the value of the nested `def`
def strict_order!
  @queues.uniq!

  # The nested definition is intentional: calling this method replaces the
  # current `ordered_queues` implementation.
  def ordered_queues
    @queues
  end
end
#weighted_order! ⇒ Object
112 113 114 115 116 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 112

# Switches to weighted ordering: `ordered_queues` is (re)defined to return a
# shuffled, de-duplicated copy of @queues on every call. @queues itself is
# left untouched.
#
# @return [Symbol] :ordered_queues, the value of the nested `def`
def weighted_order!
  # The nested definition is intentional: calling this method replaces the
  # current `ordered_queues` implementation.
  def ordered_queues
    shuffled = @queues.shuffle
    shuffled.uniq
  end
end