Class: Rage::PubSub::Adapters::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/pubsub/adapters/redis.rb

Constant Summary collapse

REDIS_STREAM_NAME =
"rage:pubsub:messages"
DEFAULT_REDIS_OPTIONS =
{ reconnect_attempts: [0.05, 0.1, 0.5] }
DEFAULT_POOL_SIZE =
10
DEFAULT_POOL_TIMEOUT =
1
REDIS_MIN_VERSION_SUPPORTED =
Gem::Version.create(6)

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Redis

Returns a new instance of Redis.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rage/pubsub/adapters/redis.rb', line 21

# Sets up the adapter from a configuration hash.
#
# The adapter-specific keys +:channel_prefix+, +:pool_size+ and +:pool_timeout+
# are extracted; every remaining entry is merged over DEFAULT_REDIS_OPTIONS and
# handed to +RedisClient.config+. Extraction happens on a shallow copy, so the
# caller's hash is left untouched and may be frozen.
#
# When the detected Redis version is supported, one worker is asked (through
# +Rage::Internal.pick_a_worker+) to run +poll+.
#
# @param config [Hash] adapter settings. Keys are looked up as symbols.
#   NOTE(review): presumably callers symbolize keys before passing them - confirm.
# @raise [RuntimeError] if the detected Redis version is lower than
#   REDIS_MIN_VERSION_SUPPORTED
def initialize(config)
  # Work on a copy: deleting keys from the argument itself would leak a side
  # effect to the caller and fail outright on a frozen hash.
  options = config.dup

  prefix = options.delete(:channel_prefix)
  @redis_stream = prefix ? "#{prefix}:#{REDIS_STREAM_NAME}" : REDIS_STREAM_NAME

  @pool_size = (options.delete(:pool_size) || DEFAULT_POOL_SIZE).to_i
  @pool_timeout = (options.delete(:pool_timeout) || DEFAULT_POOL_TIMEOUT).to_f
  @redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(options))
  @server_uuid = SecureRandom.uuid
  @broadcasters = {}

  redis_version = get_redis_version
  # NOTE(review): nil presumably means the version could not be determined;
  # in that case no trimming strategy is chosen and no worker is started.
  return if redis_version.nil?

  if redis_version < REDIS_MIN_VERSION_SUPPORTED
    raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}."
  end

  # Servers older than 6.2.0 get :maxlen; 6.2.0 and newer get :minid.
  @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid

  Rage::Internal.pick_a_worker do
    puts("INFO: #{Process.pid} is managing Redis subscriptions.") if Rage.logger.info?
    poll
  end
end

Instance Method Details

#add_broadcaster(broadcaster_id, broadcaster) ⇒ Object



49
50
51
# File 'lib/rage/pubsub/adapters/redis.rb', line 49

# Registers +broadcaster+ under +broadcaster_id+, replacing any entry that was
# previously stored for the same ID.
#
# @param broadcaster_id [Object] key identifying the broadcaster
# @param broadcaster [Object] the object to register
# @return [Object] the +broadcaster+ that was stored
def add_broadcaster(broadcaster_id, broadcaster)
  @broadcasters.store(broadcaster_id, broadcaster)
end

#publish(broadcaster_id, stream_name, data) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/rage/pubsub/adapters/redis.rb', line 53

# Appends one message to the adapter's Redis stream with an +XADD+ command.
#
# The entry carries five numbered fields: "1" the stream name, "2" the payload
# serialized with +to_json+, "3" this server's UUID, "4" a freshly generated
# message UUID and "5" the broadcaster ID. The stream is trimmed approximately
# ("~") using +trimming_method+ / +trimming_value+.
#
# @param broadcaster_id [Object] identifier sent as field "5"
# @param stream_name [Object] logical stream name sent as field "1"
# @param data [#to_json] payload sent as field "2"
# @return [Object] whatever +redis_pool.with+ returns; presumably the reply of
#   the +XADD+ call - confirm against the pool implementation
def publish(broadcaster_id, stream_name, data)
  message_uuid = SecureRandom.uuid

  redis_pool.with do |connection|
    # Built inside the checkout, left to right, so the helper calls and the
    # JSON serialization happen in the same order as the command's arguments.
    command = [
      "XADD", @redis_stream,
      trimming_method, "~", trimming_value,
      "*",
      "1", stream_name,
      "2", data.to_json,
      "3", @server_uuid,
      "4", message_uuid,
      "5", broadcaster_id
    ]

    connection.call(*command)
  end
end