Class: CanvasSync::JobBatches::Pool

Inherits:
Object
  • Object
show all
Includes:
RedisModel
Defined in:
lib/canvas_sync/job_batches/pool.rb

Constant Summary collapse

# RedisScript built from a path derived from this file's own location:
# Pathname#+ with "../pool_refill.lua" replaces the file-name component, so the
# path points at pool_refill.lua in the same directory as this file.
# NOTE(review): presumably the Lua script used when refilling the pool — the
# call site is not visible here; confirm.
POOL_REFILL =
RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisModel

#persist_bid_attr, #read_bid_attr

Constructor Details

#initialize(pooolid = nil, **kwargs) ⇒ Pool

Returns a new instance of Pool.



16
17
18
19
20
21
22
23
24
# File 'lib/canvas_sync/job_batches/pool.rb', line 16

# Builds a handle for an existing pool, or sets up a brand-new one.
#
# @param pooolid [String, nil] id of an existing pool; when falsy, a random
#   URL-safe id is generated and the new-pool setup runs
# @param kwargs [Hash] forwarded to +initialize_new+; ignored when +pooolid+
#   is given
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    # No id supplied: mint one, then run the new-pool setup with the options.
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
    return
  end

  # An id was supplied: remember that this wraps an already-created pool.
  @existing = true
  @pid = pooolid
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



7
8
9
# File 'lib/canvas_sync/job_batches/pool.rb', line 7

# Reader for the pool's identifier.
#
# @return [Object] the current value of +@pid+
def pid
  @pid
end

Class Method Details

.cleanup_redis_index! ⇒ Object

Administrative/console method to clean up expired pools from the WebUI



137
138
139
140
141
142
# File 'lib/canvas_sync/job_batches/pool.rb', line 137

# Administrative/console helper: walks the "pools" sorted set for entries whose
# score is at or below the BID_EXPIRE_TTL cutoff and drops each pid from the
# index when Batch.cleanup_redis_index_for returns truthy for its keys.
#
# NOTE(review): `r` is neither a parameter nor a local in this method, so it
# must resolve to a class-level method named `r`; none is visible from here.
# If no such method exists this raises NameError — confirm, and switch to the
# class-level Redis accessor if that is what was intended.
def self.cleanup_redis_index!
  key_suffixes = ["", "-active", "-jobs"]
  expired_pids = r.zrangebyscore("pools", "0", Batch::BID_EXPIRE_TTL.seconds.ago.to_i)
  expired_pids.each do |pid|
    next unless Batch.cleanup_redis_index_for("POOLID-#{pid}", key_suffixes)

    r.zrem("pools", pid)
  end
end

.from_pid(pid) ⇒ Object



26
27
28
29
# File 'lib/canvas_sync/job_batches/pool.rb', line 26

# Wraps an existing pool id in a Pool instance.
#
# @param pid [String] pool id; must satisfy +present?+
# @return [Pool] the result of +new(pid)+
# @raise [RuntimeError] "PID must be given" when +pid+ is not present
def self.from_pid(pid)
  return new(pid) if pid.present?

  raise "PID must be given"
end

.job_checked_in(status, options) ⇒ Object



131
132
133
134
# File 'lib/canvas_sync/job_batches/pool.rb', line 131

# Class-level callback entry point: looks up the pool named by
# options['pool_id'] and forwards the check-in to that instance.
#
# @param status [Object] passed through to the instance method
# @param options [Hash] must carry the pool id under the string key 'pool_id'
# @return [Object] whatever the instance-level #job_checked_in returns
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end

Instance Method Details

#<<(job_desc) ⇒ Object



31
32
33
# File 'lib/canvas_sync/job_batches/pool.rb', line 31

# Shovel-operator form of #add_job.
#
# @param job_desc [Object] handed to #add_job unchanged
# @return [Object] whatever #add_job returns (not +self+)
def <<(job_desc)
  add_job(job_desc)
end

#active_count(r = redis) ⇒ Object



105
106
107
# File 'lib/canvas_sync/job_batches/pool.rb', line 105

# Sums the number of fields in the "<redis_key>-active" hash and the integer
# stored in the "_active_count" field of the pool hash.
#
# @param r [Object] Redis connection to use; defaults to +redis+
# @return [Integer] the combined count
def active_count(r = redis)
  active_hash_size = r.hlen("#{redis_key}-active")
  # HINCRBY with an increment of 0 is used to obtain the counter as an integer.
  counter_value = r.hincrby(redis_key, "_active_count", 0)
  active_hash_size + counter_value
end

#active_jobs(r = redis) ⇒ Object



109
110
111
# File 'lib/canvas_sync/job_batches/pool.rb', line 109

# Reads every value of the "<redis_key>-active" hash, JSON-decodes it, and
# keeps the element at index 0 of each decoded value.
#
# @param r [Object] Redis connection to use; defaults to +redis+
# @return [Array] one entry per value in the active hash
def active_jobs(r = redis)
  serialized_entries = r.hvals("#{redis_key}-active")
  serialized_entries.map do |entry|
    decoded = JSON.parse(entry)
    decoded[0]
  end
end

#add_job(job_desc) ⇒ Object



35
36
37
# File 'lib/canvas_sync/job_batches/pool.rb', line 35

# Adds a single job by wrapping it in a one-element array for #add_jobs.
#
# @param job_desc [Object] a single job description
# @return [Object] whatever #add_jobs returns
def add_job(job_desc)
  add_jobs([job_desc])
end

#add_jobs(job_descs, skip_refill: false) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/canvas_sync/job_batches/pool.rb', line 39

