Class: Rubee::PubSub::Redis
- Includes:
- Singleton
- Defined in:
- lib/rubee/pubsub/redis.rb
Constant Summary collapse
- DEFAULT_POOL_SIZE = 5
- DEFAULT_TIMEOUT = 5
Instance Method Summary collapse
-
#initialize ⇒ Redis
constructor
A new instance of Redis.
-
#pub(channel, args = {}, &block) ⇒ Object
Example: pub("ok", message: "hello").
-
#sub(channel, klass_name, *args, &block) ⇒ Object
Example: sub("ok", "User", ["123"]).
- #unsub(channel, klass_name, *args, &block) ⇒ Object
Constructor Details
#initialize ⇒ Redis
Returns a new instance of Redis.
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/rubee/pubsub/redis.rb', line 14
# Builds the Redis connection pool and the mutex used by #sub and #unsub.
#
# The Redis URL is read once from Rubee::Configuration.get_redis_url. When it
# is nil or empty, every pooled connection is created with a bare ::Redis.new;
# otherwise the connection is created with the configured URL.
#
# @return [Redis] a new instance of Redis
def initialize
  redis_url = Rubee::Configuration.get_redis_url
  # Treat nil and "" alike as "not configured" so `url: nil` is never passed on.
  url_configured = !(redis_url.nil? || redis_url.empty?)

  @pool = ConnectionPool.new(size: DEFAULT_POOL_SIZE, timeout: DEFAULT_TIMEOUT) do
    url_configured ? ::Redis.new(url: redis_url) : ::Redis.new
  end
  @mutex = Mutex.new
end
Instance Method Details
#pub(channel, args = {}, &block) ⇒ Object
Example: pub("ok", message: "hello")
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rubee/pubsub/redis.rb', line 27
# Publishes +args+ to every subscriber registered under +channel+.
#
# Example: pub("ok", message: "hello")
#
# @param channel [String] channel name; subscriber keys are looked up with the
#   match pattern "<channel>:*"
# @param args [Hash] payload handed to fan_out
# @param block [Proc] optional block forwarded to fan_out
# @return [false, Object] false when no subscriber key matches, otherwise
#   whatever fan_out returns
def pub(channel, args = {}, &block)
  # SCAN may yield the same key more than once; de-duplicate before MGET.
  keys = with_redis { |r| r.scan_each(match: "#{channel}:*").to_a.uniq }
  return false if keys.empty?

  values = with_redis { |r| r.mget(*keys) }
  # MGET returns one value per requested key, in order, so pair positionally.
  subscriptions = keys.zip(values).to_h

  fan_out(retrieve_klasses(subscriptions), args, &block)
end
#sub(channel, klass_name, *args, &block) ⇒ Object
Example: sub("ok", "User", ["123"])
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rubee/pubsub/redis.rb', line 43
# Registers +klass_name+ as a subscriber of +channel+.
#
# Example: sub("ok", "User", ["123"])
#
# The subscription is stored in Redis under "<channel>:<klass_name>" or, when
# an id is given, "<channel>:<klass_name>:<id>". The stored value is the
# remaining +args+ joined with ",". A key that already exists is not rewritten.
#
# @param channel [String] channel name
# @param klass_name [String] subscriber class name
# @param args [Array] optional id first; a trailing object that responds to
#   #call is removed from args and passed to the block as +io+
# @param block [Proc] optional; called with (key, io: io) on every call,
#   whether or not the key already existed
# @return [true]
def sub(channel, klass_name, *args, &block)
  @mutex.synchronize do
    # Remove the trailing callable first so it can never be mistaken for the id
    # when it is the only extra argument.
    io = args.pop if args.last.respond_to?(:call)
    id = args.first
    key = id ? "#{channel}:#{klass_name}:#{id}" : "#{channel}:#{klass_name}"

    already_registered = with_redis { |r| r.get(key) }
    with_redis { |r| r.set(key, args.join(",")) } unless already_registered

    block&.call(key, io: io)
  end
  true
end
#unsub(channel, klass_name, *args, &block) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/rubee/pubsub/redis.rb', line 57
# Removes the subscription of +klass_name+ from +channel+.
#
# The Redis key is "<channel>:<klass_name>" or, when an id is given as the
# first element of +args+, "<channel>:<klass_name>:<id>".
#
# @param channel [String] channel name
# @param klass_name [String] subscriber class name
# @param args [Array] optional id first; a trailing object that responds to
#   #call is removed from args and passed to the block as +io+
# @param block [Proc] optional; called with (key, io: io) after the key has
#   been deleted
# @return [Boolean] false when no such subscription exists, true otherwise
def unsub(channel, klass_name, *args, &block)
  @mutex.synchronize do
    # Remove the trailing callable first so it can never be mistaken for the id
    # when it is the only extra argument.
    io = args.pop if args.last.respond_to?(:call)
    id = args.first
    key = id ? "#{channel}:#{klass_name}:#{id}" : "#{channel}:#{klass_name}"

    # Nothing to remove: report it and skip the block.
    return false unless with_redis { |r| r.get(key) }

    with_redis { |r| r.del(key) }
    block&.call(key, io: io)
  end
  true
end