Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  # These defaults are merged with the caller-supplied options in #initialize
  # (caller values win) and the merged hash is handed to Logger, Connection and
  # every other component.
  # NOTE(review): units are not visible here — the timeout, delay and TTL values
  # are presumably seconds; confirm against the components that read them.

  # Presumably the Redis server location and credentials — confirm in Connection.
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  # Presumably connection-pool sizing and checkout timeout — confirm in Connection.
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  # Presumably the retry policy for failed Redis operations — confirm.
  max_retries: 3,
  retry_delay: 1,
  # Presumably client expiry and queued-message lifetime — confirm in
  # ClientRegistry / MessageQueue.
  client_timeout: 60,
  message_ttl: 3600,
  # Presumably the prefix applied to Redis keys — confirm.
  namespace: 'faye'
}.freeze
VERSION =
'1.0.0'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/faye/redis.rb', line 38

# Builds the engine together with all of its components.
#
# @param server [Object] stored unchanged in @server
#   (presumably the Faye server that owns this engine — confirm)
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(server, options = {})
  @server = server
  # Caller-supplied keys win; keys that are not supplied keep their default.
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components
  # The connection is created first because every component below is
  # constructed with that same connection object and the merged options.
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  # NOTE(review): setup_message_routing is defined outside this view; it runs
  # only after all components above exist.
  setup_message_routing
end

Instance Attribute Details

#client_registry ⇒ Object (readonly)

Returns the value of attribute client_registry.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the ClientRegistry instance built in #initialize.
#
# @return [ClientRegistry] the value of @client_registry
def client_registry
  @client_registry
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the Connection instance built in #initialize and
# passed to every other component.
#
# @return [Connection] the value of @connection
def connection
  @connection
end

#message_queue ⇒ Object (readonly)

Returns the value of attribute message_queue.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the MessageQueue instance built in #initialize.
#
# @return [MessageQueue] the value of @message_queue
def message_queue
  @message_queue
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the effective configuration: DEFAULT_OPTIONS merged
# with the options given to #initialize.
#
# @return [Hash] the value of @options
def options
  @options
end

#pubsub_coordinator ⇒ Object (readonly)

Returns the value of attribute pubsub_coordinator.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the PubSubCoordinator instance built in #initialize.
#
# @return [PubSubCoordinator] the value of @pubsub_coordinator
def pubsub_coordinator
  @pubsub_coordinator
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the server object that was passed to #initialize.
#
# @return [Object] the value of @server
def server
  @server
end

#subscription_manager ⇒ Object (readonly)

Returns the value of attribute subscription_manager.



30
31
32
# File 'lib/faye/redis.rb', line 30

# Read-only accessor for the SubscriptionManager instance built in #initialize.
#
# @return [SubscriptionManager] the value of @subscription_manager
def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



34
35
36
# File 'lib/faye/redis.rb', line 34

# Factory entry point: forwards both arguments unchanged to .new.
# Unlike #initialize, +options+ has no default here and must be supplied.
#
# @param server [Object] passed through to #initialize
# @param options [Hash] passed through to #initialize, where it is merged
#   over DEFAULT_OPTIONS
# @return [Faye::Redis] the newly constructed engine
def self.create(server, options)
  new(server, options)
end

Instance Method Details

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



74
75
76
# File 'lib/faye/redis.rb', line 74

# Asks the client registry whether +client_id+ is known.
# The block is forwarded as-is to ClientRegistry#exists?; what it is yielded is
# decided there (presumably a boolean — confirm).
#
# @param client_id [String] identifier to look up
# @return [Object] whatever ClientRegistry#exists? returns
def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



55
56
57
58
59
60
61
62
63
64
# File 'lib/faye/redis.rb', line 55

# Generates a fresh client id and registers it with the client registry.
#
# @yield [client_id] invoked once the registry reports the outcome
# @yieldparam client_id [String, nil] the new id when registration succeeded,
#   +nil+ when it failed
# @return [Object] whatever ClientRegistry#create returns
def create_client(&callback)
  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    # The block is optional: without this guard a caller that passes no block
    # would hit NoMethodError on nil once the registry reports back.
    callback.call(success ? client_id : nil) if callback
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



67
68
69
70
71
# File 'lib/faye/redis.rb', line 67

