Class: Ably::Realtime::Connection

Inherits:
Object
  • Object
show all
Extended by:
Modules::Enum
Includes:
Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
Defined in:
lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb

Overview

The Connection class represents the connection associated with an Ably Realtime instance. The Connection object exposes the lifecycle and parameters of the realtime connection.

Connections will always be in one of the following states:

initialized:  0
connecting:   1
connected:    2
disconnected: 3
suspended:    4
closing:      5
closed:       6
failed:       7

Note that the states are available as Enum-like constants:

Connection::STATE.Initialized
Connection::STATE.Connecting
Connection::STATE.Connected
Connection::STATE.Disconnected
Connection::STATE.Suspended
Connection::STATE.Closing
Connection::STATE.Closed
Connection::STATE.Failed

Examples:

client = Ably::Realtime::Client.new('key.id:secret')
client.connection.on(:connected) do
  puts "Connected with connection ID: #{client.connection.id}"
end

Defined Under Namespace

Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport

Constant Summary collapse

STATE =

ConnectionState The permited states for this connection

ruby_enum('STATE',
  :initialized,
  :connecting,
  :connected,
  :disconnected,
  :suspended,
  :closing,
  :closed,
  :failed
)
EVENT =

ConnectionEvent The permitted connection events that are emitted for this connection

ruby_enum('EVENT',
  STATE.to_sym_arr + [:update]
)
RECOVER_REGEX =

Expected format for a connection recover key

/^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/
DEFAULTS =

Defaults for automatic connection recovery and timeouts

{
  channel_retry_timeout:      15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
  disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  suspended_retry_timeout:    30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  connection_state_ttl:       120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
  max_connection_state_ttl:   nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably
  realtime_request_timeout:   10,  # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
  websocket_heartbeats_disabled: false,
}.freeze
MAX_PROTOCOL_MESSAGE_BATCH_SIZE =

Max number of messages to bundle in a single ProtocolMessage

50

Instance Attribute Summary collapse

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary collapse

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_off, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client, options) ⇒ Connection

Returns a new instance of Connection.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/ably/realtime/connection.rb', line 136

def initialize(client, options)
  @client                        = client
  @__outgoing_message_queue__    = []
  @__pending_message_ack_queue__ = []

  @defaults = DEFAULTS.dup
  options.each do |key, val|
    @defaults[key] = val if DEFAULTS.has_key?(key)
  end if options.kind_of?(Hash)
  @defaults.freeze

  # If a recover client options is provided, then we need to ensure that the msgSerial matches the
  # recover serial immediately at client library instantiation. This is done immediately so that any queued
  # publishes use the correct serial number for these queued messages as well.
  # There is no harm if the msgSerial is higher than expected if the recover fails.
  recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
  if recovery_msg_serial
    @client_msg_serial = recovery_msg_serial
  else
    reset_client_msg_serial
  end

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ConnectionManager.new(self)

  @current_host = client.endpoint.host
end

Instance Attribute Details

#__incoming_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal incoming protocol message bus.

Returns:



358
359
360
# File 'lib/ably/realtime/connection.rb', line 358

def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__outgoing_message_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage unsent outgoing messages. You should never interface with this array directly

Returns:

  • (Array)


122
123
124
# File 'lib/ably/realtime/connection.rb', line 122

def __outgoing_message_queue__
  @__outgoing_message_queue__
end

#__outgoing_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal outgoing protocol message bus.

Returns:



351
352
353
# File 'lib/ably/realtime/connection.rb', line 351

def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__pending_message_ack_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage sent messages. You should never interface with this array directly

Returns:

  • (Array)


127
128
129
# File 'lib/ably/realtime/connection.rb', line 127

def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end

#clientAbly::Realtime::Client (readonly)

Ably::Realtime::Client associated with this connection



107
108
109
# File 'lib/ably/realtime/connection.rb', line 107

def client
  @client
end

#current_hostString (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The current host that is configured following a call to method #determine_host.

Returns:

  • (String)

    The current host that is configured following a call to method #determine_host



385
386
387
# File 'lib/ably/realtime/connection.rb', line 385

def current_host
  @current_host
end

#defaultsHash (readonly)

Configured recovery and timeout defaults for this Ably::Realtime::Connection. See the configurable options in Ably::Realtime::Client#initialize. The defaults are immutable

