Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye'
}.freeze
VERSION =
'1.0.4'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/faye/redis.rb', line 38

def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  setup_message_routing
end

Instance Attribute Details

#client_registryObject (readonly)

Returns the value of attribute client_registry.



30
31
32
# File 'lib/faye/redis.rb', line 30

def client_registry
  @client_registry
end

#connectionObject (readonly)

Returns the value of attribute connection.



30
31
32
# File 'lib/faye/redis.rb', line 30

def connection
  @connection
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



30
31
32
# File 'lib/faye/redis.rb', line 30

def message_queue
  @message_queue
end

#optionsObject (readonly)

Returns the value of attribute options.



30
31
32
# File 'lib/faye/redis.rb', line 30

def options
  @options
end

#pubsub_coordinatorObject (readonly)

Returns the value of attribute pubsub_coordinator.



30
31
32
# File 'lib/faye/redis.rb', line 30

def pubsub_coordinator
  @pubsub_coordinator
end

#serverObject (readonly)

Returns the value of attribute server.



30
31
32
# File 'lib/faye/redis.rb', line 30

def server
  @server
end

#subscription_managerObject (readonly)

Returns the value of attribute subscription_manager.



30
31
32
# File 'lib/faye/redis.rb', line 30

def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



34
35
36
# File 'lib/faye/redis.rb', line 34

def self.create(server, options)
  new(server, options)
end

Instance Method Details

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



76
77
78
# File 'lib/faye/redis.rb', line 76

def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



55
56
57
58
59
60
61
62
63
64
# File 'lib/faye/redis.rb', line 55

def create_client(&callback)
  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    if success
      callback.call(client_id)
    else
      callback.call(nil)
    end
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



67
68
69
70
71
72
73
# File 'lib/faye/redis.rb', line 67

def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end

#disconnectObject

Disconnect the engine



137
138
139
140
# File 'lib/faye/redis.rb', line 137

def disconnect
  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



132
133
134
# File 'lib/faye/redis.rb', line 132

def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



81
82
83
# File 'lib/faye/redis.rb', line 81

def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/faye/redis.rb', line 96

def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  begin
    remaining_operations = channels.size
    success = true

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Immediately publish to pub/sub (don't wait for enqueue)
        @pubsub_coordinator.publish(channel, message) do |published|
          success &&= published
        end

        # Enqueue for all subscribed clients in parallel (batch operation)
        if client_ids.any?
          enqueue_messages_batch(client_ids, message) do |enqueued|
            success &&= enqueued
          end
        end

        # Track completion
        remaining_operations -= 1
        if remaining_operations == 0 && callback
          EventMachine.next_tick { callback.call(success) }
        end
      end
    end
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



86
87
88
# File 'lib/faye/redis.rb', line 86

def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



91
92
93
# File 'lib/faye/redis.rb', line 91

def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end