Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  # These values are the base hash that #initialize merges caller-supplied
  # options over (caller values win); the merged hash is then handed to the
  # logger and to every component the engine builds.
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  # NOTE(review): the timeout / delay / TTL values below look like seconds —
  # confirm against the components that actually read them.
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye'
}.freeze
VERSION =
'1.0.5'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/faye/redis.rb', line 39

# Builds the engine and wires up its collaborators.
#
# @param server [Object] stored as-is and exposed through #server
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(server, options = {})
  @server  = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger  = Logger.new('Faye::Redis', @options)

  # The connection comes first: every remaining component receives it.
  @connection = Connection.new(@options)

  # Components are constructed left to right, in the same order as the
  # instance variables they are assigned to.
  @client_registry, @subscription_manager, @message_queue, @pubsub_coordinator =
    [ClientRegistry, SubscriptionManager, MessageQueue, PubSubCoordinator].map do |component|
      component.new(@connection, @options)
    end

  # Routing is set up last, once every component exists.
  setup_message_routing
end

Instance Attribute Details

#client_registry ⇒ Object (readonly)

Returns the value of attribute client_registry.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the client registry component.
#
# @return [ClientRegistry] the instance built in #initialize
def client_registry
  @client_registry
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the connection component.
#
# @return [Connection] the instance built in #initialize and passed to the
#   other components
def connection
  @connection
end

#message_queue ⇒ Object (readonly)

Returns the value of attribute message_queue.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the message queue component.
#
# @return [MessageQueue] the instance built in #initialize
def message_queue
  @message_queue
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the effective configuration.
#
# @return [Hash] DEFAULT_OPTIONS merged with the options given to #initialize
def options
  @options
end

#pubsub_coordinator ⇒ Object (readonly)

Returns the value of attribute pubsub_coordinator.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the pub/sub coordinator component.
#
# @return [PubSubCoordinator] the instance built in #initialize
def pubsub_coordinator
  @pubsub_coordinator
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the server this engine was created for.
#
# @return [Object] the `server` argument given to #initialize, unchanged
def server
  @server
end

#subscription_manager ⇒ Object (readonly)

Returns the value of attribute subscription_manager.



31
32
33
# File 'lib/faye/redis.rb', line 31

# Reader for the subscription manager component.
#
# @return [SubscriptionManager] the instance built in #initialize
def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



35
36
37
# File 'lib/faye/redis.rb', line 35

# Factory entry point: forwards both arguments to .new and returns the result.
#
# @param server [Object] passed through to #initialize
# @param options [Hash] passed through to #initialize; required here even
#   though #initialize itself defaults it to {}
# @return [Faye::Redis] the newly constructed engine
def self.create(server, options)
  new(server, options)
end

Instance Method Details

#cleanup_expired(&callback) ⇒ Object

Clean up expired clients and their associated data



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/faye/redis.rb', line 144

# Purges expired clients, then sweeps orphaned subscription keys.
#
# @yieldparam expired_count [Integer] the count reported by the client
#   registry; yielded only after the orphan sweep has called back
def cleanup_expired(&callback)
  @client_registry.cleanup_expired do |removed|
    if removed.positive?
      @logger.info("Cleaned up #{removed} expired clients")
    end

    # The orphan sweep runs on every pass, including passes where nothing
    # expired, so subscriptions left behind by a crash are still collected.
    cleanup_orphaned_subscriptions { callback&.call(removed) }
  end
end

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



77
78
79
# File 'lib/faye/redis.rb', line 77

# Asks the client registry whether a client is known.
#
# Pure delegation to ClientRegistry#exists?; the block is forwarded untouched.
# NOTE(review): presumably the registry yields a boolean — confirm against
# ClientRegistry#exists?.
#
# @param client_id [Object] identifier passed through to the registry
def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



56
57
58
59
60
61
62
63
64
65
# File 'lib/faye/redis.rb', line 56

# Create a new client.
#
# Generates a fresh id, asks the client registry to register it and reports
# the outcome through the optional block.
#
# @yieldparam client_id [Object, nil] the generated id when the registry
#   reports success, otherwise nil
def create_client(&callback)
  client_id = generate_client_id

  @client_registry.create(client_id) do |success|
    # The block is optional, as it is in #cleanup_expired and #publish;
    # calling it unguarded raised NoMethodError on nil for block-less calls.
    callback&.call(success ? client_id : nil)
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