Returns:

  • (Hash)


133
134
135
# File 'lib/ably/realtime/connection.rb', line 133

def defaults
  @defaults
end

#detailsAbly::Models::ConnectionDetails (readonly)

Connection details of the currently established connection



103
104
105
# File 'lib/ably/realtime/connection.rb', line 103

def details
  @details
end

#error_reasonAbly::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

When a connection failure occurs this attribute contains the Ably Exception



99
100
101
# File 'lib/ably/realtime/connection.rb', line 99

def error_reason
  @error_reason
end

#idString (readonly)

A unique public identifier for this connection, used to identify this member in presence events and messages

Returns:

  • (String)


87
88
89
# File 'lib/ably/realtime/connection.rb', line 87

def id
  @id
end

#keyString (readonly)

A unique private connection key used to recover this connection, assigned by Ably

Returns:

  • (String)


91
92
93
# File 'lib/ably/realtime/connection.rb', line 91

def key
  @key
end

#loggerLogger (readonly)

Returns The Logger for this client. Configure the log_level with the `:log_level` option, refer to Ably::Realtime::Client#initialize.

Returns:



396
397
398
# File 'lib/ably/realtime/connection.rb', line 396

def logger
  client.logger
end

#managerAbly::Realtime::Connection::ConnectionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Connection manager responsible for creating, maintaining and closing the connection and underlying transport



117
118
119
# File 'lib/ably/realtime/connection.rb', line 117

def manager
  @manager
end

#portInteger (readonly)

Returns The default port used for this connection.

Returns:

  • (Integer)

    The default port used for this connection



389
390
391
# File 'lib/ably/realtime/connection.rb', line 389

def port
  client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end

#recovery_keyString (readonly)

Returns recovery key that can be used by another client to recover this connection with the :recover option.

Returns:

  • (String)

    recovery key that can be used by another client to recover this connection with the :recover option



317
318
319
# File 'lib/ably/realtime/connection.rb', line 317

def recovery_key
  "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
end

#serialInteger (readonly)

The serial number of the last message to be received on this connection, used to recover or resume a connection

Returns:

  • (Integer)


95
96
97
# File 'lib/ably/realtime/connection.rb', line 95

def serial
  @serial
end

#stateAbly::Realtime::Connection::STATE (readonly)

Returns connection state.

Returns:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
# File 'lib/ably/realtime/connection.rb', line 39

