Class: Faye::Redis
- Inherits:
-
Object
- Object
- Faye::Redis
- Defined in:
- lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb
Defined Under Namespace
Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager
Constant Summary collapse
- DEFAULT_OPTIONS =
Default configuration options
{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye',
  gc_interval: 60 # Automatic garbage collection interval (seconds), set to 0 or false to disable
}.freeze
- VERSION =
'1.0.7'
Instance Attribute Summary collapse
-
#client_registry ⇒ Object
readonly
Returns the value of attribute client_registry.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pubsub_coordinator ⇒ Object
readonly
Returns the value of attribute pubsub_coordinator.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#subscription_manager ⇒ Object
readonly
Returns the value of attribute subscription_manager.
Class Method Summary collapse
-
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance.
Instance Method Summary collapse
-
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data.
-
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists.
-
#create_client(&callback) ⇒ Object
Create a new client.
-
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client.
-
#disconnect ⇒ Object
Disconnect the engine.
-
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue.
-
#initialize(server, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#ping(client_id) ⇒ Object
Ping a client to keep it alive.
-
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels.
-
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel.
-
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel.
Constructor Details
#initialize(server, options = {}) ⇒ Redis
Returns a new instance of Redis.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/faye/redis.rb', line 40

# Builds the engine and the components it delegates to.
#
# @param server [Object] stored as-is and exposed through #server
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components; every component receives the merged options and
  # all but the connection itself share the single Connection instance.
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  # NOTE(review): the statement under this comment was lost when the listing
  # was extracted; presumably it is `setup_message_routing` -- confirm the
  # helper name against lib/faye/redis.rb.
  setup_message_routing

  # Start automatic garbage collection timer
  start_gc_timer
end
Instance Attribute Details
#client_registry ⇒ Object (readonly)
Returns the value of attribute client_registry.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute client_registry.
#
# @return [ClientRegistry] the registry assigned in the constructor
def client_registry
  @client_registry
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute connection.
#
# @return [Connection] the connection assigned in the constructor
def connection
  @connection
end
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute message_queue.
#
# @return [MessageQueue] the queue assigned in the constructor
def message_queue
  @message_queue
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute options.
#
# @return [Hash] DEFAULT_OPTIONS merged with the options given to the constructor
def options
  @options
end
#pubsub_coordinator ⇒ Object (readonly)
Returns the value of attribute pubsub_coordinator.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute pubsub_coordinator.
#
# @return [PubSubCoordinator] the coordinator assigned in the constructor
def pubsub_coordinator
  @pubsub_coordinator
end
#server ⇒ Object (readonly)
Returns the value of attribute server.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute server.
#
# @return [Object] the server object handed to the constructor
def server
  @server
end
#subscription_manager ⇒ Object (readonly)
Returns the value of attribute subscription_manager.
32 33 34 |
# File 'lib/faye/redis.rb', line 32

# Returns the value of attribute subscription_manager.
#
# @return [SubscriptionManager] the manager assigned in the constructor
def subscription_manager
  @subscription_manager
end
Class Method Details
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance
36 37 38 |
# File 'lib/faye/redis.rb', line 36

# Factory method to create a new Redis engine instance.
#
# @param server [Object] forwarded unchanged to the constructor
# @param options [Hash] forwarded unchanged to the constructor
# @return [Faye::Redis] the newly constructed engine
def self.create(server, options)
  new(server, options)
end
Instance Method Details
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data
188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/faye/redis.rb', line 188

# Clean up expired clients and their associated data.
#
# @yieldparam expired_count [Integer] the count reported by the client
#   registry; yielded only after the orphaned-subscription sweep has finished
def cleanup_expired(&callback)
  @client_registry.cleanup_expired do |expired_count|
    @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0

    # Always clean up orphaned subscription keys (even if no expired clients)
    # This handles cases where subscriptions were orphaned due to crashes
    cleanup_orphaned_subscriptions do
      callback.call(expired_count) if callback
    end
  end
end
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists
84 85 86 |
# File 'lib/faye/redis.rb', line 84

# Check if a client exists.
#
# Delegates to the client registry's `exists?`, handing it the caller's block.
#
# @param client_id [String] identifier of the client to look up
def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end
#create_client(&callback) ⇒ Object
Create a new client
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/faye/redis.rb', line 60

# Create a new client.
#
# @yieldparam client_id [String, nil] the generated id when the registry
#   reports success, nil otherwise
def create_client(&callback)
  # Ensure GC timer is started (lazy initialization)
  ensure_gc_timer_started

  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    # The block is optional, matching the other callback-taking methods.
    callback.call(success ? client_id : nil) if callback
  end
