Class: Whoosh::Jobs::RedisBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/whoosh/jobs/redis_backend.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, prefix: "whoosh:jobs") ⇒ RedisBackend

Returns a new instance of RedisBackend.



20
21
22
23
24
25
26
# File 'lib/whoosh/jobs/redis_backend.rb', line 20

def initialize(url:, prefix: "whoosh:jobs")
  unless self.class.available?
    raise Errors::DependencyError, "Jobs Redis backend requires the 'redis' gem"
  end
  @redis = Redis.new(url: url)
  @prefix = prefix
end

Class Method Details

.available?Boolean

Returns:

  • (Boolean)


8
9
10
11
12
13
14
15
16
17
18
# File 'lib/whoosh/jobs/redis_backend.rb', line 8

def self.available?
  if @redis_available.nil?
    @redis_available = begin
      require "redis"
      true
    rescue LoadError
      false
    end
  end
  @redis_available
end

Instance Method Details

#find(id) ⇒ Object



64
65
66
67
68
69
# File 'lib/whoosh/jobs/redis_backend.rb', line 64

def find(id)
  raw = @redis.get("#{@prefix}:record:#{id}")
  return nil unless raw
  data = Serialization::Json.decode(raw)
  data.transform_keys(&:to_sym)
end

#pending_countObject



75
76
77
78
79
80
81
82
83
# File 'lib/whoosh/jobs/redis_backend.rb', line 75

def pending_count
  count = 0
  @redis.keys("#{@prefix}:queue:*").each do |key|
    count += @redis.llen(key)
  end
  count
rescue => e
  0
end

#pop(timeout: 5, queues: ["default"]) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/whoosh/jobs/redis_backend.rb', line 38

def pop(timeout: 5, queues: ["default"])
  # First, promote scheduled jobs
  promote_scheduled

  # Try each queue in priority order
  queues.each do |queue|
    result = @redis.rpop("#{@prefix}:queue:#{queue}")
    if result
      return Serialization::Json.decode(result).transform_keys(&:to_sym)
    end
  end

  # Block-wait on default queue
  result = @redis.brpop("#{@prefix}:queue:#{queues.first}", timeout: timeout)
  if result
    Serialization::Json.decode(result[1]).transform_keys(&:to_sym)
  end
rescue => e
  nil
end

#push(job_data) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/whoosh/jobs/redis_backend.rb', line 28

def push(job_data)
  serialized = Serialization::Json.encode(job_data)
  if job_data[:run_at] && job_data[:run_at] > Time.now.to_f
    # Scheduled: use sorted set with run_at as score
    @redis.zadd("#{@prefix}:scheduled", job_data[:run_at], serialized)
  else
    @redis.lpush("#{@prefix}:queue:#{job_data[:queue] || "default"}", serialized)
  end
end

#save(record) ⇒ Object



59
60
61
62
# File 'lib/whoosh/jobs/redis_backend.rb', line 59

def save(record)
  serialized = Serialization::Json.encode(record)
  @redis.set("#{@prefix}:record:#{record[:id]}", serialized, ex: 86400) # 24h TTL
end

#scheduled_countObject



85
86
87
88
89
# File 'lib/whoosh/jobs/redis_backend.rb', line 85

def scheduled_count
  @redis.zcard("#{@prefix}:scheduled")
rescue => e
  0
end

#shutdownObject



91
92
93
94
95
# File 'lib/whoosh/jobs/redis_backend.rb', line 91

def shutdown
  @redis.close
rescue => e
  # Already closed
end

#sizeObject



71
72
73
# File 'lib/whoosh/jobs/redis_backend.rb', line 71

def size
  pending_count + scheduled_count
end