class Connection
  include Ably::Modules::EventEmitter
  include Ably::Modules::Conversions
  include Ably::Modules::SafeYield
  extend Ably::Modules::Enum

  # ConnectionState
  # The permited states for this connection
  STATE = ruby_enum('STATE',
    :initialized,
    :connecting,
    :connected,
    :disconnected,
    :suspended,
    :closing,
    :closed,
    :failed
  )

  # ConnectionEvent
  # The permitted connection events that are emitted for this connection
  EVENT = ruby_enum('EVENT',
    STATE.to_sym_arr + [:update]
  )

  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine
  ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

  # Expected format for a connection recover key
  RECOVER_REGEX = /^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/

  # Defaults for automatic connection recovery and timeouts
  DEFAULTS = {
    channel_retry_timeout:      15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
    disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
    suspended_retry_timeout:    30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
    connection_state_ttl:       120, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
    max_connection_state_ttl:   nil, # allow a max TTL to be passed in, usually for CI test purposes thus overiding any connection_state_ttl sent from Ably
    realtime_request_timeout:   10,  # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
    websocket_heartbeats_disabled: false,
  }.freeze

  # Max number of messages to bundle in a single ProtocolMessage
  MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50

  # A unique public identifier for this connection, used to identify this member in presence events and messages
  # @return [String]
  attr_reader :id

  # A unique private connection key used to recover this connection, assigned by Ably
  # @return [String]
  attr_reader :key

  # The serial number of the last message to be received on this connection, used to recover or resume a connection
  # @return [Integer]
  attr_reader :serial

  # When a connection failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # Connection details of the currently established connection
  # @return [Ably::Models::ConnectionDetails]
  attr_reader :details

  # {Ably::Realtime::Client} associated with this connection
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Underlying socket transport used for this connection, for internal use by the client library
  # @return [Ably::Realtime::Connection::WebsocketTransport]
  # @api private
  attr_reader :transport

  # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
  # @return [Ably::Realtime::Connection::ConnectionManager]
  # @api private
  attr_reader :manager

  # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__outgoing_message_queue__

  # An internal queue used to manage sent messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__pending_message_ack_queue__

  # Configured recovery and timeout defaults for this {Connection}.
  # See the configurable options in {Ably::Realtime::Client#initialize}.
  # The defaults are immutable
  # @return [Hash]
  attr_reader :defaults

  # @api public
  def initialize(client, options)
    @client                        = client
    @__outgoing_message_queue__    = []
    @__pending_message_ack_queue__ = []

    @defaults = DEFAULTS.dup
    options.each do |key, val|
      @defaults[key] = val if DEFAULTS.has_key?(key)
    end if options.kind_of?(Hash)
    @defaults.freeze

    # If a recover client options is provided, then we need to ensure that the msgSerial matches the
    # recover serial immediately at client library instantiation. This is done immediately so that any queued
    # publishes use the correct serial number for these queued messages as well.
    # There is no harm if the msgSerial is higher than expected if the recover fails.
    recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
    if recovery_msg_serial
      @client_msg_serial = recovery_msg_serial
    else
      reset_client_msg_serial
    end

    Client::IncomingMessageDispatcher.new client, self
    Client::OutgoingMessageDispatcher.new client, self

    @state_machine = ConnectionStateMachine.new(self)
    @state         = STATE(state_machine.current_state)
    @manager       = ConnectionManager.new(self)

    @current_host = client.endpoint.host
  end

  # Causes the connection to close, entering the closed state, from any state except
  # the failed state. Once closed, the library will not attempt to re-establish the
  # connection without a call to {Connection#connect}.
  #
  # @yield block is called as soon as this connection is in the Closed state
  #
  # @return [EventMachine::Deferrable]
  #
  def close(&success_block)
    unless closing? || closed?
      unless can_transition_to?(:closing)
        return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing))
      end
      transition_state_machine :closing
    end
    deferrable_for_state_change_to(STATE.Closed, &success_block)
  end

  # Causes the library to attempt connection.  If it was previously explicitly
  # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
  # Succeeds when connection is established i.e. state is @Connected@
  # Fails when state becomes either @Closing@, @Closed@ or @Failed@
  #
  # Note that if the connection remains in the disconnected ans suspended states indefinitely,
  # the Deferrable or block provided may never be called
  #
  # @yield block is called as soon as this connection is in the Connected state
  #
  # @return [EventMachine::Deferrable]
  #
  def connect(&success_block)
    unless connecting? || connected?
      unless can_transition_to?(:connecting)
        return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting))
      end
      # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first
      EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) }
    end

    Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
      deferrable.callback do
        yield if block_given?
      end
      succeed_callback = deferrable.method(:succeed)
      fail_callback    = deferrable.method(:fail)

      unsafe_once(:connected) do
        deferrable.succeed
        off(&fail_callback)
      end

      unsafe_once(:failed, :closed, :closing) do
        deferrable.fail
        off(&succeed_callback)
      end
    end
  end

  # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
  # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
  # The elapsed time in seconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
  #
  # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in seconds.
  #                  If the ping is not received within an acceptable timeframe, the block will be called with +nil+ as he first argument
  #
  # @example
  #    client = Ably::Rest::Client.new(key: 'key.id:secret')
  #    client.connection.ping do |elapsed_s|
  #      puts "Ping took #{elapsed_s}s"
  #    end
  #
  # @return [Ably::Util::SafeDeferrable]
  #
  def ping(&block)
    if initialized? || suspended? || closing? || closed? || failed?
      error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end

    Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
      started = nil
      finished = false
      ping_id = SecureRandom.hex(16)
      heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat

      wait_for_ping = lambda do |protocol_message|
        next if finished
        if protocol_message.action == heartbeat_action && protocol_message.id == ping_id
          finished = true
          __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
          time_passed = Time.now.to_f - started.to_f
          deferrable.succeed time_passed
          safe_yield block, time_passed if block_given?
        end
      end

      once_or_if(STATE.Connected) do
        next if finished
        started = Time.now
        send_protocol_message action: heartbeat_action.to_i, id: ping_id
        __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
      end

      once_or_if([:suspended, :closing, :closed, :failed]) do
        next if finished
        finished = true
        deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
      end

      EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
        next if finished
        finished = true
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
        logger.warn { error_msg }
        deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: Ably::Exceptions::Codes::TIMEOUT_ERROR)
        safe_yield block, nil if block_given?
      end
    end
  end

  # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
  # @return [EventMachine::Deferrable]
  # @api private
  def internet_up?
    url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
    EventMachine::DefaultDeferrable.new.tap do |deferrable|
      EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http|
        http.errback do
          yield false if block_given?
          deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED)
        end
        http.callback do
          EventMachine.next_tick do
            result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
            yield result if block_given?
            if result
              deferrable.succeed
            else
              deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, Ably::Exceptions::Codes::BAD_REQUEST)
            end
          end
        end
      end
    end
  end

  # @!attribute [r] recovery_key
  # @return [String] recovery key that can be used by another client to recover this connection with the :recover option
  def recovery_key
    "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
  end

  # Following a new connection being made, the connection ID, connection key
  # and connection serial need to match the details provided by the server.
  #
  # @return [void]
  # @api private
  def configure_new(connection_id, connection_key, connection_serial)
    @id            = connection_id
    @key           = connection_key

    update_connection_serial connection_serial
  end

  # Store last received connection serial so that the connection can be resumed from the last known point-in-time
  # @return [void]
  # @api private
  def update_connection_serial(connection_serial)
    @serial = connection_serial
  end

  # Disable automatic resume of a connection
  # @return [void]
  # @api private
  def reset_resume_info
    @key    = nil
    @serial = nil
  end

  # @!attribute [r] __outgoing_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
  # @api private
  def __outgoing_protocol_msgbus__
    @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # @!attribute [r] __incoming_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal incoming protocol message bus
  # @api private
  def __incoming_protocol_msgbus__
    @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # Determines the correct host name to use for the next connection attempt and updates current_host
  # @yield [String] The host name used for this connection, for network connection failures a {Ably::FALLBACK_HOSTS fallback host} is used to route around networking or intermittent problems if an Internet connection is available
  # @api private
  def determine_host
    raise ArgumentError, 'Block required' unless block_given?

    if should_use_fallback_hosts?
      internet_up? do |internet_is_up_result|
        @current_host = if internet_is_up_result
          client.fallback_endpoint.host
        else
          client.endpoint.host
        end
        yield current_host
      end
    else
      @current_host = client.endpoint.host
      yield current_host
    end
  end

  # @return [String] The current host that is configured following a call to method {#determine_host}
  # @api private
  attr_reader :current_host

  # @!attribute [r] port
  # @return [Integer] The default port used for this connection
  def port
    client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
  end

  # @!attribute [r] logger
  # @return [Logger] The {Ably::Logger} for this client.
  #                  Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
  def logger
    client.logger
  end

  # Add protocol message to the outgoing message queue and notify the dispatcher that a message is
  # ready to be sent
  #
  # @param [Ably::Models::ProtocolMessage] protocol_message
  # @return [void]
  # @api private
  def send_protocol_message(protocol_message)
    add_message_serial_if_ack_required_to(protocol_message) do
      Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
        add_message_to_outgoing_queue message
        notify_message_dispatcher_of_new_message message
        logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
      end
    end
  end

  # @api private
  def add_message_to_outgoing_queue(protocol_message)
    __outgoing_message_queue__ << protocol_message
  end

  # @api private
  def notify_message_dispatcher_of_new_message(protocol_message)
    __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
  end

  # @return [EventMachine::Deferrable]
  # @api private
  def create_websocket_transport
    EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
      # Getting auth params can be blocking so uses a Deferrable
      client.auth.auth_params.tap do |auth_deferrable|
        auth_deferrable.callback do |auth_params|
          url_params = auth_params.merge(
            'format' =>     client.protocol,
            'echo' =>       client.echo_messages,
            'v' =>          Ably::PROTOCOL_VERSION,
            'agent' =>      client.rest_client.agent
          )

          # Use native websocket heartbeats if possible, but allow Ably protocol heartbeats
          url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled)
            'true'
          else
            'false'
          end

          url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
          url_params.merge!(client.transport_params)

          if connection_resumable?
            url_params.merge! resume: key, connection_serial: serial
            logger.debug { "Resuming connection key #{key} with serial #{serial}" }
          elsif connection_recoverable?
            url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
            logger.debug { "Recovering connection with key #{client.recover}" }
            unsafe_once(:connected, :closed, :failed) do
              client.disable_automatic_connection_recovery
            end
          end

          url = URI(client.endpoint).tap do |endpoint|
            endpoint.query = URI.encode_www_form(url_params)
          end

          determine_host do |host|
            # Ensure the hostname matches the fallback host name
            url.hostname = host
            url.port = port

            begin
              logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" }
              @transport = create_transport(host, port, url) do |websocket_transport|
                websocket_deferrable.succeed websocket_transport
              end
            rescue EventMachine::ConnectionError => error
              websocket_deferrable.fail error
            end
          end
        end

        auth_deferrable.errback do |error|
          websocket_deferrable.fail error
        end
      end
    end
  end

  # @api private
  def release_websocket_transport
    @transport = nil
  end

  # @api private
  def set_failed_connection_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # @api private
  def set_connection_details(connection_details)
    @details = connection_details
  end

  # Executes registered callbacks for a successful connection resume event
  # @api private
  def trigger_resumed
    resume_callbacks.each(&:call)
  end

  # Provides a simple hook to inject a callback when a connection is successfully resumed
  # @api private
  def on_resume(&callback)
    resume_callbacks << callback
  end

  # Remove a registered connection resume callback
  # @api private
  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  # Returns false if messages cannot be published as a result of message queueing being disabled
  # @api private
  def can_publish_messages?
    connected? ||
      ( (initialized? || connecting? || disconnected?) && client.queue_messages )
  end

  # @api private
  def create_transport(host, port, url, &block)
    logger.debug { "Connection: EventMachine connecting to #{host}:#{port} with URL: #{url}" }
    EventMachine.connect(host, port, WebsocketTransport, self, url.to_s, &block)
  end

  # @api private
  def connection_state_ttl
    defaults[:max_connection_state_ttl] || # undocumented max TTL configuration
      (details && details.connection_state_ttl) ||
      defaults.fetch(:connection_state_ttl)
  end

  def connection_state_ttl=(val)
    @connection_state_ttl = val
  end

  # @api private
  def heartbeat_interval
    # See RTN23a
    (details && details.max_idle_interval).to_i +
      defaults.fetch(:realtime_request_timeout)
  end

  # Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
  # (see #client_msg_serial)
  # @api private
  def reset_client_msg_serial
    @client_msg_serial = -1
  end

  # When a hearbeat or any other message from Ably is received
  # we know it's alive, see #RTN23
  # @api private
  def set_connection_confirmed_alive
    @last_liveness_event = Time.now
    manager.reset_liveness_timer
  end

  # @api private
  def time_since_connection_confirmed_alive?
    Time.now.to_i - @last_liveness_event.to_i
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private

  # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK.
  # Note that this is different to the connection serial that contains the last known serial number
  # received from the server.
  #
  # A message serial number does not guarantee a message has been received, only sent.
  # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
  # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
  def client_msg_serial
    @client_msg_serial
  end

  def resume_callbacks
    @resume_callbacks ||= []
  end

  def create_pub_sub_message_bus
    Ably::Util::PubSub.new(
      coerce_into: lambda do |event|
        raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
        :protocol_message
      end
    )
  end

  def add_message_serial_if_ack_required_to(protocol_message)
    if Ably::Models::ProtocolMessage.ack_required?(protocol_message[:action])
      add_message_serial_to(protocol_message) { yield }
    else
      yield
    end
  end

  def add_message_serial_to(protocol_message)
    @client_msg_serial += 1
    protocol_message[:msgSerial] = client_msg_serial
    yield
  rescue StandardError => e
    @client_msg_serial -= 1
    raise e
  end

  # Simply wait until the next EventMachine tick to ensure Connection initialization is complete
  def when_initialized
    EventMachine.next_tick { yield }
  end

  def connection_resumable?
    !key.nil? && !serial.nil? && connection_state_available?
  end

  def connection_state_available?
    return true if connected?

    return false if time_since_connection_confirmed_alive? > connection_state_ttl + details.max_idle_interval

    connected_last = state_history.reverse.find { |connected| connected.fetch(:state) == :connected }
    if connected_last.nil?
      false
    else
      true
    end
  end

  def connection_recoverable?
    connection_recover_parts
  end

  def connection_recover_parts
    client.recover.to_s.match(RECOVER_REGEX)
  end

  def production?
    client.environment.nil? || client.environment == :production
  end

  def custom_port?
    if client.use_tls?
      !!client.custom_tls_port
    else
      !!client.custom_port
    end
  end

  def custom_host?
    !!client.custom_realtime_host
  end

  def should_use_fallback_hosts?
    if client.fallback_hosts && !client.fallback_hosts.empty?
      if connecting? && previous_state && !disconnected_from_connected_state?
        use_fallback_if_disconnected? || use_fallback_if_suspended?
      end
    end
  end

  def disconnected_from_connected_state?
    most_recent_state_changes = state_history.last(3).first(2) # Ignore current state

    # A valid connection was disconnected
    most_recent_state_changes.last.fetch(:state) == Connection::STATE.Disconnected &&
      most_recent_state_changes.first.fetch(:state) == Connection::STATE.Connected
  end

  def use_fallback_if_disconnected?
    second_reconnect_attempt_for(:disconnected, 1)
  end

  def use_fallback_if_suspended?
    second_reconnect_attempt_for(:suspended, 2) # on first suspended state use default Ably host again
  end

  def second_reconnect_attempt_for(state, first_attempt_count)
    previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
  end