68
69
70
71
72
73
74
# File 'lib/faye/redis.rb', line 68

# Tears a client down in three chained steps: drop its subscriptions, clear
# its message queue, then remove it from the client registry.
#
# @param client_id [Object] identifier handed to each component
# @param callback [Proc] optional block forwarded to ClientRegistry#destroy
def destroy_client(client_id, &callback)
  # Steps are declared last-to-first so each can be handed to the one before
  # it. Plain procs (not lambdas) are used so that, like literal blocks, they
  # tolerate whatever arguments the components yield.
  drop_registration = proc { @client_registry.destroy(client_id, &callback) }
  purge_queue       = proc { @message_queue.clear(client_id, &drop_registration) }

  @subscription_manager.unsubscribe_all(client_id, &purge_queue)
end

#disconnect ⇒ Object

Disconnect the engine



138
139
140
141
# File 'lib/faye/redis.rb', line 138

# Shuts the engine down: the pub/sub coordinator is disconnected first, then
# the connection.
#
# @return [Object] whatever Connection#disconnect returns
def disconnect
  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



133
134
135
# File 'lib/faye/redis.rb', line 133

# Drains a client's message queue by calling MessageQueue#dequeue_all.
#
# No block is passed along, so the only result visible to the caller is the
# return value of #dequeue_all.
#
# @param client_id [Object] identifier of the client whose queue is drained
# @return [Object] whatever MessageQueue#dequeue_all returns
def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



82
83
84
# File 'lib/faye/redis.rb', line 82

# Forwards a keep-alive ping for the client to ClientRegistry#ping.
#
# @param client_id [Object] identifier of the client being pinged
# @return [Object] whatever ClientRegistry#ping returns
def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/faye/redis.rb', line 97

# Publish a message to one or more channels.
#
# For every channel the subscriber list is fetched; the message is then
# published through the pub/sub coordinator and, when the channel has
# subscribers, enqueued for all of them in one batch. Once the subscriber
# lookup has returned for every channel, the optional block is scheduled on
# the next EventMachine tick.
#
# @param message [Object] payload handed unchanged to the coordinator and queue
# @param channels [Array, Object] a list of channels, or a single channel
#   (any non-Array value is wrapped in a one-element list)
# @yieldparam success [Boolean] false when a publish/enqueue result that had
#   already arrived was falsy, or when an error was rescued in this method
#
# NOTE(review): completion is counted when each subscriber lookup returns,
# not when the publish/enqueue blocks fire. If those run asynchronously,
# `success` may be reported before their results are folded in — confirm
# against PubSubCoordinator#publish and enqueue_messages_batch.
def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  begin
    remaining_operations = channels.size
    success = true

    # With no channels the loop below never runs, so the completion check
    # inside it would never fire; report (vacuous) success here instead.
    if remaining_operations.zero? && callback
      EventMachine.next_tick { callback.call(success) }
    end

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Immediately publish to pub/sub (don't wait for enqueue)
        @pubsub_coordinator.publish(channel, message) do |published|
          success &&= published
        end

        # Enqueue for all subscribed clients in parallel (batch operation)
        if client_ids.any?
          enqueue_messages_batch(client_ids, message) do |enqueued|
            success &&= enqueued
          end
        end

        # Track completion
        remaining_operations -= 1
        if remaining_operations == 0 && callback
          EventMachine.next_tick { callback.call(success) }
        end
      end
    end
  rescue => e
    # Only errors raised synchronously inside this method reach this handler.
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



87
88
89
# File 'lib/faye/redis.rb', line 87

# Subscribes a client to a channel.
#
# Pure delegation to SubscriptionManager#subscribe; the block is forwarded
# untouched.
#
# @param client_id [Object] identifier of the subscribing client
# @param channel [Object] channel to subscribe to
def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



92
93
94
# File 'lib/faye/redis.rb', line 92

# Removes a client's subscription to a channel.
#
# Pure delegation to SubscriptionManager#unsubscribe; the block is forwarded
# untouched.
#
# @param client_id [Object] identifier of the unsubscribing client
# @param channel [Object] channel to unsubscribe from
def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end