Class: Ably::Realtime::Connection
- Inherits:
-
Object
- Object
- Ably::Realtime::Connection
- Extended by:
- Modules::Enum
- Includes:
- Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
- Defined in:
- lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb
Overview
Enables the management of a connection to Ably.
Defined Under Namespace
Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport
Constant Summary collapse
- STATE =
The current STATE of the connection. Describes the realtime [Connection]Connection object states.
INITIALIZED A connection with this state has been initialized but no connection has yet been attempted. CONNECTING A connection attempt has been initiated. The connecting state is entered as soon as the library
has completed initialization, and is reentered each time connection is re-attempted following disconnection.
CONNECTED A connection exists and is active. DISCONNECTED A temporary failure condition. No current connection exists because there is no network connectivity
or no host is available. The disconnected state is entered if an established connection is dropped, or if a connection attempt was unsuccessful. In the disconnected state the library will periodically attempt to open a new connection (approximately every 15 seconds), anticipating that the connection will be re-established soon and thus connection and channel continuity will be possible. In this state, developers can continue to publish messages as they are automatically placed in a local queue, to be sent as soon as a connection is reestablished. Messages published by other clients while this client is disconnected will be delivered to it upon reconnection, so long as the connection was resumed within 2 minutes. After 2 minutes have elapsed, recovery is no longer possible and the connection will move to the SUSPENDED state.
SUSPENDED A long term failure condition. No current connection exists because there is no network connectivity
or no host is available. The suspended state is entered after a failed connection attempt if there has then been no connection for a period of two minutes. In the suspended state, the library will periodically attempt to open a new connection every 30 seconds. Developers are unable to publish messages in this state. A new connection attempt can also be triggered by an explicit call to {Ably::Realtime::Connection#connect}. Once the connection has been re-established, channels will be automatically re-attached. The client has been disconnected for too long for them to resume from where they left off, so if it wants to catch up on messages published by other clients while it was disconnected, it needs to use the History API.
CLOSING An explicit request by the developer to close the connection has been sent to the Ably service.
If a reply is not received from Ably within a short period of time, the connection is forcibly terminated and the connection state becomes CLOSED.
CLOSED The connection has been explicitly closed by the client. In the closed state, no reconnection attempts
are made automatically by the library, and clients may not publish messages. No connection state is preserved by the service or by the library. A new connection attempt can be triggered by an explicit call to {Ably::Realtime::Connection#connect}, which results in a new connection.
FAILED This state is entered if the client library encounters a failure condition that it cannot recover from.
This may be a fatal connection error received from the Ably service, for example an attempt to connect with an incorrect API key, or a local terminal error, for example the token in use has expired and the library does not have any way to renew it. In the failed state, no reconnection attempts are made automatically by the library, and clients may not publish messages. A new connection attempt can be triggered by an explicit call to {Ably::Realtime::Connection#connect}.
ruby_enum('STATE', :initialized, :connecting, :connected, :disconnected, :suspended, :closing, :closed, :failed )
- EVENT =
Describes the events emitted by a Ably::Realtime::Connection object. An event is either an UPDATE or a STATE.
UPDATE RTN4h An event for changes to connection conditions for which the STATE does not change.
ruby_enum('EVENT', STATE.to_sym_arr + [:update] )
- RECOVER_REGEX =
Expected format for a connection recover key
/^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/
- DEFAULTS =
Defaults for automatic connection recovery and timeouts
{ channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically connection_state_ttl: 120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected max_connection_state_ttl: nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably realtime_request_timeout: 10, # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage websocket_heartbeats_disabled: false, }.freeze
- MAX_PROTOCOL_MESSAGE_BATCH_SIZE =
Max number of messages to bundle in a single ProtocolMessage
50
Instance Attribute Summary collapse
-
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal incoming protocol message bus.
-
#__outgoing_message_queue__ ⇒ Array
readonly
private
An internal queue used to manage unsent outgoing messages.
-
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal outgoing protocol message bus.
-
#__pending_message_ack_queue__ ⇒ Array
readonly
private
An internal queue used to manage sent messages.
-
#client ⇒ Ably::Realtime::Client
readonly
Client associated with this connection.
-
#current_host ⇒ String
readonly
private
The current host that is configured following a call to method #determine_host.
-
#defaults ⇒ Hash
readonly
Configured recovery and timeout defaults for this Connection.
-
#details ⇒ Ably::Models::ConnectionDetails
readonly
Connection details of the currently established connection.
-
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException
readonly
An Models::ErrorInfo object describing the last error received if a connection failure occurs.
-
#id ⇒ String
readonly
A unique public identifier for this connection, used to identify this member.
-
#key ⇒ String
readonly
A unique private connection key used to recover or resume a connection, assigned by Ably.
-
#logger ⇒ Logger
readonly
The Logger for this client.
-
#manager ⇒ Ably::Realtime::Connection::ConnectionManager
readonly
private
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport.
-
#port ⇒ Integer
readonly
The default port used for this connection.
-
#serial ⇒ Integer
readonly
The serial number of the last message to be received on this connection, used automatically by the library when recovering or resuming a connection.
-
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport
readonly
private
Underlying socket transport used for this connection, for internal use by the client library.
Attributes included from Modules::UsesStateMachine
#previous_state, #state_history
Instance Method Summary collapse
- #add_message_to_outgoing_queue(protocol_message) ⇒ Object private
-
#can_publish_messages? ⇒ Boolean
private
Returns false if messages cannot be published as a result of message queueing being disabled.
- #clear_error_reason ⇒ Object private
-
#close { ... } ⇒ EventMachine::Deferrable
Causes the connection to close, entering the STATE CLOSING state.
-
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
private
Following a new connection being made, the connection ID, connection key and connection serial need to match the details provided by the server.
-
#connect { ... } ⇒ EventMachine::Deferrable
Explicitly calling connect() is unnecessary unless the autoConnect attribute of the ClientOptions object is false.
- #connection_state_ttl ⇒ Object private
- #connection_state_ttl=(val) ⇒ Object
- #create_transport(host, port, url, &block) ⇒ Object private
- #create_websocket_transport ⇒ EventMachine::Deferrable private
-
#determine_host {|String| ... } ⇒ Object
private
Determines the correct host name to use for the next connection attempt and updates current_host.
- #heartbeat_interval ⇒ Object private
-
#initialize(client, options) ⇒ Connection
constructor
A new instance of Connection.
- #internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable private
- #notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object private
-
#off_resume(&callback) ⇒ Object
private
Remove a registered connection resume callback.
-
#on_resume(&callback) ⇒ Object
private
Provides a simple hook to inject a callback when a connection is successfully resumed.
-
#ping {|Integer| ... } ⇒ Ably::Util::SafeDeferrable
When connected, sends a heartbeat ping to the Ably server and executes the callback with any error and the response time in milliseconds when a heartbeat ping request is echoed from the server.
-
#recovery_key ⇒ String
The recovery key string can be used by another client to recover this connection’s state in the recover client options property.
- #release_websocket_transport ⇒ Object private
-
#reset_client_msg_serial ⇒ Object
private
Resets the client message serial (msgSerial) sent to Ably for each new Models::ProtocolMessage (see #client_msg_serial).
-
#reset_resume_info ⇒ void
private
Disable automatic resume of a connection.
-
#send_protocol_message(protocol_message) ⇒ void
private
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent.
-
#set_connection_confirmed_alive ⇒ Object
private
When a hearbeat or any other message from Ably is received we know it’s alive, see #RTN23.
- #set_connection_details(connection_details) ⇒ Object private
- #set_failed_connection_error_reason(error) ⇒ Object private
- #time_since_connection_confirmed_alive? ⇒ Boolean private
-
#trigger_resumed ⇒ Object
private
Executes registered callbacks for a successful connection resume event.
-
#update_connection_serial(connection_serial) ⇒ void
private
Store last received connection serial so that the connection can be resumed from the last known point-in-time.
Methods included from Modules::UsesStateMachine
#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!
Methods included from Modules::StateEmitter
#once_or_if, #once_state_changed, #state, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed
Methods included from Modules::EventEmitter
#emit, #off, #on, #once, #unsafe_off, #unsafe_on, #unsafe_once
Constructor Details
#initialize(client, options) ⇒ Connection
Returns a new instance of Connection.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/ably/realtime/connection.rb', line 169 def initialize(client, ) @client = client @__outgoing_message_queue__ = [] @__pending_message_ack_queue__ = [] @defaults = DEFAULTS.dup .each do |key, val| @defaults[key] = val if DEFAULTS.has_key?(key) end if .kind_of?(Hash) @defaults.freeze # If a recover client options is provided, then we need to ensure that the msgSerial matches the # recover serial immediately at client library instantiation. This is done immediately so that any queued # publishes use the correct serial number for these queued messages as well. # There is no harm if the msgSerial is higher than expected if the recover fails. recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i if recovery_msg_serial @client_msg_serial = recovery_msg_serial else reset_client_msg_serial end Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @state_machine = ConnectionStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ConnectionManager.new(self) @current_host = client.endpoint.host end |
Instance Attribute Details
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Client library internal incoming protocol message bus.
397 398 399 |
# File 'lib/ably/realtime/connection.rb', line 397 def __incoming_protocol_msgbus__ @__incoming_protocol_msgbus__ ||= end |
#__outgoing_message_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
155 156 157 |
# File 'lib/ably/realtime/connection.rb', line 155 def @__outgoing_message_queue__ end |
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Client library internal outgoing protocol message bus.
390 391 392 |
# File 'lib/ably/realtime/connection.rb', line 390 def __outgoing_protocol_msgbus__ @__outgoing_protocol_msgbus__ ||= end |
#__pending_message_ack_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage sent messages. You should never interface with this array directly
160 161 162 |
# File 'lib/ably/realtime/connection.rb', line 160 def @__pending_message_ack_queue__ end |
#client ⇒ Ably::Realtime::Client (readonly)
Ably::Realtime::Client associated with this connection
140 141 142 |
# File 'lib/ably/realtime/connection.rb', line 140 def client @client end |
#current_host ⇒ String (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns The current host that is configured following a call to method #determine_host.
424 425 426 |
# File 'lib/ably/realtime/connection.rb', line 424 def current_host @current_host end |
#defaults ⇒ Hash (readonly)
Configured recovery and timeout defaults for this Ably::Realtime::Connection. See the configurable options in Ably::Realtime::Client#initialize. The defaults are immutable
166 167 168 |
# File 'lib/ably/realtime/connection.rb', line 166 def defaults @defaults end |
#details ⇒ Ably::Models::ConnectionDetails (readonly)
Connection details of the currently established connection
136 137 138 |
# File 'lib/ably/realtime/connection.rb', line 136 def details @details end |
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)
An Models::ErrorInfo object describing the last error received if a connection failure occurs.
132 133 134 |
# File 'lib/ably/realtime/connection.rb', line 132 def error_reason @error_reason end |
#id ⇒ String (readonly)
A unique public identifier for this connection, used to identify this member.
103 104 105 |
# File 'lib/ably/realtime/connection.rb', line 103 def id @id end |
#key ⇒ String (readonly)
A unique private connection key used to recover or resume a connection, assigned by Ably. When recovering a connection explicitly, the recoveryKey is used in the recover client options as it contains both the key and the last message serial. This private connection key can also be used by other REST clients to publish on behalf of this client. See the publishing over REST on behalf of a realtime client docs for more info.
114 115 116 |
# File 'lib/ably/realtime/connection.rb', line 114 def key @key end |
#logger ⇒ Logger (readonly)
Returns The Logger for this client. Configure the log_level with the ‘:log_level` option, refer to Ably::Realtime::Client#initialize.
435 436 437 |
# File 'lib/ably/realtime/connection.rb', line 435 def logger client.logger end |
#manager ⇒ Ably::Realtime::Connection::ConnectionManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
150 151 152 |
# File 'lib/ably/realtime/connection.rb', line 150 def manager @manager end |
#port ⇒ Integer (readonly)
Returns The default port used for this connection.
428 429 430 |
# File 'lib/ably/realtime/connection.rb', line 428 def port client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80 end |
#serial ⇒ Integer (readonly)
The serial number of the last message to be received on this connection, used automatically by the library when recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in the recover client options as it contains both the key and the last message serial.
124 125 126 |
# File 'lib/ably/realtime/connection.rb', line 124 def serial @serial end |
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Underlying socket transport used for this connection, for internal use by the client library
145 146 147 |
# File 'lib/ably/realtime/connection.rb', line 145 def transport @transport end |
Instance Method Details
#add_message_to_outgoing_queue(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
456 457 458 |
# File 'lib/ably/realtime/connection.rb', line 456 def () << end |
#can_publish_messages? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns false if messages cannot be published as a result of message queueing being disabled
567 568 569 570 |
# File 'lib/ably/realtime/connection.rb', line 567 def connected? || ( (initialized? || connecting? || disconnected?) && client. ) end |
#clear_error_reason ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
538 539 540 |
# File 'lib/ably/realtime/connection.rb', line 538 def clear_error_reason @error_reason = nil end |
#close { ... } ⇒ EventMachine::Deferrable
211 212 213 214 215 216 217 218 219 |
# File 'lib/ably/realtime/connection.rb', line 211 def close(&success_block) unless closing? || closed? unless can_transition_to?(:closing) return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing)) end transition_state_machine :closing end deferrable_for_state_change_to(STATE.Closed, &success_block) end |
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Following a new connection being made, the connection ID, connection key and connection serial need to match the details provided by the server.
365 366 367 368 369 370 |
# File 'lib/ably/realtime/connection.rb', line 365 def configure_new(connection_id, connection_key, connection_serial) @id = connection_id @key = connection_key update_connection_serial connection_serial end |
#connect { ... } ⇒ EventMachine::Deferrable
Explicitly calling connect() is unnecessary unless the autoConnect attribute of the ClientOptions object is false. Unless already connected or connecting, this method causes the connection to open, entering the STATE CONNECTING state.
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/ably/realtime/connection.rb', line 231 def connect(&success_block) unless connecting? || connected? unless can_transition_to?(:connecting) return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting)) end # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) } end Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| deferrable.callback do yield if block_given? end succeed_callback = deferrable.method(:succeed) fail_callback = deferrable.method(:fail) unsafe_once(:connected) do deferrable.succeed off(&fail_callback) end unsafe_once(:failed, :closed, :closing) do deferrable.fail off(&succeed_callback) end end end |
#connection_state_ttl ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
579 580 581 582 583 |
# File 'lib/ably/realtime/connection.rb', line 579 def connection_state_ttl defaults[:max_connection_state_ttl] || # undocumented max TTL configuration (details && details.connection_state_ttl) || defaults.fetch(:connection_state_ttl) end |
#connection_state_ttl=(val) ⇒ Object
585 586 587 |
# File 'lib/ably/realtime/connection.rb', line 585 def connection_state_ttl=(val) @connection_state_ttl = val end |
#create_transport(host, port, url, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
573 574 575 576 |
# File 'lib/ably/realtime/connection.rb', line 573 def create_transport(host, port, url, &block) logger.debug { "Connection: EventMachine connecting to #{host}:#{port} with URL: #{url}" } EventMachine.connect(host, port, WebsocketTransport, self, url.to_s, &block) end |
#create_websocket_transport ⇒ EventMachine::Deferrable
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 |
# File 'lib/ably/realtime/connection.rb', line 467 def create_websocket_transport EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable| # Getting auth params can be blocking so uses a Deferrable client.auth.auth_params.tap do |auth_deferrable| auth_deferrable.callback do |auth_params| url_params = auth_params.merge( 'format' => client.protocol, 'echo' => client., 'v' => Ably::PROTOCOL_VERSION, 'agent' => client.rest_client.agent ) # Use native websocket heartbeats if possible, but allow Ably protocol heartbeats url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled) 'true' else 'false' end url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? url_params.merge!(client.transport_params) if connection_resumable? url_params.merge! resume: key, connection_serial: serial logger.debug { "Resuming connection key #{key} with serial #{serial}" } elsif connection_recoverable? url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] logger.debug { "Recovering connection with key #{client.recover}" } unsafe_once(:connected, :closed, :failed) do client.disable_automatic_connection_recovery end end url = URI(client.endpoint).tap do |endpoint| endpoint.query = URI.encode_www_form(url_params) end determine_host do |host| # Ensure the hostname matches the fallback host name url.hostname = host url.port = port begin logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" } @transport = create_transport(host, port, url) do |websocket_transport| websocket_deferrable.succeed websocket_transport end rescue EventMachine::ConnectionError => error websocket_deferrable.fail error end end end auth_deferrable.errback do |error| websocket_deferrable.fail error end end end end |
#determine_host {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Determines the correct host name to use for the next connection attempt and updates current_host
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
# File 'lib/ably/realtime/connection.rb', line 404 def determine_host raise ArgumentError, 'Block required' unless block_given? if should_use_fallback_hosts? internet_up? do |internet_is_up_result| @current_host = if internet_is_up_result client.fallback_endpoint.host else client.endpoint.host end yield current_host end else @current_host = client.endpoint.host yield current_host end end |
#heartbeat_interval ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
590 591 592 593 594 |
# File 'lib/ably/realtime/connection.rb', line 590 def heartbeat_interval # See RTN23a (details && details.max_idle_interval).to_i + defaults.fetch(:realtime_request_timeout) end |
#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/ably/realtime/connection.rb', line 327 def internet_up? url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}" EventMachine::DefaultDeferrable.new.tap do |deferrable| EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http| http.errback do yield false if block_given? deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED) end http.callback do EventMachine.next_tick do result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text) yield result if block_given? if result deferrable.succeed else deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, Ably::Exceptions::Codes::BAD_REQUEST) end end end end end end |
#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
461 462 463 |
# File 'lib/ably/realtime/connection.rb', line 461 def () __outgoing_protocol_msgbus__.publish :protocol_message, end |
#off_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove a registered connection resume callback
561 562 563 |
# File 'lib/ably/realtime/connection.rb', line 561 def off_resume(&callback) resume_callbacks.delete(callback) end |
#on_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides a simple hook to inject a callback when a connection is successfully resumed
555 556 557 |
# File 'lib/ably/realtime/connection.rb', line 555 def on_resume(&callback) resume_callbacks << callback end |
#ping {|Integer| ... } ⇒ Ably::Util::SafeDeferrable
When connected, sends a heartbeat ping to the Ably server and executes the callback with any error and the response time in milliseconds when a heartbeat ping request is echoed from the server. This can be useful for measuring true round-trip latency to the connected Ably server.
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/ably/realtime/connection.rb', line 276 def ping(&block) if initialized? || suspended? || closing? || closed? || failed? error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: Ably::Exceptions::Codes::DISCONNECTED) return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| started = nil finished = false ping_id = SecureRandom.hex(16) heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat wait_for_ping = lambda do || next if finished if .action == heartbeat_action && .id == ping_id finished = true __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) time_passed = Time.now.to_f - started.to_f deferrable.succeed time_passed safe_yield block, time_passed if block_given? end end once_or_if(STATE.Connected) do next if finished started = Time.now action: heartbeat_action.to_i, id: ping_id __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping end once_or_if([:suspended, :closing, :closed, :failed]) do next if finished finished = true deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: Ably::Exceptions::Codes::DISCONNECTED) end EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do next if finished finished = true __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s" logger.warn { error_msg } deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: Ably::Exceptions::Codes::TIMEOUT_ERROR) safe_yield block, nil if block_given? end end end |
#recovery_key ⇒ String
The recovery key string can be used by another client to recover this connection’s state in the recover client options property. See connection state recover options for more information.
356 357 358 |
# File 'lib/ably/realtime/connection.rb', line 356 def recovery_key "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? end |
#release_websocket_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
528 529 530 |
# File 'lib/ably/realtime/connection.rb', line 528 def release_websocket_transport @transport = nil end |
#reset_client_msg_serial ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Resets the client message serial (msgSerial) sent to Ably for each new Models::ProtocolMessage (see #client_msg_serial)
599 600 601 |
# File 'lib/ably/realtime/connection.rb', line 599 def reset_client_msg_serial @client_msg_serial = -1 end |
#reset_resume_info ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Disable automatic resume of a connection
382 383 384 385 |
# File 'lib/ably/realtime/connection.rb', line 382 def reset_resume_info @key = nil @serial = nil end |
#send_protocol_message(protocol_message) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent
445 446 447 448 449 450 451 452 453 |
# File 'lib/ably/realtime/connection.rb', line 445 def () () do Ably::Models::ProtocolMessage.new(, logger: logger).tap do || logger.debug { "Connection: Prot msg queued =>: #{.action} #{}" } end end end |
#set_connection_confirmed_alive ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a hearbeat or any other message from Ably is received we know it’s alive, see #RTN23
606 607 608 609 |
# File 'lib/ably/realtime/connection.rb', line 606 def set_connection_confirmed_alive @last_liveness_event = Time.now manager.reset_liveness_timer end |
#set_connection_details(connection_details) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
543 544 545 |
# File 'lib/ably/realtime/connection.rb', line 543 def set_connection_details(connection_details) @details = connection_details end |
#set_failed_connection_error_reason(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
533 534 535 |
# File 'lib/ably/realtime/connection.rb', line 533 def set_failed_connection_error_reason(error) @error_reason = error end |
#time_since_connection_confirmed_alive? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
612 613 614 |
# File 'lib/ably/realtime/connection.rb', line 612 def time_since_connection_confirmed_alive? Time.now.to_i - @last_liveness_event.to_i end |
#trigger_resumed ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes registered callbacks for a successful connection resume event
549 550 551 |
# File 'lib/ably/realtime/connection.rb', line 549 def trigger_resumed resume_callbacks.each(&:call) end |
#update_connection_serial(connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Store last received connection serial so that the connection can be resumed from the last known point-in-time
375 376 377 |
# File 'lib/ably/realtime/connection.rb', line 375 def update_connection_serial(connection_serial) @serial = connection_serial end |