Class: Sidekiq::Capsule

Inherits:
Object
Extended by:
Forwardable
Includes:
Component
Defined in:
lib/sidekiq/capsule.rb

Overview

A Sidekiq::Capsule is the set of resources necessary to process one or more queues with a given concurrency. One “default” Capsule is always started, and the user may declare additional Capsules in their initializer.

The capsule declared below pulls jobs from the “single” queue and processes them with one thread, so the jobs are processed serially.

Sidekiq.configure_server do |config|
  config.capsule("single-threaded") do |cap|
    cap.concurrency = 1
    cap.queues = %w(single)
  end
end

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #inspect, #mono_ms, #process_nonce, #real_ms, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(name, config) ⇒ Capsule

Returns a new instance of Capsule.



# File 'lib/sidekiq/capsule.rb', line 32

def initialize(name, config)
  @name = name
  @config = config
  @queues = ["default"]
  @weights = {"default" => 0}
  @concurrency = config[:concurrency]
  @mode = :strict
end

Instance Attribute Details

#concurrency ⇒ Object

Returns the value of attribute concurrency.



# File 'lib/sidekiq/capsule.rb', line 26

def concurrency
  @concurrency
end

#mode ⇒ Object (readonly)

Returns the value of attribute mode.



# File 'lib/sidekiq/capsule.rb', line 27

def mode
  @mode
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/sidekiq/capsule.rb', line 24

def name
  @name
end

#queues ⇒ Object

Returns the value of attribute queues.



# File 'lib/sidekiq/capsule.rb', line 25

def queues
  @queues
end

#weights ⇒ Object (readonly)

Returns the value of attribute weights.



# File 'lib/sidekiq/capsule.rb', line 28

def weights
  @weights
end

Instance Method Details

#client_middleware {|@client_chain| ... } ⇒ Object

Allows the client middleware chain to differ per capsule. Avoid this if possible: add middleware globally so that all capsules share the same chains, which is easier to debug.

Yields:

  • (@client_chain)


# File 'lib/sidekiq/capsule.rb', line 83

def client_middleware
  @client_chain ||= config.client_middleware.copy_for(self)
  yield @client_chain if block_given?
  @client_chain
end
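
The memoize-then-yield shape of #client_middleware can be illustrated without Sidekiq loaded. The sketch below is not Sidekiq's implementation: SketchChain, SketchConfig, SketchCapsule, GlobalTagger and LocalAudit are hypothetical stand-ins, and it assumes that copy_for returns an independent copy, so that entries added through the capsule do not leak back into the shared chain.

```ruby
# Stand-in for a middleware chain; NOT Sidekiq's real chain class.
class SketchChain
  attr_reader :entries

  def initialize(entries = [])
    @entries = entries
  end

  # Register a middleware class once.
  def add(klass)
    @entries << klass unless @entries.include?(klass)
  end

  # Assumption: a copy is independent of the chain it was copied from.
  def copy_for(_owner)
    SketchChain.new(@entries.dup)
  end
end

# Stand-in for the shared configuration object.
class SketchConfig
  def client_middleware
    @client_middleware ||= SketchChain.new
  end
end

# Stand-in for a capsule, mirroring the method body shown above.
class SketchCapsule
  def initialize(config)
    @config = config
  end

  def client_middleware
    @client_chain ||= @config.client_middleware.copy_for(self)
    yield @client_chain if block_given?
    @client_chain
  end
end

# Hypothetical middleware classes used only as chain entries.
GlobalTagger = Class.new
LocalAudit = Class.new

config = SketchConfig.new
config.client_middleware.add(GlobalTagger)

capsule = SketchCapsule.new(config)
capsule.client_middleware { |chain| chain.add(LocalAudit) }

p config.client_middleware.entries
p capsule.client_middleware.entries
```

Under that assumption, the capsule's chain starts from whatever was registered globally at the time of the first call, which is one reason the global chain is the simpler place to add middleware.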

#fetcher ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 45

def fetcher
  @fetcher ||= begin
    instance = (config[:fetch_class] || Sidekiq::BasicFetch).new(self)
    instance.setup(config[:fetch_setup]) if instance.respond_to?(:setup)
    instance
  end
end
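
As the method body above shows, the fetcher is built once from config[:fetch_class] (falling back to a default class), and its setup method is called only if the instance responds to it. A minimal self-contained sketch of that pattern follows; SketchBasicFetch, SketchPriorityFetch and SketchFetchOwner are hypothetical names, and a plain Hash stands in for the configuration object.

```ruby
# Hypothetical default strategy: no setup hook.
class SketchBasicFetch
  attr_reader :owner

  def initialize(owner)
    @owner = owner
  end
end

# Hypothetical custom strategy: exposes an optional setup hook.
class SketchPriorityFetch
  attr_reader :owner, :options

  def initialize(owner)
    @owner = owner
  end

  def setup(options)
    @options = options
  end
end

# Stand-in for the object that owns the fetcher.
class SketchFetchOwner
  def initialize(config)
    @config = config # a plain Hash in this sketch
  end

  # Build the fetcher on first use, then reuse the same instance.
  def fetcher
    @fetcher ||= begin
      instance = (@config[:fetch_class] || SketchBasicFetch).new(self)
      instance.setup(@config[:fetch_setup]) if instance.respond_to?(:setup)
      instance
    end
  end
end

default_owner = SketchFetchOwner.new({})
custom_owner = SketchFetchOwner.new({fetch_class: SketchPriorityFetch, fetch_setup: {limit: 5}})

p default_owner.fetcher.class
p custom_owner.fetcher.options
```

The respond_to? check keeps the setup hook optional, so a fetch class only has to accept the owner in its constructor.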

#local_redis_pool ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 99

def local_redis_pool
  # connection pool is lazy, it will not create connections unless you actually need them
  # so don't be skimpy!
  @redis ||= config.new_redis_pool(@concurrency, name)
end

#logger ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 131

def logger
  config.logger
end

#lookup(name) ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 127

def lookup(name)
  config.lookup(name)
end

#redis ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/capsule.rb', line 105

def redis
  raise ArgumentError, "requires a block" unless block_given?
  redis_pool.with do |conn|
    retryable = true
    begin
      yield conn
    rescue RedisClientAdapter::BaseError => ex
      # 2550 Failover can cause the server to become a replica, need
      # to disconnect and reopen the socket to get back to the primary.
      # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
      # 4985 Use the same logic when a blocking command is force-unblocked
      # The same retry logic is also used in client.rb
      if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
        conn.close
        retryable = false
        retry
      end
      raise
    end
  end
end
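
The retry-once logic in #redis can be exercised in isolation. The sketch below reproduces only the control flow (rescue, close the connection, retry a single time, otherwise re-raise) against a fake connection; SketchRedisError, SketchConnection and with_single_retry are hypothetical names, not part of Sidekiq or any Redis client.

```ruby
# Hypothetical error class standing in for the adapter's base error.
class SketchRedisError < StandardError; end

# Fake connection that fails with READONLY until it has been closed once,
# imitating a socket that still points at a demoted server.
class SketchConnection
  attr_reader :closed_count

  def initialize
    @closed_count = 0
  end

  def close
    @closed_count += 1
  end

  def get(_key)
    raise SketchRedisError, "READONLY You can't write against a read only replica." if @closed_count.zero?
    "value"
  end
end

# Run the block, retrying at most once when the error message matches.
def with_single_retry(conn)
  retryable = true
  begin
    yield conn
  rescue SketchRedisError => ex
    if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
      conn.close        # drop the socket so the next attempt reconnects
      retryable = false # a second matching failure is re-raised
      retry
    end
    raise
  end
end

conn = SketchConnection.new
p with_single_retry(conn) { |c| c.get("k") }
p conn.closed_count
```

Because retryable is flipped before retry, a failure that persists after the reconnect propagates to the caller instead of looping.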

#redis_pool ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 95

def redis_pool
  Thread.current[:sidekiq_redis_pool] || local_redis_pool
end
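
The lookup order in #redis_pool, where a pool stored on the current thread wins over the capsule's own pool, can be sketched as follows. SketchPoolOwner, with_thread_pool and the :sketch_redis_pool key are hypothetical and exist only for this illustration; symbols stand in for real pool objects.

```ruby
# Stand-in for an object with its own pool and a thread-level override.
class SketchPoolOwner
  def initialize(local_pool)
    @local_pool = local_pool
  end

  # Prefer a pool stored on the current thread, else fall back to our own.
  def redis_pool
    Thread.current[:sketch_redis_pool] || @local_pool
  end
end

# Hypothetical helper: override the pool for the current thread only,
# restoring the previous value even if the block raises.
def with_thread_pool(pool)
  previous = Thread.current[:sketch_redis_pool]
  Thread.current[:sketch_redis_pool] = pool
  yield
ensure
  Thread.current[:sketch_redis_pool] = previous
end

owner = SketchPoolOwner.new(:local_pool)

p owner.redis_pool
p with_thread_pool(:override_pool) { owner.redis_pool }
p owner.redis_pool
```

Values stored with Thread.current[] are not visible to other threads, so an override set this way does not affect jobs running on other threads.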

#server_middleware {|@server_chain| ... } ⇒ Object

Yields:

  • (@server_chain)


# File 'lib/sidekiq/capsule.rb', line 89

def server_middleware
  @server_chain ||= config.server_middleware.copy_for(self)
  yield @server_chain if block_given?
  @server_chain
end

#stop ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 53

def stop
end

#to_h ⇒ Object



# File 'lib/sidekiq/capsule.rb', line 41

def to_h
  {concurrency: concurrency, mode: mode, weights: weights}
end