Class: Ably::Realtime::Channel::ChannelManager Private
- Inherits: Object
  - Object
  - Ably::Realtime::Channel::ChannelManager
- Extended by: Forwardable
- Defined in: lib/ably/realtime/channel/channel_manager.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
ChannelManager is responsible for all actions relating to channel state: attaching, detaching or failure. Channel state changes are performed by this class and executed from ChannelStateMachine.
This is a private class and should never be used directly by developers as the API is likely to change in future.
Instance Method Summary
- #attach ⇒ Object private
  Commence attachment.
- #attached(attached_protocol_message) ⇒ Object private
  Channel is attached, notify presence if sync is expected.
- #detach(error, previous_state) ⇒ Object private
  Commence detachment.
- #detached_received(reason) ⇒ Object private
  Handle DETACHED messages, see #RTL13 for server-initiated detaches.
- #drop_pending_queue_from_ack(ack_protocol_message) ⇒ Object private
- #duplicate_attached_received(protocol_message) ⇒ Object private
- #fail_messages_awaiting_ack(error, options = {}) ⇒ Object private
  When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) then all messages published but awaiting an ACK from Ably should be failed with a NACK.
- #fail_messages_in_queue(queue, error) ⇒ Object private
- #fail_queued_messages(error) ⇒ Object private
  When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states.
- #initialize(channel, connection) ⇒ ChannelManager constructor private
  A new instance of ChannelManager.
- #log_channel_error(error) ⇒ Object private
  An error has occurred on the channel.
- #nack_message(message, error, protocol_message = nil) ⇒ Object private
- #nack_messages(protocol_message, error) ⇒ Object private
- #request_reattach(options = {}) ⇒ Object private
  Request channel to be reattached by sending an attach protocol message.
- #start_attach_from_suspended_timer ⇒ Object private
  If the connection is still connected and the channel still suspended after channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b.
Constructor Details
#initialize(channel, connection) ⇒ ChannelManager
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ChannelManager.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 12
def initialize(channel, connection)
  @channel = channel
  @connection = connection
end
Instance Method Details
#attach ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Commence attachment
# File 'lib/ably/realtime/channel/channel_manager.rb', line 18
def attach
  if can_transition_to?(:attached)
    connect_if_connection_initialized
    send_attach_protocol_message
  end
end
#attached(attached_protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Channel is attached, notify presence if sync is expected
# File 'lib/ably/realtime/channel/channel_manager.rb', line 35
def attached(attached_protocol_message)
  # If no attached ProtocolMessage then this attached request was triggered by the client
  # library, such as returning to attached when detach has failed
  if attached_protocol_message
    update_presence_sync_state_following_attached attached_protocol_message
    channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
    channel.options.set_modes_from_flags(attached_protocol_message.flags)
    channel.options.set_params(attached_protocol_message.params)
  end
end
#detach(error, previous_state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Commence detachment
# File 'lib/ably/realtime/channel/channel_manager.rb', line 26
def detach(error, previous_state)
  if connection.closed? || connection.connecting? || connection.suspended?
    channel.transition_state_machine :detached, reason: error
  elsif can_transition_to?(:detached)
    send_detach_protocol_message previous_state
  end
end
#detached_received(reason) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle DETACHED messages, see #RTL13 for server-initiated detaches
# File 'lib/ably/realtime/channel/channel_manager.rb', line 86
def detached_received(reason)
  case channel.state.to_sym
  when :detaching
    channel.transition_state_machine :detached, reason: reason
  when :attached, :suspended
    channel.transition_state_machine :attaching, reason: reason
  else
    logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
  end
end
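The branching in this method can be restated as a pure function from the current channel state to the next state. The following is a minimal sketch for illustration only: `next_state_on_detached` is a hypothetical helper, not part of ably-ruby, and it returns symbols instead of driving the real state machine.

```ruby
# Hypothetical helper (not part of ably-ruby): maps the channel state at the
# moment a DETACHED ProtocolMessage arrives to the state the channel moves to.
# Returns nil when no transition is needed.
def next_state_on_detached(current_state)
  case current_state.to_sym
  when :detaching
    :detached   # the client requested the detach, so it is now complete
  when :attached, :suspended
    :attaching  # server-initiated detach: attempt to reattach
  else
    nil         # any other state: nothing to do
  end
end

p next_state_on_detached(:detaching)  # => :detached
p next_state_on_detached("attached")  # => :attaching
p next_state_on_detached(:failed)     # => nil
```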
#drop_pending_queue_from_ack(ack_protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
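# File 'lib/ably/realtime/channel/channel_manager.rb', line 149
def drop_pending_queue_from_ack(ack_protocol_message)
  message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
  connection.__pending_message_ack_queue__.drop_while do |protocol_message|
    if protocol_message.message_serial <= message_serial_up_to
      yield protocol_message
      true
    end
  end
end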
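The serial arithmetic here is easy to misread: an ACK that starts at serial `s` and carries a count of `n` covers serials `s` through `s + n - 1`. The sketch below illustrates that range on a plain Array. `PendingMessage` and `take_acknowledged` are hypothetical names, and removing entries with `shift` is an assumption made for the example, not a transcription of the method above.

```ruby
# Hypothetical stand-in for a protocol message awaiting acknowledgement.
PendingMessage = Struct.new(:message_serial)

# Removes and returns the messages at the head of +queue+ that are covered by
# an ACK starting at +ack_serial+ and acknowledging +count+ messages.
def take_acknowledged(queue, ack_serial, count)
  last_acked_serial = ack_serial + count - 1
  acknowledged = []
  while (head = queue.first) && head.message_serial <= last_acked_serial
    acknowledged << queue.shift
  end
  acknowledged
end

queue = (5..9).map { |serial| PendingMessage.new(serial) }
acked = take_acknowledged(queue, 5, 3)  # ACK covers serials 5, 6 and 7
p acked.map(&:message_serial)           # => [5, 6, 7]
p queue.map(&:message_serial)           # => [8, 9]
```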
#duplicate_attached_received(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 62
def duplicate_attached_received(protocol_message)
  if protocol_message.error
    channel.set_channel_error_reason protocol_message.error
    log_channel_error protocol_message.error
  end

  channel.properties.set_attach_serial(protocol_message.channel_serial)
  channel.options.set_modes_from_flags(protocol_message.flags)

  if protocol_message.has_channel_resumed_flag?
    logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
  else
    channel.emit :update, Ably::Models::ChannelStateChange.new(
      current: channel.state,
      previous: channel.state,
      event: Ably::Realtime::Channel::EVENT(:update),
      reason: protocol_message.error,
      resumed: false,
    )
    update_presence_sync_state_following_attached protocol_message
  end
end
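The key decision in this method is whether the additional ATTACHED message carries the resumed flag. If it does, nothing is emitted. Otherwise an update is emitted whose current and previous states are identical. A minimal sketch of that decision follows; `StateChange` and `update_for_duplicate_attached` are hypothetical stand-ins, not the library's `Ably::Models::ChannelStateChange` API.

```ruby
# Hypothetical stand-in for a channel state change record.
StateChange = Struct.new(:current, :previous, :event, :reason, :resumed, keyword_init: true)

# Returns the update to emit for an additional ATTACHED message, or nil when
# the message is flagged as resumed and therefore needs no update.
def update_for_duplicate_attached(state, resumed:, error: nil)
  return nil if resumed

  StateChange.new(
    current: state,
    previous: state,  # the state itself has not changed
    event: :update,
    reason: error,
    resumed: false
  )
end

p update_for_duplicate_attached(:attached, resumed: true)  # => nil
change = update_for_duplicate_attached(:attached, resumed: false, error: "continuity lost")
p change.current == change.previous                        # => true
```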
#fail_messages_awaiting_ack(error, options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) then all messages published but awaiting an ACK from Ably should be failed with a NACK
# File 'lib/ably/realtime/channel/channel_manager.rb', line 101
def fail_messages_awaiting_ack(error, options = {})
  immediately = options[:immediately] || false

  fail_proc = lambda do
    error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error
    fail_messages_in_queue connection.__pending_message_ack_queue__, error
  end

  # Allow a short time for other queued operations to complete before failing all messages
  if immediately
    fail_proc.call
  else
    EventMachine.add_timer(0.1) { fail_proc.call }
  end
end
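The immediate-or-deferred pattern used here can be tried without EventMachine by injecting the timer. In the sketch below, `fail_pending` and its `scheduler:` parameter are hypothetical. The scheduler is any callable taking a delay and a block, standing in for `EventMachine.add_timer`, and `RuntimeError` replaces the library's exception class.

```ruby
# Hypothetical sketch: runs +on_fail+ now, or hands it to +scheduler+ to run
# after a short delay. A default error is supplied when none is given.
def fail_pending(error, immediately:, scheduler:, &on_fail)
  error ||= RuntimeError.new("Continuity of connection was lost")
  if immediately
    on_fail.call(error)
  else
    scheduler.call(0.1) { on_fail.call(error) }
  end
end

# Collects deferred blocks instead of starting a real timer.
deferred = []
scheduler = ->(delay, &block) { deferred << [delay, block] }

failures = []
fail_pending(nil, immediately: true, scheduler: scheduler) { |e| failures << e.message }
fail_pending(nil, immediately: false, scheduler: scheduler) { |e| failures << e.message }
p failures.length  # => 1, the deferred failure has not run yet

deferred.each { |_delay, block| block.call }
p failures.length  # => 2
```

Injecting the scheduler keeps the example deterministic. In the method above the delay is fixed at 0.1 seconds and always scheduled on the EventMachine reactor.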
#fail_messages_in_queue(queue, error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 125
def fail_messages_in_queue(queue, error)
  queue.delete_if do |protocol_message|
    if protocol_message.action.match_any?(:presence, :message)
      if protocol_message.channel == channel.name
        nack_messages protocol_message, error
        true
      end
    end
  end
end
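This method relies on `delete_if` removing an entry only when the block returns a truthy value, so entries for other channels or with other actions stay queued because the nested `if` evaluates to nil for them. A minimal sketch of the same filtering on a plain Array follows; `QueuedMessage` and `remove_messages_for_channel` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for a queued protocol message; not the ably-ruby class.
QueuedMessage = Struct.new(:action, :channel)

# Removes every :presence or :message entry for +channel_name+ from +queue+
# and returns the removed entries so the caller can fail them.
def remove_messages_for_channel(queue, channel_name)
  removed = []
  queue.delete_if do |queued|
    matches = %i[presence message].include?(queued.action) && queued.channel == channel_name
    removed << queued if matches
    matches # delete_if removes the entry only when the block returns a truthy value
  end
  removed
end

queue = [
  QueuedMessage.new(:message, "chat"),
  QueuedMessage.new(:attach, "chat"),
  QueuedMessage.new(:presence, "news"),
]
failed = remove_messages_for_channel(queue, "chat")
p failed.length  # => 1
p queue.length   # => 2
```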
#fail_queued_messages(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states
# File 'lib/ably/realtime/channel/channel_manager.rb', line 120
def fail_queued_messages(error)
  error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
  fail_messages_in_queue connection.__outgoing_message_queue__, error
end
#log_channel_error(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An error has occurred on the channel
# File 'lib/ably/realtime/channel/channel_manager.rb', line 47
def log_channel_error(error)
  logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end
#nack_message(message, error, protocol_message = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 144
def nack_message(message, error, protocol_message = nil)
  logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" }
  message.fail error
end
#nack_messages(protocol_message, error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
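# File 'lib/ably/realtime/channel/channel_manager.rb', line 136
def nack_messages(protocol_message, error)
  (protocol_message.messages + protocol_message.presence).each do |message|
    nack_message message, error, protocol_message
  end
  logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" }
  protocol_message.fail error
end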
#request_reattach(options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Request channel to be reattached by sending an attach protocol message
# File 'lib/ably/realtime/channel/channel_manager.rb', line 54
def request_reattach(options = {})
  reason = options[:reason]
  send_attach_protocol_message
  logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
  channel.set_channel_error_reason(reason) if reason
  channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
end
#start_attach_from_suspended_timer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
If the connection is still connected and the channel still suspended after channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b
# File 'lib/ably/realtime/channel/channel_manager.rb', line 161
def start_attach_from_suspended_timer
  cancel_attach_from_suspended_timer
  if connection.connected?
    channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
    connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }

    @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
      channel.transition_state_machine! :attaching
    end
  end
end