end
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client
75 76 77 78 79 80 81 |
# File 'lib/faye/redis.rb', line 75

# Destroy a client.
#
# Runs three steps in sequence, each started from the previous step's block:
# drop all subscriptions, clear the message queue, then remove the client
# from the registry (which receives the caller's block).
#
# @param client_id [String] identifier of the client to remove
def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end
#disconnect ⇒ Object
Disconnect the engine
179 180 181 182 183 184 185 |
# File 'lib/faye/redis.rb', line 179

# Disconnect the engine.
#
# Order: stop the GC timer, disconnect the pub/sub coordinator, then
# disconnect the connection.
def disconnect
  # Stop GC timer if running
  stop_gc_timer

  @pubsub_coordinator.disconnect
  @connection.disconnect
end
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue
174 175 176 |
# File 'lib/faye/redis.rb', line 174

# Empty a client's message queue.
#
# @param client_id [String] identifier of the client whose queue is drained
# @return [Object] whatever the message queue's `dequeue_all` returns
def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end
#ping(client_id) ⇒ Object
Ping a client to keep it alive
89 90 91 |
# File 'lib/faye/redis.rb', line 89

# Ping a client to keep it alive.
#
# @param client_id [String] identifier of the client to refresh
# @return [Object] whatever the client registry's `ping` returns
def ping(client_id)
  @client_registry.ping(client_id)
end
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/faye/redis.rb', line 104

# Publish a message to channels.
#
# For each channel the subscribers are looked up, the message is sent
# through the pub/sub coordinator, and it is enqueued for the subscribed
# clients. The callback fires once, on a later EventMachine tick, after every
# channel has completed both operations.
#
# @param message [Hash] payload; it is copied before an 'id' is assigned, so
#   the caller's object is never modified
# @param channels [String, Array<String>] a channel name or a list of names
# @yieldparam success [Boolean] true only when every pub/sub publish and
#   every enqueue reported success; false when an exception was rescued
def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  # With nothing to publish the per-channel completion logic below would
  # never run, so report (vacuous) success right away.
  if channels.empty?
    EventMachine.next_tick { callback.call(true) } if callback
    return
  end

  begin
    # Ensure message has an ID for deduplication. Always work on a copy:
    # `dup` yields an unfrozen object, so a frozen message can receive an id
    # and an unfrozen one is not mutated behind the caller's back.
    message = message.dup
    # NOTE(review): the id generator's name was lost in extraction;
    # presumably `generate_message_id` -- confirm against lib/faye/redis.rb.
    message['id'] ||= generate_message_id

    # Track this message as locally published
    if @local_message_ids
      if @local_message_ids_mutex
        @local_message_ids_mutex.synchronize { @local_message_ids.add(message['id']) }
      else
        @local_message_ids.add(message['id'])
      end
    end

    total_channels = channels.size
    completed_channels = 0
    callback_called = false
    all_success = true

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Track operations for this channel
        pending_ops = 2 # pubsub + enqueue
        channel_success = true
        ops_completed = 0

        complete_channel = lambda do
          ops_completed += 1
          if ops_completed == pending_ops
            # This channel is complete
            all_success &&= channel_success
            completed_channels += 1

            # Call final callback when all channels are done
            if completed_channels == total_channels && !callback_called && callback
              callback_called = true
              EventMachine.next_tick { callback.call(all_success) }
            end
          end
        end

        # Publish to pub/sub
        @pubsub_coordinator.publish(channel, message) do |published|
          channel_success &&= published
          complete_channel.call
        end

        # Enqueue for all subscribed clients
        if client_ids.any?
          # NOTE(review): this helper's name was lost in extraction;
          # presumably `enqueue_messages_batch` -- confirm against
          # lib/faye/redis.rb.
          enqueue_messages_batch(client_ids, message) do |enqueued|
            channel_success &&= enqueued
            complete_channel.call
          end
        else
          # No clients, but still need to complete
          complete_channel.call
        end
      end
    end
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback && !callback_called
  end
end
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel
94 95 96 |
# File 'lib/faye/redis.rb', line 94

# Subscribe a client to a channel.
#
# Delegates to the subscription manager, handing it the caller's block.
#
# @param client_id [String] identifier of the subscribing client
# @param channel [String] channel to subscribe to
def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel
99 100 101 |
# File 'lib/faye/redis.rb', line 99

# Unsubscribe a client from a channel.
#
# Delegates to the subscription manager, handing it the caller's block.
#
# @param client_id [String] identifier of the unsubscribing client
# @param channel [String] channel to unsubscribe from
def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end