Class: Faye::Redis::PubSubCoordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis/pubsub_coordinator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, options = {}) ⇒ PubSubCoordinator

Returns a new instance of PubSubCoordinator.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/faye/redis/pubsub_coordinator.rb', line 9

def initialize(connection, options = {})
  @connection = connection
  @options = options
  @subscribers = []
  @redis_subscriber = nil
  @subscribed_channels = Set.new
  @subscriber_thread = nil
  @stop_subscriber = false
  @reconnect_attempts = 0
  # Don't setup subscriber immediately - wait until first publish/subscribe
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



7
8
9
# File 'lib/faye/redis/pubsub_coordinator.rb', line 7

def connection
  @connection
end

#optionsObject (readonly)

Returns the value of attribute options.



7
8
9
# File 'lib/faye/redis/pubsub_coordinator.rb', line 7

def options
  @options
end

Instance Method Details

#disconnectObject

Disconnect the pub/sub connection



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/faye/redis/pubsub_coordinator.rb', line 70

def disconnect
  @stop_subscriber = true

  if @subscriber_thread
    @subscriber_thread.kill
    @subscriber_thread = nil
  end

  if @redis_subscriber
    begin
      @redis_subscriber.quit
    rescue => e
      # Ignore errors during disconnect
    end
    @redis_subscriber = nil
  end
  @subscribed_channels.clear
  @subscribers.clear
end

#on_message(&block) ⇒ Object

Subscribe to messages from other servers



39
40
41
# File 'lib/faye/redis/pubsub_coordinator.rb', line 39

def on_message(&block)
  @subscribers << block
end

#publish(channel, message, &callback) ⇒ Object

Publish a message to a channel



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/faye/redis/pubsub_coordinator.rb', line 22

def publish(channel, message, &callback)
  # Ensure subscriber is setup
  setup_subscriber unless @subscriber_thread

  message_json = message.to_json

  @connection.with_redis do |redis|
    redis.publish(pubsub_channel(channel), message_json)
  end

  EventMachine.next_tick { callback.call(true) } if callback
rescue => e
  log_error("Failed to publish message to #{channel}: #{e.message}")
  EventMachine.next_tick { callback.call(false) } if callback
end

#subscribe_to_channel(channel) ⇒ Object

Subscribe to a Redis pub/sub channel



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/faye/redis/pubsub_coordinator.rb', line 44

def subscribe_to_channel(channel)
  return if @subscribed_channels.include?(channel)

  @subscribed_channels.add(channel)

  if @redis_subscriber
    @redis_subscriber.subscribe(pubsub_channel(channel))
  end
rescue => e
  log_error("Failed to subscribe to channel #{channel}: #{e.message}")
end

#unsubscribe_from_channel(channel) ⇒ Object

Unsubscribe from a Redis pub/sub channel



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/faye/redis/pubsub_coordinator.rb', line 57

def unsubscribe_from_channel(channel)
  return unless @subscribed_channels.include?(channel)

  @subscribed_channels.delete(channel)

  if @redis_subscriber
    @redis_subscriber.unsubscribe(pubsub_channel(channel))
  end
rescue => e
  log_error("Failed to unsubscribe from channel #{channel}: #{e.message}")
end