Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  # Baseline settings. #initialize merges the caller-supplied options over
  # this hash, so any key the caller provides overrides the value here.
  # NOTE(review): the timeout / delay / TTL values look like seconds —
  # confirm against the components that actually read them.
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye'
}.freeze
# Version string exposed as Faye::Redis::VERSION.
VERSION =
'1.0.3'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/faye/redis.rb', line 38

# Build the engine together with its collaborating components.
#
# @param server [Object] stored as-is and exposed through #server
# @param options [Hash] overrides layered on top of DEFAULT_OPTIONS
def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # A single connection is shared by every component. They are constructed
  # in this order: registry, subscriptions, queue, pub/sub.
  @connection = Connection.new(@options)
  @client_registry, @subscription_manager, @message_queue, @pubsub_coordinator =
    [ClientRegistry, SubscriptionManager, MessageQueue, PubSubCoordinator].map do |component|
      component.new(@connection, @options)
    end

  # Wire up message routing once all components exist.
  setup_message_routing
end

Instance Attribute Details

#client_registry ⇒ Object (readonly)

Returns the value of attribute client_registry.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the client registry component.
#
# @return [ClientRegistry] the instance built in #initialize from the shared
#   connection and the merged options
def client_registry
  @client_registry
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the connection shared by all components.
#
# @return [Connection] the instance built in #initialize from the merged options
def connection
  @connection
end

#message_queue ⇒ Object (readonly)

Returns the value of attribute message_queue.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the message queue component.
#
# @return [MessageQueue] the instance built in #initialize from the shared
#   connection and the merged options
def message_queue
  @message_queue
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the effective configuration.
#
# @return [Hash] DEFAULT_OPTIONS merged with the options passed to #initialize
def options
  @options
end

#pubsub_coordinator ⇒ Object (readonly)

Returns the value of attribute pubsub_coordinator.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the pub/sub coordinator component.
#
# @return [PubSubCoordinator] the instance built in #initialize from the
#   shared connection and the merged options
def pubsub_coordinator
  @pubsub_coordinator
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the server this engine was created for.
#
# @return [Object] the server object passed to #initialize, unchanged
def server
  @server
end

#subscription_manager ⇒ Object (readonly)

Returns the value of attribute subscription_manager.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the subscription manager component.
#
# @return [SubscriptionManager] the instance built in #initialize from the
#   shared connection and the merged options
def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



34
35
36
# File 'lib/faye/redis.rb', line 34

# Factory method to create a new Redis engine instance.
#
# @param server [Object] passed straight through to #initialize
# @param options [Hash] overrides forwarded to #initialize; optional and
#   defaulting to an empty hash, matching #initialize's own signature
# @return [Object] whatever `new` returns for the given arguments
def self.create(server, options = {})
  new(server, options)
end

Instance Method Details

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



76
77
78
# File 'lib/faye/redis.rb', line 76

# Check if a client exists.
#
# Delegates to the client registry's `exists?`, forwarding the block.
#
# @param client_id [Object] identifier handed unchanged to the registry
# @yield invoked by the registry; the yielded value is not visible from
#   here (presumably a boolean — confirm against ClientRegistry#exists?)
# @return [Object] whatever the registry's `exists?` returns
def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



55
56
57
58
59
60
61
62
63
64
# File 'lib/faye/redis.rb', line 55

# Create a new client.
#
# Generates an ID with generate_client_id and asks the client registry to
# create it. The block is optional: when none is given, the registry call
# is still made and the outcome is simply not reported.
#
# @yieldparam client_id [Object, nil] the generated ID when the registry
#   reports success, nil otherwise
# @return [Object] whatever the registry's `create` returns
def create_client(&callback)
  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    # Guard against a missing block, the same way #publish treats its callback.
    callback.call(success ? client_id : nil) if callback
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



67
68
69
70
71
72
73
# File 'lib/faye/redis.rb', line 67

# Destroy a client.
#
# Runs three steps strictly in sequence, each started from the previous
# step's completion block: drop all subscriptions, clear the message queue,
# then remove the client from the registry. The caller's block is handed to
# the final registry call.
#
# @param client_id [Object] identifier passed to every component
# @yield forwarded to the client registry's `destroy`
def destroy_client(client_id, &callback)
  # Plain procs (not lambdas) so extra yielded arguments are ignored.
  remove_from_registry = proc { @client_registry.destroy(client_id, &callback) }
  purge_queue = proc { @message_queue.clear(client_id, &remove_from_registry) }

  @subscription_manager.unsubscribe_all(client_id, &purge_queue)