end

#transportAbly::Realtime::Connection::WebsocketTransport (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Underlying socket transport used for this connection, for internal use by the client library



112
113
114
# File 'lib/ably/realtime/connection.rb', line 112

def transport
  @transport
end

Instance Method Details

#add_message_to_outgoing_queue(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



417
418
419
# File 'lib/ably/realtime/connection.rb', line 417

def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end

#can_publish_messages?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns false if messages cannot be published as a result of message queueing being disabled

Returns:

  • (Boolean)


528
529
530
531
# File 'lib/ably/realtime/connection.rb', line 528

def can_publish_messages?
  connected? ||
    ( (initialized? || connecting? || disconnected?) && client.queue_messages )
end

#clear_error_reasonObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



499
500
501
# File 'lib/ably/realtime/connection.rb', line 499

def clear_error_reason
  @error_reason = nil
end

#close { ... } ⇒ EventMachine::Deferrable

Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to #connect.

Yields:

  • block is called as soon as this connection is in the Closed state

Returns:

  • (EventMachine::Deferrable)


176
177
178
179
180
181
182
183
184
# File 'lib/ably/realtime/connection.rb', line 176

def close(&success_block)
  unless closing? || closed?
    unless can_transition_to?(:closing)
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:closing))
    end
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end

#configure_new(connection_id, connection_key, connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Following a new connection being made, the connection ID, connection key and connection serial need to match the details provided by the server.



