Class: Joblin::Batching::Pool
- Inherits:
-
Object
- Object
- Joblin::Batching::Pool
- Includes:
- RediConn::RedisModel
- Defined in:
- lib/joblin/batching/pool.rb
Constant Summary collapse
- POOL_REFILL =
RediConn::RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")
Instance Attribute Summary collapse
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
Class Method Summary collapse
-
.cleanup_redis_index! ⇒ Object
Administrative/console method to cleanup expired pools from the WebUI.
- .from_pid(pid) ⇒ Object
- .job_checked_in(status, options) ⇒ Object
- .redis(&blk) ⇒ Object
Instance Method Summary collapse
- #<<(job_desc) ⇒ Object
- #active_count(r = redis) ⇒ Object
- #active_jobs(r = redis) ⇒ Object
- #add_job(job_desc) ⇒ Object
- #add_jobs(job_descs, skip_refill: false) ⇒ Object
- #cleanup_if_empty ⇒ Object
- #cleanup_redis ⇒ Object
-
#initialize(pooolid = nil, **kwargs) ⇒ Pool
constructor
A new instance of Pool.
- #job_checked_in(status, options) ⇒ Object
- #keep_open!(token = SecureRandom.urlsafe_base64(10)) ⇒ Object
- #let_close!(token = :unset) ⇒ Object
- #pending_count(r = redis) ⇒ Object
Constructor Details
#initialize(pooolid = nil, **kwargs) ⇒ Pool
Returns a new instance of Pool.
16 17 18 19 20 21 22 23 24 |
# File 'lib/joblin/batching/pool.rb', line 16 def initialize(pooolid = nil, **kwargs) if pooolid @existing = true @pid = pooolid else @pid = SecureRandom.urlsafe_base64(10) initialize_new(**kwargs) end end |
Instance Attribute Details
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
7 8 9 |
# File 'lib/joblin/batching/pool.rb', line 7 def pid @pid end |
Class Method Details
.cleanup_redis_index! ⇒ Object
Administrative/console method to cleanup expired pools from the WebUI
148 149 150 151 152 153 |
# File 'lib/joblin/batching/pool.rb', line 148 def self.cleanup_redis_index! suffixes = ["", "-active", "-jobs"] r.zrangebyscore("pools", "0", Batch::BID_EXPIRE_TTL.seconds.ago.to_i).each do |pid| r.zrem("pools", pid) if Batch.cleanup_redis_index_for("POOLID-#{pid}", suffixes) end end |
.from_pid(pid) ⇒ Object
26 27 28 29 |
# File 'lib/joblin/batching/pool.rb', line 26 def self.from_pid(pid) raise "PID must be given" unless pid.present? new(pid) end |
.job_checked_in(status, options) ⇒ Object
142 143 144 145 |
# File 'lib/joblin/batching/pool.rb', line 142 def self.job_checked_in(status, ) pid = ['pool_id'] from_pid(pid).job_checked_in(status, ) end |
Instance Method Details
#<<(job_desc) ⇒ Object
31 32 33 |
# File 'lib/joblin/batching/pool.rb', line 31 def <<(job_desc) add_job(job_desc) end |
#active_count(r = redis) ⇒ Object
116 117 118 |
# File 'lib/joblin/batching/pool.rb', line 116 def active_count(r = redis) r.hlen("#{redis_key}-active") + r.hincrby(redis_key, "_active_count", 0) end |
#active_jobs(r = redis) ⇒ Object
120 121 122 |
# File 'lib/joblin/batching/pool.rb', line 120 def active_jobs(r = redis) r.hvals("#{redis_key}-active").map {|desc| JSON.parse(desc)[0] } end |
#add_job(job_desc) ⇒ Object
35 36 37 |
# File 'lib/joblin/batching/pool.rb', line 35 def add_job(job_desc) add_jobs([job_desc]) end |
#add_jobs(job_descs, skip_refill: false) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/joblin/batching/pool.rb', line 39 def add_jobs(job_descs, skip_refill: false) job_descs.each do |job_desc| wrapper = Batch.new wrapper.description = "Pool Job Wrapper (PID: #{pid})" checkin_event = (on_failed_job == :wait) ? :success : :complete wrapper.on(checkin_event, "#{self.class.to_s}.job_checked_in", pool_id: pid) wrapper.placeholder! job_desc = job_desc.symbolize_keys job_desc = job_desc.merge!( job: job_desc[:job].to_s, pool_wrapper_batch: wrapper.bid, ) push_job_to_pool(job_desc) end refill_allotment unless skip_refill end |
#cleanup_if_empty ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/joblin/batching/pool.rb', line 97 def cleanup_if_empty self.order activec, pactivec, pendingc, clean_when_empty, keep_open, holds = redis.multi do |r| r.hlen("#{redis_key}-active") r.hget(redis_key, "_active_count") pending_count(r) r.hget(redis_key, 'clean_when_empty') r.hget(redis_key, 'keep_open') r.scard("#{redis_key}-holds") end return if keep_open == 'true' || clean_when_empty == 'false' || (holds && holds > 0) if activec <= 0 && (pactivec.try(:to_i) || 0) <= 0 && pendingc <= 0 cleanup_redis end end |
#cleanup_redis ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/joblin/batching/pool.rb', line 86 def cleanup_redis Batch.logger.debug {"Cleaning redis of pool #{pid}"} redis do |r| r.zrem("pools", pid) r.unlink( "#{redis_key}", "#{redis_key}-jobs", ) end end |
#job_checked_in(status, options) ⇒ Object
137 138 139 140 |
# File 'lib/joblin/batching/pool.rb', line 137 def job_checked_in(status, ) active_count = refill_allotment(status.bid) cleanup_if_empty unless active_count > 0 end |
#keep_open!(token = SecureRandom.urlsafe_base64(10)) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/joblin/batching/pool.rb', line 58 def keep_open!(token = SecureRandom.urlsafe_base64(10)) if block_given? begin token = keep_open!(token) yield ensure let_close!(token) end else redis.multi do |r| r.sadd("#{redis_key}-holds", token) r.expire("#{redis_key}-holds", Batch::BID_EXPIRE_TTL) end token end end |
#let_close!(token = :unset) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/joblin/batching/pool.rb', line 75 def let_close!(token = :unset) if token == :unset # Legacy redis.del("#{redis_key}-holds") redis.hset(redis_key, 'keep_open', 'false') else redis.srem("#{redis_key}-holds", token) end cleanup_if_empty end |
#pending_count(r = redis) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/joblin/batching/pool.rb', line 124 def pending_count(r = redis) jobs_key = "#{redis_key}-jobs" order = self.order || 'fifo' case order.to_sym when :fifo, :lifo r.llen(jobs_key) when :random r.scard(jobs_key) when :priority r.zcard(jobs_key) end end |