Class: Faye::Redis
- Inherits:
-
Object
- Object
- Faye::Redis
- Defined in:
- lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb
Defined Under Namespace
Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager
Constant Summary collapse
- DEFAULT_OPTIONS =
Default configuration options
{ host: 'localhost', port: 6379, database: 0, password: nil, pool_size: 5, pool_timeout: 5, connect_timeout: 1, read_timeout: 1, write_timeout: 1, max_retries: 3, retry_delay: 1, client_timeout: 60, message_ttl: 3600, namespace: 'faye', gc_interval: 60 # Automatic garbage collection interval (seconds), set to 0 or false to disable }.freeze
- VERSION =
'1.0.6'
Instance Attribute Summary collapse
-
#client_registry ⇒ Object
readonly
Returns the value of attribute client_registry.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pubsub_coordinator ⇒ Object
readonly
Returns the value of attribute pubsub_coordinator.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#subscription_manager ⇒ Object
readonly
Returns the value of attribute subscription_manager.
Class Method Summary collapse
-
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance.
Instance Method Summary collapse
-
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data.
-
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists.
-
#create_client(&callback) ⇒ Object
Create a new client.
-
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client.
-
#disconnect ⇒ Object
Disconnect the engine.
-
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue.
-
#initialize(server, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#ping(client_id) ⇒ Object
Ping a client to keep it alive.
-
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels.
-
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel.
-
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel.
Constructor Details
#initialize(server, options = {}) ⇒ Redis
Returns a new instance of Redis.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/faye/redis.rb', line 40 def initialize(server, = {}) @server = server @options = DEFAULT_OPTIONS.merge() @logger = Logger.new('Faye::Redis', @options) # Initialize components @connection = Connection.new(@options) @client_registry = ClientRegistry.new(@connection, @options) @subscription_manager = SubscriptionManager.new(@connection, @options) @message_queue = MessageQueue.new(@connection, @options) @pubsub_coordinator = PubSubCoordinator.new(@connection, @options) # Set up message routing # Start automatic garbage collection timer start_gc_timer end |
Instance Attribute Details
#client_registry ⇒ Object (readonly)
Returns the value of attribute client_registry.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def client_registry @client_registry end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def connection @connection end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def @message_queue end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def @options end |
#pubsub_coordinator ⇒ Object (readonly)
Returns the value of attribute pubsub_coordinator.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def pubsub_coordinator @pubsub_coordinator end |
#server ⇒ Object (readonly)
Returns the value of attribute server.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def server @server end |
#subscription_manager ⇒ Object (readonly)
Returns the value of attribute subscription_manager.
32 33 34 |
# File 'lib/faye/redis.rb', line 32 def subscription_manager @subscription_manager end |
Class Method Details
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance
36 37 38 |
# File 'lib/faye/redis.rb', line 36 def self.create(server, ) new(server, ) end |
Instance Method Details
#cleanup_expired(&callback) ⇒ Object
Clean up expired clients and their associated data
154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/faye/redis.rb', line 154 def cleanup_expired(&callback) @client_registry.cleanup_expired do |expired_count| @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0 # Always clean up orphaned subscription keys (even if no expired clients) # This handles cases where subscriptions were orphaned due to crashes cleanup_orphaned_subscriptions do callback.call(expired_count) if callback end end end |
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists
84 85 86 |
# File 'lib/faye/redis.rb', line 84 def client_exists(client_id, &callback) @client_registry.exists?(client_id, &callback) end |
#create_client(&callback) ⇒ Object
Create a new client
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/faye/redis.rb', line 60 def create_client(&callback) # Ensure GC timer is started (lazy initialization) ensure_gc_timer_started client_id = generate_client_id @client_registry.create(client_id) do |success| if success callback.call(client_id) else callback.call(nil) end end end |
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client
75 76 77 78 79 80 81 |
# File 'lib/faye/redis.rb', line 75 def destroy_client(client_id, &callback) @subscription_manager.unsubscribe_all(client_id) do @message_queue.clear(client_id) do @client_registry.destroy(client_id, &callback) end end end |
#disconnect ⇒ Object
Disconnect the engine
145 146 147 148 149 150 151 |
# File 'lib/faye/redis.rb', line 145 def disconnect # Stop GC timer if running stop_gc_timer @pubsub_coordinator.disconnect @connection.disconnect end |
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue
140 141 142 |
# File 'lib/faye/redis.rb', line 140 def empty_queue(client_id) @message_queue.dequeue_all(client_id) end |
#ping(client_id) ⇒ Object
Ping a client to keep it alive
89 90 91 |
# File 'lib/faye/redis.rb', line 89 def ping(client_id) @client_registry.ping(client_id) end |
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/faye/redis.rb', line 104 def publish(, channels, &callback) channels = [channels] unless channels.is_a?(Array) begin remaining_operations = channels.size success = true channels.each do |channel| # Get subscribers and process in parallel @subscription_manager.get_subscribers(channel) do |client_ids| # Immediately publish to pub/sub (don't wait for enqueue) @pubsub_coordinator.publish(channel, ) do |published| success &&= published end # Enqueue for all subscribed clients in parallel (batch operation) if client_ids.any? (client_ids, ) do |enqueued| success &&= enqueued end end # Track completion remaining_operations -= 1 if remaining_operations == 0 && callback EventMachine.next_tick { callback.call(success) } end end end rescue => e log_error("Failed to publish message to channels #{channels}: #{e.}") EventMachine.next_tick { callback.call(false) } if callback end end |
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel
94 95 96 |
# File 'lib/faye/redis.rb', line 94 def subscribe(client_id, channel, &callback) @subscription_manager.subscribe(client_id, channel, &callback) end |
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel
99 100 101 |
# File 'lib/faye/redis.rb', line 99 def unsubscribe(client_id, channel, &callback) @subscription_manager.unsubscribe(client_id, channel, &callback) end |