326
327
328
329
330
331
# File 'lib/ably/realtime/connection.rb', line 326

def configure_new(connection_id, connection_key, connection_serial)
  @id            = connection_id
  @key           = connection_key

  update_connection_serial connection_serial
end

#connect { ... } ⇒ EventMachine::Deferrable

Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened. Succeeds when connection is established i.e. state is @Connected@ Fails when state becomes either @Closing@, @Closed@ or @Failed@

Note that if the connection remains in the disconnected ans suspended states indefinitely, the Deferrable or block provided may never be called

Yields:

  • block is called as soon as this connection is in the Connected state

Returns:

  • (EventMachine::Deferrable)


198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/ably/realtime/connection.rb', line 198

def connect(&success_block)
  unless connecting? || connected?
    unless can_transition_to?(:connecting)
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:connecting))
    end
    # If connect called in a suspended block, we want to ensure the other callbacks have finished their work first
    EventMachine.next_tick { transition_state_machine :connecting if can_transition_to?(:connecting) }
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    deferrable.callback do
      yield if block_given?
    end
    succeed_callback = deferrable.method(:succeed)
    fail_callback    = deferrable.method(:fail)

    unsafe_once(:connected) do
      deferrable.succeed
      off(&fail_callback)
    end

    unsafe_once(:failed, :closed, :closing) do
      deferrable.fail
      off(&succeed_callback)
    end
  end
