Class: Rage::PubSub::Adapters::Redis
- Inherits:
-
Object
- Object
- Rage::PubSub::Adapters::Redis
- Defined in:
- lib/rage/pubsub/adapters/redis.rb
Constant Summary collapse
- REDIS_STREAM_NAME =
"rage:pubsub:messages"
- DEFAULT_REDIS_OPTIONS =
{ reconnect_attempts: [0.05, 0.1, 0.5] }
- DEFAULT_POOL_SIZE =
10
- DEFAULT_POOL_TIMEOUT =
1
- REDIS_MIN_VERSION_SUPPORTED =
Gem::Version.create(6)
Instance Method Summary collapse
- #add_broadcaster(broadcaster_id, broadcaster) ⇒ Object
-
#initialize(config) ⇒ Redis
constructor
A new instance of Redis.
- #publish(broadcaster_id, stream_name, data) ⇒ Object
Constructor Details
#initialize(config) ⇒ Redis
Returns a new instance of Redis.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/rage/pubsub/adapters/redis.rb', line 21
#
# Sets up the adapter state from +config+ and, when a supported Redis
# version is detected, hands the polling loop to a worker.
#
# NOTE: +config+ is mutated - the :channel_prefix, :pool_size and
# :pool_timeout keys are deleted from it; whatever remains is merged over
# DEFAULT_REDIS_OPTIONS and passed to RedisClient.config.
#
# @param config [Hash] adapter options (see the note above)
# @raise [RuntimeError] if the detected Redis version is older than
#   REDIS_MIN_VERSION_SUPPORTED
def initialize(config)
  # An optional channel prefix namespaces the stream name.
  channel_prefix = config.delete(:channel_prefix)
  @redis_stream = channel_prefix ? "#{channel_prefix}:#{REDIS_STREAM_NAME}" : REDIS_STREAM_NAME

  requested_pool_size = config.delete(:pool_size)
  @pool_size = (requested_pool_size || DEFAULT_POOL_SIZE).to_i
  requested_pool_timeout = config.delete(:pool_timeout)
  @pool_timeout = (requested_pool_timeout || DEFAULT_POOL_TIMEOUT).to_f

  # Caller-supplied options win over the defaults.
  client_options = DEFAULT_REDIS_OPTIONS.merge(config)
  @redis_config = RedisClient.config(**client_options)
  @server_uuid = SecureRandom.uuid
  @broadcasters = {}

  detected_version = get_redis_version
  # NOTE(review): a nil version presumably means it could not be determined;
  # in that case neither the trimming strategy nor the poller is set up - confirm.
  return if detected_version.nil?

  if detected_version < REDIS_MIN_VERSION_SUPPORTED
    raise "Redis adapter only supports Redis 6+. Detected Redis version: #{detected_version}."
  end

  # Versions before 6.2.0 get :maxlen; 6.2.0 and later get :minid.
  @trimming_strategy =
    if detected_version < Gem::Version.create("6.2.0")
      :maxlen
    else
      :minid
    end

  Rage::Internal.pick_a_worker do
    if Rage.logger.info?
      puts("INFO: #{Process.pid} is managing Redis subscriptions.")
    end

    poll
  end
end
Instance Method Details
#add_broadcaster(broadcaster_id, broadcaster) ⇒ Object
49 50 51 |
# File 'lib/rage/pubsub/adapters/redis.rb', line 49
#
# Registers +broadcaster+ under +broadcaster_id+, replacing any entry that
# was previously stored for the same id.
#
# @param broadcaster_id [Object] key used to look the broadcaster up
# @param broadcaster [Object] the broadcaster to register
# @return [Object] the +broadcaster+ that was stored
def add_broadcaster(broadcaster_id, broadcaster)
  @broadcasters.store(broadcaster_id, broadcaster)
end
#publish(broadcaster_id, stream_name, data) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/rage/pubsub/adapters/redis.rb', line 53
#
# Appends a message to the Redis stream with a single XADD call.
#
# The entry is written with numeric field names:
#   "1" - stream_name
#   "2" - data serialized with #to_json
#   "3" - the UUID of this server instance (@server_uuid)
#   "4" - a fresh UUID generated for this message
#   "5" - broadcaster_id
#
# @param broadcaster_id [Object] id of the broadcaster the message belongs to
# @param stream_name [String] name of the pub/sub stream
# @param data [#to_json] payload to publish
# @return [Object] whatever +redis_pool.with+ returns for the XADD call
def publish(broadcaster_id, stream_name, data)
  # Each message gets its own UUID, sent as field "4".
  message_uuid = SecureRandom.uuid

  redis_pool.with do |redis|
    redis.call(
      "XADD",
      @redis_stream,
      # Trimming arguments; "~" asks Redis for approximate trimming.
      trimming_method, "~", trimming_value,
      # "*" lets Redis generate the entry ID.
      "*",
      "1", stream_name,
      "2", data.to_json,
      "3", @server_uuid,
      "4", message_uuid,
      "5", broadcaster_id
    )
  end
end