Class: Rubee::PubSub::Redis

Inherits:
Container show all
Includes:
Singleton
Defined in:
lib/rubee/pubsub/redis.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
5
DEFAULT_TIMEOUT =
5

Instance Method Summary collapse

Constructor Details

#initializeRedis

Returns a new instance of Redis.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rubee/pubsub/redis.rb', line 14

def initialize
  redis_url = Rubee::Configuration.get_redis_url
  @pool = ConnectionPool.new(size: DEFAULT_POOL_SIZE, timeout: DEFAULT_TIMEOUT) do
    if redis_url&.empty?
      ::Redis.new
    else
      ::Redis.new(url: redis_url)
    end
  end
  @mutex = Mutex.new
end

Instance Method Details

#pub(channel, args = {}, &block) ⇒ Object

Example: pub(“ok”, message: “hello”)



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rubee/pubsub/redis.rb', line 27

def pub(channel, args = {}, &block)
  keys = with_redis { |r| r.scan_each(match: "#{channel}:*").to_a }
  return false if keys.empty?

  values = with_redis { |r| r.mget(*keys) }

  iterable = values.each_with_index.each_with_object({}) do |(val, i), hash|
    key = keys[i]
    hash[key] = val
  end

  clazzes = retrieve_klasses(iterable)
  fan_out(clazzes, args, &block)
end

#sub(channel, klass_name, *args, &block) ⇒ Object

Example: sub(“ok”, “User”, [“123”])



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rubee/pubsub/redis.rb', line 43

def sub(channel, klass_name, *args, &block)
  @mutex.synchronize do
    id = args.first
    id_string = id ? ":#{id}" : ""
    key = "#{channel}:#{klass_name}#{id_string}"
    existing = with_redis { |r| r.get(key) }
    io = args.last.respond_to?(:call) ? args.pop : nil

    with_redis { |r| r.set(key, args.join(",")) } unless existing
    block&.call(key, io: io)
  end
  true
end

#unsub(channel, klass_name, *args, &block) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/rubee/pubsub/redis.rb', line 57

def unsub(channel, klass_name, *args, &block)
  @mutex.synchronize do
    id = args.first
    id_string = id ? ":#{id}" : ""
    key = "#{channel}:#{klass_name}#{id_string}"
    value = with_redis { |r| r.get(key) }
    return false unless value

    io = args.pop if args.last.respond_to?(:call)
    with_redis { |r| r.del(key) }
    block&.call(key, io: io)
  end
  true
end