Class: Faye::Redis
- Inherits:
-
Object
- Object
- Faye::Redis
- Defined in:
- lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb
Defined Under Namespace
Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager
Constant Summary collapse
- DEFAULT_OPTIONS =
Default configuration options
{ host: 'localhost', port: 6379, database: 0, password: nil, pool_size: 5, pool_timeout: 5, connect_timeout: 1, read_timeout: 1, write_timeout: 1, max_retries: 3, retry_delay: 1, client_timeout: 60, message_ttl: 3600, namespace: 'faye' }.freeze
- VERSION =
'1.0.0'
Instance Attribute Summary collapse
-
#client_registry ⇒ Object
readonly
Returns the value of attribute client_registry.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pubsub_coordinator ⇒ Object
readonly
Returns the value of attribute pubsub_coordinator.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#subscription_manager ⇒ Object
readonly
Returns the value of attribute subscription_manager.
Class Method Summary collapse
-
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance.
Instance Method Summary collapse
-
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists.
-
#create_client(&callback) ⇒ Object
Create a new client.
-
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client.
-
#disconnect ⇒ Object
Disconnect the engine.
-
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue.
-
#initialize(server, options = {}) ⇒ Redis
constructor
A new instance of Redis.
-
#ping(client_id) ⇒ Object
Ping a client to keep it alive.
-
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels.
-
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel.
-
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel.
Constructor Details
#initialize(server, options = {}) ⇒ Redis
Returns a new instance of Redis.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/faye/redis.rb', line 38 def initialize(server, options = {}) @server = server @options = DEFAULT_OPTIONS.merge(options) @logger = Logger.new('Faye::Redis', @options) # Initialize components @connection = Connection.new(@options) @client_registry = ClientRegistry.new(@connection, @options) @subscription_manager = SubscriptionManager.new(@connection, @options) @message_queue = MessageQueue.new(@connection, @options) @pubsub_coordinator = PubSubCoordinator.new(@connection, @options) # Set up message routing end |
Instance Attribute Details
#client_registry ⇒ Object (readonly)
Returns the value of attribute client_registry.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def client_registry @client_registry end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def connection @connection end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def message_queue @message_queue end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def options @options end |
#pubsub_coordinator ⇒ Object (readonly)
Returns the value of attribute pubsub_coordinator.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def pubsub_coordinator @pubsub_coordinator end |
#server ⇒ Object (readonly)
Returns the value of attribute server.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def server @server end |
#subscription_manager ⇒ Object (readonly)
Returns the value of attribute subscription_manager.
30 31 32 |
# File 'lib/faye/redis.rb', line 30 def subscription_manager @subscription_manager end |
Class Method Details
.create(server, options) ⇒ Object
Factory method to create a new Redis engine instance
34 35 36 |
# File 'lib/faye/redis.rb', line 34 def self.create(server, options) new(server, options) end |
Instance Method Details
#client_exists(client_id, &callback) ⇒ Object
Check if a client exists
74 75 76 |
# File 'lib/faye/redis.rb', line 74 def client_exists(client_id, &callback) @client_registry.exists?(client_id, &callback) end |
#create_client(&callback) ⇒ Object
Create a new client
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/faye/redis.rb', line 55 def create_client(&callback) client_id = generate_client_id @client_registry.create(client_id) do |success| if success callback.call(client_id) else callback.call(nil) end end end |
#destroy_client(client_id, &callback) ⇒ Object
Destroy a client
67 68 69 70 71 |
# File 'lib/faye/redis.rb', line 67 def destroy_client(client_id, &callback) @subscription_manager.unsubscribe_all(client_id) do @client_registry.destroy(client_id, &callback) end end |
#disconnect ⇒ Object
Disconnect the engine
128 129 130 131 |
# File 'lib/faye/redis.rb', line 128 def disconnect @pubsub_coordinator.disconnect @connection.disconnect end |
#empty_queue(client_id) ⇒ Object
Empty a client’s message queue
123 124 125 |
# File 'lib/faye/redis.rb', line 123 def empty_queue(client_id) @message_queue.dequeue_all(client_id) end |
#ping(client_id) ⇒ Object
Ping a client to keep it alive
79 80 81 |
# File 'lib/faye/redis.rb', line 79 def ping(client_id) @client_registry.ping(client_id) end |
#publish(message, channels, &callback) ⇒ Object
Publish a message to channels
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/faye/redis.rb', line 94 def publish(message, channels, &callback) channels = [channels] unless channels.is_a?(Array) success = true begin channels.each do |channel| # Store message in queues for subscribed clients @subscription_manager.get_subscribers(channel) do |client_ids| client_ids.each do |client_id| @message_queue.enqueue(client_id, message) do |enqueued| success &&= enqueued end end end # Publish to Redis pub/sub for cross-server routing @pubsub_coordinator.publish(channel, message) do |published| success &&= published end end EventMachine.next_tick { callback.call(success) } if callback rescue => e log_error("Failed to publish message to channels #{channels}: #{e.message}") EventMachine.next_tick { callback.call(false) } if callback end end |
#subscribe(client_id, channel, &callback) ⇒ Object
Subscribe a client to a channel
84 85 86 |
# File 'lib/faye/redis.rb', line 84 def subscribe(client_id, channel, &callback) @subscription_manager.subscribe(client_id, channel, &callback) end |
#unsubscribe(client_id, channel, &callback) ⇒ Object
Unsubscribe a client from a channel
89 90 91 |
# File 'lib/faye/redis.rb', line 89 def unsubscribe(client_id, channel, &callback) @subscription_manager.unsubscribe(client_id, channel, &callback) end |