end

#connection_state_ttlObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



540
541
542
543
544
# File 'lib/ably/realtime/connection.rb', line 540

def connection_state_ttl
  defaults[:max_connection_state_ttl] || # undocumented max TTL configuration
    (details && details.connection_state_ttl) ||
    defaults.fetch(:connection_state_ttl)
end

#connection_state_ttl=(val) ⇒ Object



546
547
548
# File 'lib/ably/realtime/connection.rb', line 546

def connection_state_ttl=(val)
  @connection_state_ttl = val
end

#create_transport(host, port, url, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



534
535
536
537
# File 'lib/ably/realtime/connection.rb', line 534

def create_transport(host, port, url, &block)
  logger.debug { "Connection: EventMachine connecting to #{host}:#{port} with URL: #{url}" }
  EventMachine.connect(host, port, WebsocketTransport, self, url.to_s, &block)
end

#create_websocket_transportEventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (EventMachine::Deferrable)


428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/ably/realtime/connection.rb', line 428

def create_websocket_transport
  EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
    # Getting auth params can be blocking so uses a Deferrable
    client.auth.auth_params.tap do |auth_deferrable|
      auth_deferrable.callback do |auth_params|
        url_params = auth_params.merge(
          'format' =>     client.protocol,
          'echo' =>       client.echo_messages,
          'v' =>          Ably::PROTOCOL_VERSION,
          'agent' =>      client.rest_client.agent
        )

        # Use native websocket heartbeats if possible, but allow Ably protocol heartbeats
        url_params['heartbeats'] = if defaults.fetch(:websocket_heartbeats_disabled)
          'true'
        else
          'false'
        end

        url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
        url_params.merge!(client.transport_params)

        if connection_resumable?
          url_params.merge! resume: key, connection_serial: serial
          logger.debug { "Resuming connection key #{key} with serial #{serial}" }
        elsif connection_recoverable?
          url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
          logger.debug { "Recovering connection with key #{client.recover}" }
          unsafe_once(:connected, :closed, :failed) do
            client.disable_automatic_connection_recovery
          end
        end

        url = URI(client.endpoint).tap do |endpoint|
          endpoint.query = URI.encode_www_form(url_params)
        end

        determine_host do |host|
          # Ensure the hostname matches the fallback host name
          url.hostname = host
          url.port = port

          begin
            logger.debug { "Connection: Opening socket connection to #{host}:#{port}/#{url.path}?#{url.query}" }
            @transport = create_transport(host, port, url) do |websocket_transport|
              websocket_deferrable.succeed websocket_transport
            end
          rescue EventMachine::ConnectionError => error
            websocket_deferrable.fail error
          end
        end
      end

      auth_deferrable.errback do |error|
        websocket_deferrable.fail error
      end
    end
  end
