Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye',
  gc_interval: 60  # Automatic garbage collection interval (seconds), set to 0 or false to disable
}.freeze
VERSION =
'1.0.7'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/faye/redis.rb', line 40

def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  setup_message_routing

  # Start automatic garbage collection timer
  start_gc_timer
end

Instance Attribute Details

#client_registryObject (readonly)

Returns the value of attribute client_registry.



32
33
34
# File 'lib/faye/redis.rb', line 32

def client_registry
  @client_registry
end

#connectionObject (readonly)

Returns the value of attribute connection.



32
33
34
# File 'lib/faye/redis.rb', line 32

def connection
  @connection
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



32
33
34
# File 'lib/faye/redis.rb', line 32

def message_queue
  @message_queue
end

#optionsObject (readonly)

Returns the value of attribute options.



32
33
34
# File 'lib/faye/redis.rb', line 32

def options
  @options
end

#pubsub_coordinatorObject (readonly)

Returns the value of attribute pubsub_coordinator.



32
33
34
# File 'lib/faye/redis.rb', line 32

def pubsub_coordinator
  @pubsub_coordinator
end

#serverObject (readonly)

Returns the value of attribute server.



32
33
34
# File 'lib/faye/redis.rb', line 32

def server
  @server
end

#subscription_managerObject (readonly)

Returns the value of attribute subscription_manager.



32
33
34
# File 'lib/faye/redis.rb', line 32

def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



36
37
38
# File 'lib/faye/redis.rb', line 36

def self.create(server, options)
  new(server, options)
end

Instance Method Details

#cleanup_expired(&callback) ⇒ Object

Clean up expired clients and their associated data



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/faye/redis.rb', line 188

def cleanup_expired(&callback)
  @client_registry.cleanup_expired do |expired_count|
    @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0

    # Always clean up orphaned subscription keys (even if no expired clients)
    # This handles cases where subscriptions were orphaned due to crashes
    cleanup_orphaned_subscriptions do
      callback.call(expired_count) if callback
    end
  end
end

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



84
85
86
# File 'lib/faye/redis.rb', line 84

def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/faye/redis.rb', line 60

def create_client(&callback)
  # Ensure GC timer is started (lazy initialization)
  ensure_gc_timer_started

  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    if success
      callback.call(client_id)
    else
      callback.call(nil)
    end
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



75
76
77
78
79
80
81
# File 'lib/faye/redis.rb', line 75

def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end

#disconnectObject

Disconnect the engine



179
180
181
182
183
184
185
# File 'lib/faye/redis.rb', line 179

def disconnect
  # Stop GC timer if running
  stop_gc_timer

  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



174
175
176
# File 'lib/faye/redis.rb', line 174

def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



89
90
91
# File 'lib/faye/redis.rb', line 89

def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/faye/redis.rb', line 104

def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  begin
    # Ensure message has an ID for deduplication
    message = message.dup unless message.frozen?
    message['id'] ||= generate_message_id

    # Track this message as locally published
    if @local_message_ids
      if @local_message_ids_mutex
        @local_message_ids_mutex.synchronize { @local_message_ids.add(message['id']) }
      else
        @local_message_ids.add(message['id'])
      end
    end

    total_channels = channels.size
    completed_channels = 0
    callback_called = false
    all_success = true

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Track operations for this channel
        pending_ops = 2  # pubsub + enqueue
        channel_success = true
        ops_completed = 0

        complete_channel = lambda do
          ops_completed += 1
          if ops_completed == pending_ops
            # This channel is complete
            all_success &&= channel_success
            completed_channels += 1

            # Call final callback when all channels are done
            if completed_channels == total_channels && !callback_called && callback
              callback_called = true
              EventMachine.next_tick { callback.call(all_success) }
            end
          end
        end

        # Publish to pub/sub
        @pubsub_coordinator.publish(channel, message) do |published|
          channel_success &&= published
          complete_channel.call
        end

        # Enqueue for all subscribed clients
        if client_ids.any?
          enqueue_messages_batch(client_ids, message) do |enqueued|
            channel_success &&= enqueued
            complete_channel.call
          end
        else
          # No clients, but still need to complete
          complete_channel.call
        end
      end
    end
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback && !callback_called
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



94
95
96
# File 'lib/faye/redis.rb', line 94

def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



99
100
101
# File 'lib/faye/redis.rb', line 99

def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end