Class: Sidekiq::Group::Collection

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/group/collection.rb

Constant Summary collapse

CID_EXPIRE_TTL =
3600 * 24 * 30
LOCK_TTL =
3600

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cid = nil) ⇒ Collection

Returns a new instance of Collection.



13
14
15
# File 'lib/sidekiq/group/collection.rb', line 13

# Builds a collection for the given group id.
#
# @param cid [String, nil] existing group id to attach to; when it is
#   nil (or otherwise falsy) a fresh id is generated
# @return [Collection]
def initialize(cid = nil)
  @cid = cid
  # No id supplied: fall back to a URL-safe Base64 token built from 16 random bytes.
  @cid ||= SecureRandom.urlsafe_base64(16)
end

Instance Attribute Details

#callback_class ⇒ Object

Returns the value of attribute callback_class.



10
11
12
# File 'lib/sidekiq/group/collection.rb', line 10

# Reader for the callback_class attribute.
#
# @return [Object, nil] the current value of @callback_class, or nil
#   when it has not been assigned
def callback_class
  instance_variable_get(:@callback_class)
end

#callback_options ⇒ Object

Returns the value of attribute callback_options.



10
11
12
# File 'lib/sidekiq/group/collection.rb', line 10

# Reader for the callback_options attribute.
#
# @return [Object, nil] the current value of @callback_options, or nil
#   when it has not been assigned
def callback_options
  instance_variable_get(:@callback_options)
end

#cid ⇒ Object (readonly) Also known as: group_id

Returns the value of attribute cid.



10
11
12
# File 'lib/sidekiq/group/collection.rb', line 10

# Read-only accessor for the group id held in @cid.
#
# @return [Object, nil] the current value of @cid
def cid
  instance_variable_get(:@cid)
end

Instance Method Details

#add(jids) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/sidekiq/group/collection.rb', line 31

# Registers one or more child job ids with this group.
#
# Inside a single `multi` block the ids are added to the "<cid>-jids"
# set, that set's expiry is set to CID_EXPIRE_TTL, and the 'total'
# field of the hash stored under the group id is incremented by the
# number of ids passed in.
#
# @param jids [String, Array<String>, nil] a single job id or a list of
#   them; coerced with Array()
# @return [Object, nil] whatever the Sidekiq.redis block returns, or nil
#   when there were no ids to register
def add(jids)
  jids = Array(jids)
  # SADD requires at least one member; sending it with none makes Redis
  # reject the command and abort the whole transaction, so an empty
  # batch is treated as a no-op instead.
  return if jids.empty?

  Sidekiq.logger.info "Scheduling child job #{jids} for parent #{@cid}" if Sidekiq::Group.debug

  jids_key = "#{@cid}-jids"

  Sidekiq.redis do |r|
    r.multi do |pipeline|
      pipeline.sadd(jids_key, jids)
      pipeline.expire(jids_key, CID_EXPIRE_TTL)
      pipeline.hincrby(@cid, 'total', jids.size)
    end
  end
end

#initialize_total_value ⇒ Object



27
28
29
# File 'lib/sidekiq/group/collection.rb', line 27

# Persists a 'total' value of zero for this group via #persist.
#
# @return [Object] whatever #persist returns
def initialize_total_value
  starting_count = 0
  persist('total', starting_count)
end

#processed ⇒ Object



70
71
72
73
74
# File 'lib/sidekiq/group/collection.rb', line 70

# Number of jobs that are no longer pending, computed as total minus pending.
#
# @return [Integer, nil] nil while #spawned_all_jobs? is falsy,
#   otherwise the difference between #total and #pending
def processed
  return nil unless spawned_all_jobs?

  overall = total
  remaining = pending
  overall - remaining
end

#spawned_jobs! ⇒ Object



45
46
47
# File 'lib/sidekiq/group/collection.rb', line 45

# Persists the 'spawned_jobs' field for this group, storing the group's
# own cid as the value.
#
# @return [Object] whatever #persist returns
def spawned_jobs!
  marker_value = cid
  persist('spawned_jobs', marker_value)
end

#success(jid) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/sidekiq/group/collection.rb', line 49

# Records a finished child job and, once every job in the group has been
# processed, enqueues the group's callback and cleans up.
#
# Steps, in order: drop the jid via #remove_processed; stop unless
# #processed_all_jobs? is truthy; stop if #locked? is truthy; read the
# callback class and its options from #callback_data; enqueue
# Sidekiq::Group::Worker with them; call #cleanup_redis.
#
# @param jid [String] id of the child job that just succeeded
# @return [Object, nil] nil when the callback is not scheduled,
#   otherwise whatever #cleanup_redis returns
def success(jid)
  remove_processed(jid)

  # Order matters: #locked? is consulted only after all jobs are done.
  # NOTE(review): #locked? presumably also takes the lock so that only one
  # worker schedules the callback - confirm against its definition.
  return if !processed_all_jobs? || locked?

  klass, raw_options = callback_data
  # Kernel#JSON parses its argument when it is string-like and generates
  # JSON text otherwise.
  options = JSON(raw_options)

  if Sidekiq::Group.debug
    Sidekiq.logger.info "Scheduling callback job #{klass} with #{options}"
  end
  Sidekiq::Group::Worker.perform_async(klass, options)

  cleanup_redis
end

#total ⇒ Object



64
65
66
67
68
# File 'lib/sidekiq/group/collection.rb', line 64

# Reads the group's 'total' counter from the Redis hash stored under the
# group id.
#
# @return [Integer, nil] nil while #spawned_all_jobs? is falsy; otherwise
#   the stored 'total' field converted with #to_i (so a missing field
#   reads as 0)
def total
  if spawned_all_jobs?
    Sidekiq.redis do |conn|
      stored = conn.hget(@cid, 'total')
      stored.to_i
    end
  end
end