end

#determine_host {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Determines the correct host name to use for the next connection attempt and updates current_host

Yields:

  • (String)

    The host name used for this connection, for network connection failures a fallback host is used to route around networking or intermittent problems if an Internet connection is available

Raises:

  • (ArgumentError)


365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/ably/realtime/connection.rb', line 365

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if should_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end

#heartbeat_intervalObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



551
552
553
554
555
# File 'lib/ably/realtime/connection.rb', line 551

def heartbeat_interval
  # See RTN23a
  (details && details.max_idle_interval).to_i +
    defaults.fetch(:realtime_request_timeout)
end

#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields:

  • (Boolean)

    True if an internet connection check appears to be up following an HTTP request to a reliable CDN

Returns:

  • (EventMachine::Deferrable)


292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/ably/realtime/connection.rb', line 292

def internet_up?
  url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new(url, tls: { verify_peer: true }).get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED)
      end
      http.callback do
        EventMachine.next_tick do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          if result
            deferrable.succeed
          else
            deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, Ably::Exceptions::Codes::BAD_REQUEST)
          end
        end
      end
    end
  end
end

#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



422
423
424
# File 'lib/ably/realtime/connection.rb', line 422

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end

#off_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove a registered connection resume callback



522
523
524
# File 'lib/ably/realtime/connection.rb', line 522

def off_resume(&callback)
  resume_callbacks.delete(callback)
end

#on_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides a simple hook to inject a callback when a connection is successfully resumed



516
517
518
# File 'lib/ably/realtime/connection.rb', line 516