end

#disconnect ⇒ Object

Disconnect the engine



153
154
155
156
# File 'lib/faye/redis.rb', line 153

# Disconnect the engine.
#
# Tears down the pub/sub coordinator first, then the shared connection.
# The connection is closed even if the pub/sub teardown raises; in that
# case the exception still propagates to the caller.
#
# @return [Object] whatever the connection's `disconnect` returns
def disconnect
  begin
    @pubsub_coordinator.disconnect
  ensure
    # Always release the main connection so a pub/sub failure cannot leak it.
    result = @connection.disconnect
  end
  result
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



148
149
150
# File 'lib/faye/redis.rb', line 148

# Empty a client's message queue.
#
# Delegates to the message queue's `dequeue_all` for the given client.
#
# @param client_id [Object] identifier handed unchanged to the message queue
# @return [Object] whatever `dequeue_all` returns (shape not visible here)
def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



81
82
83
# File 'lib/faye/redis.rb', line 81

# Ping a client to keep it alive.
#
# Delegates to the client registry's `ping`.
#
# @param client_id [Object] identifier handed unchanged to the registry
# @return [Object] whatever the registry's `ping` returns
def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/faye/redis.rb', line 96

# Publish a message to one or more channels.
#
# For each channel the subscriber list is fetched, the message is enqueued
# for every subscriber, and once all of that channel's enqueues have
# reported back the message is broadcast through the pub/sub coordinator.
# The callback fires after every channel's broadcast has completed. An
# empty channel list reports success immediately instead of never calling
# back.
#
# @param message [Object] payload handed unchanged to the queue and pub/sub
# @param channels [Array, Object] a list of channels; any non-Array value is
#   treated as a single channel
# @yieldparam success [Object] truthy only if every enqueue and broadcast
#   reported a truthy result; false when an error was rescued here
def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  # Nothing to wait on: without this the completion counter starts at zero
  # and no code path would ever invoke the callback.
  if channels.empty?
    EventMachine.next_tick { callback.call(true) } if callback
    return
  end

  remaining_operations = channels.size
  success = true

  # Runs once per channel, after that channel's pub/sub broadcast reports back.
  channel_done = proc do |published|
    success &&= published
    remaining_operations -= 1

    if remaining_operations == 0 && callback
      EventMachine.next_tick { callback.call(success) }
    end
  end

  channels.each do |channel|
    # Store message in queues for subscribed clients
    @subscription_manager.get_subscribers(channel) do |client_ids|
      broadcast = proc { @pubsub_coordinator.publish(channel, message, &channel_done) }
      pending_enqueues = client_ids.size

      if pending_enqueues == 0
        # No clients to enqueue, just do pub/sub
        broadcast.call
      else
        client_ids.each do |client_id|
          @message_queue.enqueue(client_id, message) do |enqueued|
            success &&= enqueued
            pending_enqueues -= 1

            # When all enqueues are done, do pub/sub
            broadcast.call if pending_enqueues == 0
          end
        end
      end
    end
  end
rescue => e
  # Only errors raised while the calls above are being issued land here;
  # failures inside deferred callbacks are not covered by this rescue.
  log_error("Failed to publish message to channels #{channels}: #{e.message}")
  EventMachine.next_tick { callback.call(false) } if callback
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



86
87
88
# File 'lib/faye/redis.rb', line 86

# Subscribe a client to a channel.
#
# Delegates to the subscription manager's `subscribe`, forwarding the block.
#
# @param client_id [Object] identifier handed unchanged to the manager
# @param channel [Object] channel handed unchanged to the manager
# @yield invoked by the subscription manager; yielded value not visible here
# @return [Object] whatever the manager's `subscribe` returns
def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



91
92
93
# File 'lib/faye/redis.rb', line 91

# Unsubscribe a client from a channel.
#
# Delegates to the subscription manager's `unsubscribe`, forwarding the block.
#
# @param client_id [Object] identifier handed unchanged to the manager
# @param channel [Object] channel handed unchanged to the manager
# @yield invoked by the subscription manager; yielded value not visible here
# @return [Object] whatever the manager's `unsubscribe` returns
def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end