Class: Faye::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/faye/redis.rb,
lib/faye/redis/logger.rb,
lib/faye/redis/version.rb,
lib/faye/redis/connection.rb,
lib/faye/redis/message_queue.rb,
lib/faye/redis/client_registry.rb,
lib/faye/redis/pubsub_coordinator.rb,
lib/faye/redis/subscription_manager.rb

Defined Under Namespace

Classes: ClientRegistry, Connection, Logger, MessageQueue, PubSubCoordinator, SubscriptionManager

Constant Summary collapse

DEFAULT_OPTIONS =

Default configuration options

{
  host: 'localhost',
  port: 6379,
  database: 0,
  password: nil,
  pool_size: 5,
  pool_timeout: 5,
  connect_timeout: 1,
  read_timeout: 1,
  write_timeout: 1,
  max_retries: 3,
  retry_delay: 1,
  client_timeout: 60,
  message_ttl: 3600,
  namespace: 'faye',
  gc_interval: 60  # Automatic garbage collection interval (seconds), set to 0 or false to disable
}.freeze
VERSION =
'1.0.6'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Redis

Returns a new instance of Redis.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/faye/redis.rb', line 40

def initialize(server, options = {})
  @server = server
  @options = DEFAULT_OPTIONS.merge(options)
  @logger = Logger.new('Faye::Redis', @options)

  # Initialize components
  @connection = Connection.new(@options)
  @client_registry = ClientRegistry.new(@connection, @options)
  @subscription_manager = SubscriptionManager.new(@connection, @options)
  @message_queue = MessageQueue.new(@connection, @options)
  @pubsub_coordinator = PubSubCoordinator.new(@connection, @options)

  # Set up message routing
  setup_message_routing

  # Start automatic garbage collection timer
  start_gc_timer
end

Instance Attribute Details

#client_registryObject (readonly)

Returns the value of attribute client_registry.



32
33
34
# File 'lib/faye/redis.rb', line 32

def client_registry
  @client_registry
end

#connectionObject (readonly)

Returns the value of attribute connection.



32
33
34
# File 'lib/faye/redis.rb', line 32

def connection
  @connection
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



32
33
34
# File 'lib/faye/redis.rb', line 32

def message_queue
  @message_queue
end

#optionsObject (readonly)

Returns the value of attribute options.



32
33
34
# File 'lib/faye/redis.rb', line 32

def options
  @options
end

#pubsub_coordinatorObject (readonly)

Returns the value of attribute pubsub_coordinator.



32
33
34
# File 'lib/faye/redis.rb', line 32

def pubsub_coordinator
  @pubsub_coordinator
end

#serverObject (readonly)

Returns the value of attribute server.



32
33
34
# File 'lib/faye/redis.rb', line 32

def server
  @server
end

#subscription_managerObject (readonly)

Returns the value of attribute subscription_manager.



32
33
34
# File 'lib/faye/redis.rb', line 32

def subscription_manager
  @subscription_manager
end

Class Method Details

.create(server, options) ⇒ Object

Factory method to create a new Redis engine instance



36
37
38
# File 'lib/faye/redis.rb', line 36

def self.create(server, options)
  new(server, options)
end

Instance Method Details

#cleanup_expired(&callback) ⇒ Object

Clean up expired clients and their associated data



154
155
156
157
158
159
160
161
162
163
164
# File 'lib/faye/redis.rb', line 154

def cleanup_expired(&callback)
  @client_registry.cleanup_expired do |expired_count|
    @logger.info("Cleaned up #{expired_count} expired clients") if expired_count > 0

    # Always clean up orphaned subscription keys (even if no expired clients)
    # This handles cases where subscriptions were orphaned due to crashes
    cleanup_orphaned_subscriptions do
      callback.call(expired_count) if callback
    end
  end
end

#client_exists(client_id, &callback) ⇒ Object

Check if a client exists



84
85
86
# File 'lib/faye/redis.rb', line 84

def client_exists(client_id, &callback)
  @client_registry.exists?(client_id, &callback)
end

#create_client(&callback) ⇒ Object

Create a new client



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/faye/redis.rb', line 60

def create_client(&callback)
  # Ensure GC timer is started (lazy initialization)
  ensure_gc_timer_started

  client_id = generate_client_id
  @client_registry.create(client_id) do |success|
    if success
      callback.call(client_id)
    else
      callback.call(nil)
    end
  end
end

#destroy_client(client_id, &callback) ⇒ Object

Destroy a client



75
76
77
78
79
80
81
# File 'lib/faye/redis.rb', line 75

def destroy_client(client_id, &callback)
  @subscription_manager.unsubscribe_all(client_id) do
    @message_queue.clear(client_id) do
      @client_registry.destroy(client_id, &callback)
    end
  end
end

#disconnectObject

Disconnect the engine



145
146
147
148
149
150
151
# File 'lib/faye/redis.rb', line 145

def disconnect
  # Stop GC timer if running
  stop_gc_timer

  @pubsub_coordinator.disconnect
  @connection.disconnect
end

#empty_queue(client_id) ⇒ Object

Empty a client’s message queue



140
141
142
# File 'lib/faye/redis.rb', line 140

def empty_queue(client_id)
  @message_queue.dequeue_all(client_id)
end

#ping(client_id) ⇒ Object

Ping a client to keep it alive



89
90
91
# File 'lib/faye/redis.rb', line 89

def ping(client_id)
  @client_registry.ping(client_id)
end

#publish(message, channels, &callback) ⇒ Object

Publish a message to channels



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/faye/redis.rb', line 104

def publish(message, channels, &callback)
  channels = [channels] unless channels.is_a?(Array)

  begin
    remaining_operations = channels.size
    success = true

    channels.each do |channel|
      # Get subscribers and process in parallel
      @subscription_manager.get_subscribers(channel) do |client_ids|
        # Immediately publish to pub/sub (don't wait for enqueue)
        @pubsub_coordinator.publish(channel, message) do |published|
          success &&= published
        end

        # Enqueue for all subscribed clients in parallel (batch operation)
        if client_ids.any?
          enqueue_messages_batch(client_ids, message) do |enqueued|
            success &&= enqueued
          end
        end

        # Track completion
        remaining_operations -= 1
        if remaining_operations == 0 && callback
          EventMachine.next_tick { callback.call(success) }
        end
      end
    end
  rescue => e
    log_error("Failed to publish message to channels #{channels}: #{e.message}")
    EventMachine.next_tick { callback.call(false) } if callback
  end
end

#subscribe(client_id, channel, &callback) ⇒ Object

Subscribe a client to a channel



94
95
96
# File 'lib/faye/redis.rb', line 94

def subscribe(client_id, channel, &callback)
  @subscription_manager.subscribe(client_id, channel, &callback)
end

#unsubscribe(client_id, channel, &callback) ⇒ Object

Unsubscribe a client from a channel



99
100
101
# File 'lib/faye/redis.rb', line 99

def unsubscribe(client_id, channel, &callback)
  @subscription_manager.unsubscribe(client_id, channel, &callback)
end