Class: Faye::Redis::PubSubCoordinator
- Inherits:
-
Object
- Object
- Faye::Redis::PubSubCoordinator
- Defined in:
- lib/faye/redis/pubsub_coordinator.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
-
#disconnect ⇒ Object
Disconnect the pub/sub connection.
-
#initialize(connection, options = {}) ⇒ PubSubCoordinator
constructor
A new instance of PubSubCoordinator.
-
#on_message(&block) ⇒ Object
Subscribe to messages from other servers.
-
#publish(channel, message, &callback) ⇒ Object
Publish a message to a channel.
-
#subscribe_to_channel(channel) ⇒ Object
Subscribe to a Redis pub/sub channel.
-
#unsubscribe_from_channel(channel) ⇒ Object
Unsubscribe from a Redis pub/sub channel.
Constructor Details
#initialize(connection, options = {}) ⇒ PubSubCoordinator
Returns a new instance of PubSubCoordinator.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 9 def initialize(connection, = {}) @connection = connection @options = @subscribers = [] @redis_subscriber = nil @subscribed_channels = Set.new @subscriber_thread = nil @stop_subscriber = false @reconnect_attempts = 0 # Don't setup subscriber immediately - wait until first publish/subscribe end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
7 8 9 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 7 def connection @connection end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
7 8 9 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 7 def @options end |
Instance Method Details
#disconnect ⇒ Object
Disconnect the pub/sub connection
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 70 def disconnect @stop_subscriber = true if @subscriber_thread @subscriber_thread.kill @subscriber_thread = nil end if @redis_subscriber begin @redis_subscriber.quit rescue => e # Ignore errors during disconnect end @redis_subscriber = nil end @subscribed_channels.clear @subscribers.clear end |
#on_message(&block) ⇒ Object
Subscribe to messages from other servers
39 40 41 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 39 def (&block) @subscribers << block end |
#publish(channel, message, &callback) ⇒ Object
Publish a message to a channel
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 22 def publish(channel, , &callback) # Ensure subscriber is setup setup_subscriber unless @subscriber_thread = .to_json @connection.with_redis do |redis| redis.publish(pubsub_channel(channel), ) end EventMachine.next_tick { callback.call(true) } if callback rescue => e log_error("Failed to publish message to #{channel}: #{e.}") EventMachine.next_tick { callback.call(false) } if callback end |
#subscribe_to_channel(channel) ⇒ Object
Subscribe to a Redis pub/sub channel
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 44 def subscribe_to_channel(channel) return if @subscribed_channels.include?(channel) @subscribed_channels.add(channel) if @redis_subscriber @redis_subscriber.subscribe(pubsub_channel(channel)) end rescue => e log_error("Failed to subscribe to channel #{channel}: #{e.}") end |
#unsubscribe_from_channel(channel) ⇒ Object
Unsubscribe from a Redis pub/sub channel
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/faye/redis/pubsub_coordinator.rb', line 57 def unsubscribe_from_channel(channel) return unless @subscribed_channels.include?(channel) @subscribed_channels.delete(channel) if @redis_subscriber @redis_subscriber.unsubscribe(pubsub_channel(channel)) end rescue => e log_error("Failed to unsubscribe from channel #{channel}: #{e.}") end |