# Removes a client: first drops all of its subscriptions, then deletes its
# registry entry. The caller's block is handed to ClientRegistry#destroy.
#
# @param client_id [String] identifier of the client to remove
# @return [Object] whatever SubscriptionManager#unsubscribe_all returns
def destroy_client(client_id, &callback)
  # Runs only when the subscription manager signals that it is done; any
  # arguments it yields are ignored.
  remove_registration = proc { @client_registry.destroy(client_id, &callback) }
  @subscription_manager.unsubscribe_all(client_id, &remove_registration)
end

#disconnect ⇒ Object

Disconnect the engine



128
129
130
131
# File 'lib/faye/redis.rb', line 128

# Shuts the engine down by disconnecting the pub/sub coordinator and then the
# underlying connection, in that order.
#
# @return [Object] whatever Connection#disconnect returns
def disconnect
  # The coordinator was built on top of @connection, so it is closed first.
  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



123
124
125
# File 'lib/faye/redis.rb', line 123

# Drains the message queue of +client_id+ by delegating to
# MessageQueue#dequeue_all. No block is forwarded.
# NOTE(review): if dequeue_all reports the drained messages through a block,
# they are not observed here — confirm against MessageQueue.
#
# @param client_id [String] identifier of the client whose queue is drained
# @return [Object] whatever MessageQueue#dequeue_all returns
def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



79
80
81
# File 'lib/faye/redis.rb', line 79

# Forwards a keep-alive for +client_id+ to ClientRegistry#ping.
# No block is forwarded.
#
# @param client_id [String] identifier of the client to ping
# @return [Object] whatever ClientRegistry#ping returns
def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/faye/redis.rb', line 94

# Publishes +message+ to one or more channels.
#
# For every channel the message is enqueued for each subscriber returned by
# the subscription manager and is also handed to the pub/sub coordinator.
# The completion callback fires exactly once, on the next EventMachine tick
# after every enqueue and publish operation has reported its result, so the
# value it receives reflects the real outcome even when those operations
# finish asynchronously.
#
# NOTE(review): this relies on SubscriptionManager#get_subscribers,
# MessageQueue#enqueue and PubSubCoordinator#publish always invoking their
# block (including on failure) — confirm against those classes.
#
# @param message [Object] the message to deliver
# @param channels [String, Array<String>] a single channel or a list of channels
# @yield [success] invoked once all operations have completed
# @yieldparam success [Object] truthy only if every operation reported a
#   truthy result; +false+ if an exception was raised while scheduling
def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)
  success = true
  reported = false
  # Starts at 1: this extra slot is released only after the loop below has
  # scheduled everything, so the counter cannot hit zero prematurely when the
  # underlying operations call back synchronously.
  pending = 1

  # Delivers the final result to the caller at most once.
  report = lambda do |result|
    next if reported

    reported = true
    EventMachine.next_tick { callback.call(result) } if callback
  end

  # Records one finished operation and reports when none are outstanding.
  settle = lambda do |ok|
    success &&= ok
    pending -= 1
    report.call(success) if pending.zero?
  end

  begin
    channels.each do |channel|
      # Store message in queues for subscribed clients
      pending += 1
      @subscription_manager.get_subscribers(channel) do |client_ids|
        client_ids.each do |client_id|
          pending += 1
          @message_queue.enqueue(client_id, message) { |enqueued| settle.call(enqueued) }
        end
        # Release the slot held for the subscriber lookup itself.
        settle.call(true)
      end

      # Publish to Redis pub/sub for cross-server routing
      pending += 1
      @pubsub_coordinator.publish(channel, message) { |published| settle.call(published) }
    end

    # Release the initial guard slot now that everything has been scheduled.
    settle.call(true)
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    report.call(false)
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



84
85
86
# File 'lib/faye/redis.rb', line 84

# Subscribes +client_id+ to +channel+ by delegating to
# SubscriptionManager#subscribe. The block is forwarded as-is; what it is
# yielded is decided by the subscription manager.
#
# @param client_id [String] identifier of the subscribing client
# @param channel [String] channel to subscribe to
# @return [Object] whatever SubscriptionManager#subscribe returns
def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



89
90
91
# File 'lib/faye/redis.rb', line 89

# Removes the subscription of +client_id+ to +channel+ by delegating to
# SubscriptionManager#unsubscribe. The block is forwarded as-is; what it is
# yielded is decided by the subscription manager.
#
# @param client_id [String] identifier of the client
# @param channel [String] channel to unsubscribe from
# @return [Object] whatever SubscriptionManager#unsubscribe returns
def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end