Class: ActionCable::SubscriptionAdapter::Redis::Listener
Defined Under Namespace
Classes: SubscribedClient
Instance Method Summary
collapse
#add_subscriber, #broadcast, #remove_subscriber
Constructor Details
#initialize(adapter, config_options, event_loop) ⇒ Listener
Returns a new instance of Listener.
[View source]
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 67
def initialize(adapter, config_options, event_loop)
super()
@adapter = adapter
@event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@reconnect_attempt = 0
@reconnect_attempts = config_options.fetch(:reconnect_attempts, 1)
@reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer)
@subscribed_client = nil
@when_connected = []
@thread = nil
end
|
Instance Method Details
#add_channel(channel, on_success) ⇒ Object
[View source]
140
141
142
143
144
145
146
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 140
def add_channel(channel, on_success)
@subscription_lock.synchronize do
ensure_listener_running
@subscribe_callbacks[channel] << on_success
when_connected { @subscribed_client.subscribe(channel) }
end
end
|
#invoke_callback ⇒ Object
[View source]
154
155
156
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 154
def invoke_callback(*)
@event_loop.post { super }
end
|
#listen(conn) ⇒ Object
[View source]
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 88
def listen(conn)
conn.without_reconnect do
original_client = (conn)
conn.subscribe("_action_cable_internal") do |on|
on.subscribe do |chan, count|
@subscription_lock.synchronize do
if count == 1
@reconnect_attempt = 0
@subscribed_client = original_client
until @when_connected.empty?
@when_connected.shift.call
end
end
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
@event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
end
on.message do |chan, message|
broadcast(chan, message)
end
on.unsubscribe do |chan, count|
if count == 0
@subscription_lock.synchronize do
@subscribed_client = nil
end
end
end
end
end
end
|
#remove_channel(channel) ⇒ Object
[View source]
148
149
150
151
152
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 148
def remove_channel(channel)
@subscription_lock.synchronize do
when_connected { @subscribed_client.unsubscribe(channel) }
end
end
|
#shutdown ⇒ Object
[View source]
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/action_cable/subscription_adapter/redis.rb', line 127
def shutdown
@subscription_lock.synchronize do
return if @thread.nil?
when_connected do
@subscribed_client.unsubscribe
@subscribed_client = nil
end
end
Thread.pass while @thread.alive?
end
|