Class: Faye::Redis
- Inherits:
-
Object
- Object
- Faye::Redis
- Defined in:
- lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb
Defined Under Namespace
Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager
Constant Summary collapse
- DEFAULT_OPTIONS =
Default configuration options
{ host: 'localhost', port: 6379, database: 0, password: nil, pool_size: 5, pool_timeout: 5, connect_timeout: 1, read_timeout: 1, write_timeout: 1, max_retries: 3, retry_delay: 1, client_timeout: 60, message_ttl: 3600, namespace: 'faye' }.freeze
- VERSION =
'1.0.4'
Instance Attribute Summary collapse
-
#client_registry ⇒ Object
readonly
Returns the value of attribute client_registry.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pubsub_coordinator ⇒ Object
readonly
Returns the value of attribute pubsub_coordinator.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#subscription_manager ⇒ Object
readonly
Returns the value of attribute subscription_manager.
Class Method Summary collapse
-
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance.
Instance Method Summary collapse
-
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists.
-
#create_client(&callback) ⇒ Object
Create a new client.
-
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client.
-
#disconnect ⇒ Object
Disconnect the engine.
-
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue.
-
#initialize(server, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#ping(client_id) ⇒ Object
Ping a client to keep it alive.
-
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels.
-
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel.
-
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel.
Constructor Details
#initialize(server, options = {}) ⇒ Redis
Returns a new instance of Redis.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/faye/redis.rb', line 38 def initialize(server, = {}) @server = server @options = DEFAULT_OPTIONS.merge() @logger = Logger.new('Faye::Redis', @options) # Initialize components @connection = Connection.new(@options) @client_registry = ClientRegistry.new(@connection, @options) @subscription_manager = SubscriptionManager.new(@connection, @options) @message_queue = MessageQueue.new(@connection, @options) @pubsub_coordinator = PubSubCoordinator.new(@connection, @options) # Set up message routing end |
Instance Attribute Details
#client_registry ⇒ Object (readonly)
Returns the value of attribute client_registry.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def client_registry @client_registry end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def connection @connection end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def @message_queue end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def @options end |
#pubsub_coordinator ⇒ Object (readonly)
Returns the value of attribute pubsub_coordinator.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def pubsub_coordinator @pubsub_coordinator end |
#server ⇒ Object (readonly)
Returns the value of attribute server.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def server @server end |
#subscription_manager ⇒ Object (readonly)
Returns the value of attribute subscription_manager.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def subscription_manager @subscription_manager end |
Class Method Details
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance
34 35 36 |
# File 'lib/faye/redis.rb', line 34 def self.create(server, ) new(server, ) end |
Instance Method Details
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists
76 77 78 |
# File 'lib/faye/redis.rb', line 76 def client_exists(client_id, &callback) @client_registry.exists?(client_id, &callback) end |
#create_client(&callback) ⇒ Object
Create a new client
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/faye/redis.rb', line 55 def create_client(&callback) client_id = generate_client_id @client_registry.create(client_id) do |success| if success callback.call(client_id) else callback.call(nil) end end end |
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client
67 68 69 70 71 72 73 |
# File 'lib/faye/redis.rb', line 67 def destroy_client(client_id, &callback) @subscription_manager.unsubscribe_all(client_id) do @message_queue.clear(client_id) do @client_registry.destroy(client_id, &callback) end end end |
#disconnect ⇒ Object
Disconnect the engine
137 138 139 140 |
# File 'lib/faye/redis.rb', line 137 def disconnect @pubsub_coordinator.disconnect @connection.disconnect end |
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue
132 133 134 |
# File 'lib/faye/redis.rb', line 132 def empty_queue(client_id) @message_queue.dequeue_all(client_id) end |
#ping(client_id) ⇒ Object
Ping a client to keep it alive
81 82 83 |
# File 'lib/faye/redis.rb', line 81 def ping(client_id) @client_registry.ping(client_id) end |
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/faye/redis.rb', line 96 def publish(, channels, &callback) channels = [channels] unless channels.is_a?(Array) begin remaining_operations = channels.size success = true channels.each do |channel| # Get subscribers and process in parallel @subscription_manager.get_subscribers(channel) do |client_ids| # Immediately publish to pub/sub (don't wait for enqueue) @pubsub_coordinator.publish(channel, ) do |published| success &&= published end # Enqueue for all subscribed clients in parallel (batch operation) if client_ids.any? (client_ids, ) do |enqueued| success &&= enqueued end end # Track completion remaining_operations -= 1 if remaining_operations == 0 && callback EventMachine.next_tick { callback.call(success) } end end end rescue => e log_error("Failed to publish message to channels #{channels}: #{e.}") EventMachine.next_tick { callback.call(false) } if callback end end |
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel
86 87 88 |
# File 'lib/faye/redis.rb', line 86 def subscribe(client_id, channel, &callback) @subscription_manager.subscribe(client_id, channel, &callback) end |
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel
91 92 93 |
# File 'lib/faye/redis.rb', line 91 def unsubscribe(client_id, channel, &callback) @subscription_manager.unsubscribe(client_id, channel, &callback) end |