# Queues each job description in the pool. Every job gets its own wrapper
# Batch whose callback ("<this class>.job_checked_in", carrying the pool id)
# fires on :success when +on_failed_job+ is :wait and on :complete otherwise.
#
# @param job_descs [Enumerable<Hash>] job descriptions; each is copied via
#   +symbolize_keys+, its :job value is stringified, and the wrapper batch id
#   is stored under :pool_wrapper_batch before it is pushed
# @param skip_refill [Boolean] when truthy, +refill_allotment+ is not called
# @return [Object, nil] the result of +refill_allotment+, or nil when skipped
def add_jobs(job_descs, skip_refill: false)
  job_descs.each do |job_desc|
    wrapper_batch = Batch.new
    wrapper_batch.description = "Pool Job Wrapper (PID: #{pid})"
    checkin_event =
      if on_failed_job == :wait
        :success
      else
        :complete
      end
    wrapper_batch.on(checkin_event, "#{self.class}.job_checked_in", pool_id: pid)
    # Empty jobs block, issued before the wrapper's bid is read below.
    wrapper_batch.jobs {}

    # symbolize_keys returns a copy, so the caller's hash is left untouched.
    pooled_desc = job_desc.symbolize_keys
    pooled_desc[:job] = pooled_desc[:job].to_s
    pooled_desc[:pool_wrapper_batch] = wrapper_batch.bid

    push_job_to_pool(pooled_desc)
  end
  return if skip_refill

  refill_allotment
end

#cleanup_if_empty ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/canvas_sync/job_batches/pool.rb', line 87

# Removes the pool's Redis data when nothing is active or pending, unless the
# pool is flagged to stay open or not to be cleaned.
#
# @return [Object, nil] the result of #cleanup_redis when cleanup ran, else nil
def cleanup_if_empty
  # NOTE(review): presumably called up front so the order value is already
  # loaded before pending_count runs inside the MULTI block below — confirm.
  self.order

  replies = redis.multi do |r|
    r.hlen("#{redis_key}-active")
    r.hget(redis_key, "_active_count")
    pending_count(r)
    r.hget(redis_key, 'clean_when_empty')
    r.hget(redis_key, 'keep_open')
  end
  # Replies arrive in the same order the commands were queued above.
  active_len, counter_raw, pending_len, clean_flag, open_flag = replies

  return if open_flag == 'true'
  return if clean_flag == 'false'

  idle = active_len <= 0 && (counter_raw.try(:to_i) || 0) <= 0 && pending_len <= 0
  cleanup_redis if idle
end

#cleanup_redis ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/canvas_sync/job_batches/pool.rb', line 76

# Deletes this pool's Redis state: removes the pid from the "pools" index and
# unlinks the pool hash together with its "-active" and "-jobs" companions.
#
# All three keys are removed so that a direct call on a pool that still has
# active entries cannot leave the "-active" hash behind; once the pid is gone
# from "pools", an index-driven sweep has no way to find that key again.
# Unlinking a key that does not exist is a no-op.
#
# @return [Object] the value of the +redis+ block call
def cleanup_redis
  Batch.logger.debug {"Cleaning redis of pool #{pid}"}
  redis do |r|
    r.zrem("pools", pid)
    r.unlink(
      "#{redis_key}",
      "#{redis_key}-active",
      "#{redis_key}-jobs",
    )
  end
end

#job_checked_in(status, options) ⇒ Object



126
127
128
129
# File 'lib/canvas_sync/job_batches/pool.rb', line 126

# Instance-level check-in handler: refills the pool's allotment for the
# finished wrapper batch and, when the value returned by the refill is not
# above zero, attempts cleanup.
#
# @param status [#bid] status object of the finished wrapper batch
# @param options [Hash] not used by this method
# @return [Object, nil] the result of #cleanup_if_empty, or nil when the
#   refill reported a positive count
def job_checked_in(status, options)
  still_active = refill_allotment(status.bid)
  return if still_active > 0

  cleanup_if_empty
end

#keep_open! ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/canvas_sync/job_batches/pool.rb', line 58

# Flags the pool as kept open by writing 'true' to the 'keep_open' field of
# the pool hash.
#
# Without a block the flag is simply set. With a block the flag is set, the
# block runs, and #let_close! is always invoked afterwards, including when
# setting the flag or the block raises.
#
# @yield optional work to perform while the pool is kept open
# @return [Object] the block's value, or the HSET reply when no block is given
def keep_open!
  return redis.hset(redis_key, 'keep_open', 'true') unless block_given?

  begin
    keep_open!
    yield
  ensure
    let_close!
  end
end

#let_close! ⇒ Object



71
72
73
74
# File 'lib/canvas_sync/job_batches/pool.rb', line 71

# Clears the keep-open flag by writing 'false' to the 'keep_open' field of the
# pool hash, then immediately attempts cleanup.
#
# @return [Object, nil] whatever #cleanup_if_empty returns
def let_close!
  connection = redis
  connection.hset(redis_key, 'keep_open', 'false')
  cleanup_if_empty
end

#pending_count(r = redis) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/canvas_sync/job_batches/pool.rb', line 113

# Issues the length command for the "<redis_key>-jobs" key that matches the
# pool's order: LLEN for fifo/lifo, SCARD for random, ZCARD for priority.
# A nil order is treated as 'fifo'.
#
# @param r [Object] Redis connection (or transaction/pipeline handle) to issue
#   the command on; defaults to +redis+
# @return [Object] the reply of the issued command
# @raise [RuntimeError] when the order is not one of the four handled values.
#   Previously this case returned nil without issuing any command, which
#   silently shifted the reply positions of a surrounding MULTI block.
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  order = (self.order || 'fifo').to_sym
  case order
  when :fifo, :lifo
    r.llen(jobs_key)
  when :random
    r.scard(jobs_key)
  when :priority
    r.zcard(jobs_key)
  else
    raise "Unsupported pool order: #{order.inspect}"
  end
end