Class: ActionCable::SubscriptionAdapter::Redis::Listener

Inherits:
SubscriberMap
  • Object
show all
Defined in:
lib/action_cable/subscription_adapter/redis.rb

Instance Method Summary collapse

Methods inherited from SubscriberMap

#add_subscriber, #broadcast, #remove_subscriber

Constructor Details

#initialize(adapter, event_loop) ⇒ Listener

Returns a new instance of Listener.

[View source]

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/action_cable/subscription_adapter/redis.rb', line 63

def initialize(adapter, event_loop)
  super()

  @adapter = adapter
  @event_loop = event_loop

  @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
  @subscription_lock = Mutex.new

  @raw_client = nil

  @when_connected = []

  @thread = nil
end

Instance Method Details

#add_channel(channel, on_success) ⇒ Object

[View source]

130
131
132
133
134
135
136
# File 'lib/action_cable/subscription_adapter/redis.rb', line 130

def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running
    @subscribe_callbacks[channel] << on_success
    when_connected { send_command("subscribe", channel) }
  end
end

#invoke_callbackObject

[View source]

144
145
146
# File 'lib/action_cable/subscription_adapter/redis.rb', line 144

def invoke_callback(*)
  @event_loop.post { super }
end

#listen(conn) ⇒ Object

[View source]

79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/action_cable/subscription_adapter/redis.rb', line 79

def listen(conn)
  conn.without_reconnect do
    original_client = conn.respond_to?(:_client) ? conn._client : conn.client

    conn.subscribe("_action_cable_internal") do |on|
      on.subscribe do |chan, count|
        @subscription_lock.synchronize do
          if count == 1
            @raw_client = original_client

            until @when_connected.empty?
              @when_connected.shift.call
            end
          end

          if callbacks = @subscribe_callbacks[chan]
            next_callback = callbacks.shift
            @event_loop.post(&next_callback) if next_callback
            @subscribe_callbacks.delete(chan) if callbacks.empty?
          end
        end
      end

      on.message do |chan, message|
        broadcast(chan, message)
      end

      on.unsubscribe do |chan, count|
        if count == 0
          @subscription_lock.synchronize do
            @raw_client = nil
          end
        end
      end
    end
  end
end

#remove_channel(channel) ⇒ Object

[View source]

138
139
140
141
142
# File 'lib/action_cable/subscription_adapter/redis.rb', line 138

def remove_channel(channel)
  @subscription_lock.synchronize do
    when_connected { send_command("unsubscribe", channel) }
  end
end

#shutdownObject

[View source]

117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/action_cable/subscription_adapter/redis.rb', line 117

def shutdown
  @subscription_lock.synchronize do
    return if @thread.nil?

    when_connected do
      send_command("unsubscribe")
      @raw_client = nil
    end
  end

  Thread.pass while @thread.alive?
end