def on_resume(&callback)
  resume_callbacks << callback
end

#ping {|Integer| ... } ⇒ Ably::Util::SafeDeferrable

Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server. This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently. The elapsed time in seconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.

Examples:

client = Ably::Rest::Client.new(key: 'key.id:secret')
client.connection.ping do |elapsed_s|
  puts "Ping took #{elapsed_s}s"
end

Yields:

  • (Integer)

    if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in seconds. If the ping is not received within an acceptable timeframe, the block will be called with nil as he first argument

Returns:



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/ably/realtime/connection.rb', line 241

def ping(&block)
  if initialized? || suspended? || closing? || closed? || failed?
    error = Ably::Models::ErrorInfo.new(message: "Cannot send a ping when the connection is #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    started = nil
    finished = false
    ping_id = SecureRandom.hex(16)
    heartbeat_action = Ably::Models::ProtocolMessage::ACTION.Heartbeat

    wait_for_ping = lambda do |protocol_message|
      next if finished
      if protocol_message.action == heartbeat_action && protocol_message.id == ping_id
        finished = true
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = Time.now.to_f - started.to_f
        deferrable.succeed time_passed
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      next if finished
      started = Time.now
      send_protocol_message action: heartbeat_action.to_i, id: ping_id
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end

    once_or_if([:suspended, :closing, :closed, :failed]) do
      next if finished
      finished = true
      deferrable.fail Ably::Models::ErrorInfo.new(message: "Ping failed as connection has changed state to #{state}", code: Ably::Exceptions::Codes::DISCONNECTED)
    end

    EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
      next if finished
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      error_msg = "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
      logger.warn { error_msg }
      deferrable.fail Ably::Models::ErrorInfo.new(message: error_msg, code: Ably::Exceptions::Codes::TIMEOUT_ERROR)
      safe_yield block, nil if block_given?
    end
  end
end

#release_websocket_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



489
490
491
# File 'lib/ably/realtime/connection.rb', line 489

def release_websocket_transport
  @transport = nil
end

#reset_client_msg_serialObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Resets the client message serial (msgSerial) sent to Ably for each new Models::ProtocolMessage (see #client_msg_serial)



560
561
562
# File 'lib/ably/realtime/connection.rb', line 560

def reset_client_msg_serial
  @client_msg_serial = -1
end

#reset_resume_infovoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Disable automatic resume of a connection



343
344
345
346
# File 'lib/ably/realtime/connection.rb', line 343

def reset_resume_info
  @key    = nil
  @serial = nil
end

#send_protocol_message(protocol_message) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent

Parameters:



406
407
408
409
410
411
412
413
414
# File 'lib/ably/realtime/connection.rb', line 406

def send_protocol_message(protocol_message)
  add_message_serial_if_ack_required_to(protocol_message) do
    Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
      add_message_to_outgoing_queue message
      notify_message_dispatcher_of_new_message message
      logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
    end
  end
end

#set_connection_confirmed_aliveObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a hearbeat or any other message from Ably is received we know it's alive, see #RTN23



567
568
569
570
# File 'lib/ably/realtime/connection.rb', line 567

def set_connection_confirmed_alive
  @last_liveness_event = Time.now
  manager.reset_liveness_timer
end

#set_connection_details(connection_details) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



504
505
506
# File 'lib/ably/realtime/connection.rb', line 504

def set_connection_details(connection_details)
  @details = connection_details
end

#set_failed_connection_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



494
495
496
# File 'lib/ably/realtime/connection.rb', line 494

def set_failed_connection_error_reason(error)
  @error_reason = error
end

#time_since_connection_confirmed_alive?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


573
574
575
# File 'lib/ably/realtime/connection.rb', line 573

def time_since_connection_confirmed_alive?
  Time.now.to_i - @last_liveness_event.to_i
end

#trigger_resumedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes registered callbacks for a successful connection resume event



510
511
512
# File 'lib/ably/realtime/connection.rb', line 510

def trigger_resumed
  resume_callbacks.each(&:call)
end

#update_connection_serial(connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Store last received connection serial so that the connection can be resumed from the last known point-in-time



336
337
338
# File 'lib/ably/realtime/connection.rb', line 336

def update_connection_serial(connection_serial)
  @serial = connection_serial
end