Class: Ably::Rest::Channel

Inherits:
Object
  • Object
show all
Includes:
Modules::Conversions
Defined in:
lib/submodules/ably-ruby/lib/ably/rest/channel.rb,
lib/submodules/ably-ruby/lib/ably/rest/channel/push_channel.rb

Overview

The Ably Realtime service organises the traffic within any application into named channels. Channels are the “unit” of message distribution; clients attach to channels to subscribe to messages, and every message broadcast by the service is associated with a unique channel.

Defined Under Namespace

Classes: PushChannel

Constant Summary collapse

IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH =

See spec RSL1k1

9

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, name, channel_options = {}) ⇒ Channel

Initialize a new Channel object

Parameters:

  • client (Ably::Rest::Client)
  • name (String)

    The name of the channel

  • channel_options (Hash) (defaults to: {})

    Channel options, currently reserved for Encryption options

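Options Hash (channel_options):

  • :cipher (Hash, Ably::Models::CipherParams)

    A hash of options or a Ably::Models::CipherParams to configure the encryption. :key is required, all other options are optional. See Ably::Util::Crypto#initialize for a list of :cipher options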



34
35
36
37
38
39
40
41
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 34

# Build a Channel bound to +client+ and identified by +name+.
#
# @param client [Ably::Rest::Client]
# @param name [String] The name of the channel; passed through ensure_utf_8 before being stored
# @param channel_options [Hash] Channel options, stored as a frozen copy by update_options
def initialize(client, name, channel_options = {})
  utf8_name = ensure_utf_8(:name, name)
  update_options(channel_options)

  @client, @name = client, utf8_name
  # Created last so the push channel receives a channel with client and name already set
  @push = PushChannel.new(self)
end

Instance Attribute Details

#client ⇒ Ably::Rest::Client (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ably client associated with this channel

Returns:

  • (Ably::Rest::Client)


16
17
18
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 16

# Reader for the Ably client this channel was constructed with
# (assigned to @client in #initialize). Part of the private API.
def client
  @client
end

#name ⇒ String (readonly)

Returns channel name.

Returns:

  • (String)

    channel name



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 10

class Channel
  include Ably::Modules::Conversions

  # Ably client associated with this channel
  # @return [Ably::Rest::Client]
  # @api private
  attr_reader :client

  attr_reader :name, :options

  # Push channel used for push notification (client-side)
  # @return [Ably::Rest::Channel::PushChannel]
  # @api private
  attr_reader :push

  IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1

  # Initialize a new Channel object
  #
  # @param client [Ably::Rest::Client]
  # @param name [String] The name of the channel
  # @param channel_options [Hash] Channel options, currently reserved for Encryption options
  # @option channel_options [Hash,Ably::Models::CipherParams]   :cipher   A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional.  See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
  #
  def initialize(client, name, channel_options = {})
    name = (ensure_utf_8 :name, name)

    update_options channel_options
    @client  = client
    @name    = name
    @push    = PushChannel.new(self)
  end

  # Publish one or more messages to the channel. Three overloaded forms:
  # +(name, data, attributes)+, +(messages, qs_params)+ and +(message, qs_params)+
  #
  # @param first [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]   The event name of the message to publish, or an Array of {Ably::Models::Message} objects or [Hash] objects with +:name+ and +:data+ pairs, or a single {Ably::Models::Message} object
  # @param second [String, ByteArray, Hash, nil]   The message payload, unless messages were passed in the first argument, in which case an optional hash of query parameters
  # @param third [Hash, nil]   Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when the first argument is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)
  # @return [Boolean]  true if the response status was 201 or 204, otherwise false
  # @raise [Ably::Exceptions::MaxMessageSizeExceeded] if the combined size of the messages exceeds the maximum message size
  # @raise [Ably::Exceptions::IncompatibleClientId] if a message client_id is the wildcard '*' or cannot be assumed by the client
  #
  # @example
  #   # Publish a single message with (name, data) form
  #   channel.publish 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   channel.publish messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   channel.publish messages
  #
  #   # Publish a single Ably::Models::Message object, with a query params
  #   # specifying quickAck: true
  #   message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
  #   channel.publish message, quickAck: 'true'
  #
  def publish(first, second = nil, third = {})
    messages, qs_params = if first.kind_of?(Enumerable)
      # ([Message], qs_params) form
      [first, second]
    elsif first.kind_of?(Ably::Models::Message)
      # (Message, qs_params) form
      [[first], second]
    else
      # (name, data, attributes) form
      first = ensure_utf_8(:name, first, allow_nil: true)
      ensure_supported_payload second
      # RSL1h - attributes as an extra method parameter is extra-spec but need to
      # keep it for backcompat until version 2.
      # The attributes argument is documented as nil-able, so treat nil as "no extra attributes"
      [[{ name: first, data: second }.merge(third || {})], nil]
    end

    # Build a new Array (map, not map!) so the caller's collection is never
    # mutated and frozen collections can be published
    messages = messages.map { |message| Ably::Models::Message(message.dup) }

    max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
    if messages.sum(&:size) > max_message_size
      raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
    end

    payload = messages.map do |message|
      # Encode a copy so the Message objects built above are left untouched
      Ably::Models::Message(message.dup).tap do |msg|
        msg.encode client.encoders, options

        next if msg.client_id.nil?
        if msg.client_id == '*'
          raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
        end
        unless client.auth.can_assume_client_id?(msg.client_id)
          raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
        end
      end.as_json
    end

    # We cannot mutate for idempotent publishing if one or more messages already has an ID
    if client.idempotent_rest_publishing && payload.none? { |msg| msg['id'] }
      # Mutate the JSON to support idempotent publishing where a Message.id does not exist
      idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
      payload.each_with_index do |msg, idx|
        msg['id'] = "#{idempotent_publish_id}:#{idx}"
      end
    end

    # Named request_options so the channel #options reader is not shadowed
    request_options = qs_params ? { qs_params: qs_params } : {}
    response = client.post("#{base_path}/publish", payload.length == 1 ? payload.first : payload, request_options)

    [201, 204].include?(response.status)
  end

  # Return the message history of the channel
  #
  # @param [Hash] options   the options for the message history request
  # @option options [Integer,Time] :start      Ensure earliest time or millisecond since epoch for any messages retrieved is +:start+
  # @option options [Integer,Time] :end        Ensure latest time or millisecond since epoch for any messages retrieved is +:end+
  # @option options [Symbol]       :direction  +:forwards+ or +:backwards+, defaults to +:backwards+
  # @option options [Integer]      :limit      Maximum number of messages to retrieve up to 1,000, defaults to 100
  #
  # @return [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}.
  # @raise [ArgumentError] if +:start+ is later than +:end+
  #
  def history(options = {})
    url = "#{base_path}/messages"
    options = {
      :direction => :backwards,
      :limit     => 100
    }.merge(options)

    [:start, :end].each { |option| options[option] = as_since_epoch(options[option]) if options.has_key?(option) }
    raise ArgumentError, ":end must be equal to or after :start" if options[:start] && options[:end] && (options[:start] > options[:end])

    paginated_options = {
      coerce_into: 'Ably::Models::Message',
      # Removed from options so it is not sent as a request parameter
      async_blocking_operations: options.delete(:async_blocking_operations),
    }

    response = client.get(url, options)

    Ably::Models::PaginatedResult.new(response, url, client, paginated_options) do |message|
      message.tap do |msg|
        decode_message msg
      end
    end
  end

  # Return the {Ably::Rest::Presence} object
  #
  # @return [Ably::Rest::Presence]
  def presence
    @presence ||= Presence.new(client, self)
  end

  # Replace the channel options with a frozen shallow copy of +channel_options+
  #
  # @param channel_options [Hash] see {#initialize} for the supported options
  # @api private
  def update_options(channel_options)
    @options = channel_options.clone.freeze
  end
  alias set_options update_options # (RSL7)
  alias options= update_options

  private
  # Path of this channel's REST resource, with the channel name form-encoded
  def base_path
    "/channels/#{URI.encode_www_form_component(name)}"
  end

  # Decode +message+ using the client's encoders and the channel options.
  # Cipher and encoder errors are logged through client.logger and not re-raised.
  def decode_message(message)
    message.decode client.encoders, options
  rescue Ably::Exceptions::CipherError, Ably::Exceptions::EncoderError => e
    client.logger.error { "Decoding Error on channel '#{name}', message event name '#{message.name}'. #{e.class.name}: #{e.message}" }
  end
end

#options ⇒ Hash (readonly)

Returns channel options configured for this channel, see #initialize for channel_options.

Returns:

  • (Hash)

    channel options configured for this channel, see #initialize for channel_options



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 10

class Channel
  include Ably::Modules::Conversions

  # Ably client associated with this channel
  # @return [Ably::Rest::Client]
  # @api private
  attr_reader :client

  attr_reader :name, :options

  # Push channel used for push notification (client-side)
  # @return [Ably::Rest::Channel::PushChannel]
  # @api private
  attr_reader :push

  IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1

  # Initialize a new Channel object
  #
  # @param client [Ably::Rest::Client]
  # @param name [String] The name of the channel
  # @param channel_options [Hash] Channel options, currently reserved for Encryption options
  # @option channel_options [Hash,Ably::Models::CipherParams]   :cipher   A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional.  See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
  #
  def initialize(client, name, channel_options = {})
    name = (ensure_utf_8 :name, name)

    update_options channel_options
    @client  = client
    @name    = name
    @push    = PushChannel.new(self)
  end

  # Publish one or more messages to the channel. Three overloaded forms:
  # +(name, data, attributes)+, +(messages, qs_params)+ and +(message, qs_params)+
  #
  # @param first [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]   The event name of the message to publish, or an Array of {Ably::Models::Message} objects or [Hash] objects with +:name+ and +:data+ pairs, or a single {Ably::Models::Message} object
  # @param second [String, ByteArray, Hash, nil]   The message payload, unless messages were passed in the first argument, in which case an optional hash of query parameters
  # @param third [Hash, nil]   Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when the first argument is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)
  # @return [Boolean]  true if the response status was 201 or 204, otherwise false
  # @raise [Ably::Exceptions::MaxMessageSizeExceeded] if the combined size of the messages exceeds the maximum message size
  # @raise [Ably::Exceptions::IncompatibleClientId] if a message client_id is the wildcard '*' or cannot be assumed by the client
  #
  # @example
  #   # Publish a single message with (name, data) form
  #   channel.publish 'click', { x: 1, y: 2 }
  #
  #   # Publish an array of message Hashes
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   channel.publish messages
  #
  #   # Publish an array of Ably::Models::Message objects
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   channel.publish messages
  #
  #   # Publish a single Ably::Models::Message object, with a query params
  #   # specifying quickAck: true
  #   message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
  #   channel.publish message, quickAck: 'true'
  #
  def publish(first, second = nil, third = {})
    messages, qs_params = if first.kind_of?(Enumerable)
      # ([Message], qs_params) form
      [first, second]
    elsif first.kind_of?(Ably::Models::Message)
      # (Message, qs_params) form
      [[first], second]
    else
      # (name, data, attributes) form
      first = ensure_utf_8(:name, first, allow_nil: true)
      ensure_supported_payload second
      # RSL1h - attributes as an extra method parameter is extra-spec but need to
      # keep it for backcompat until version 2.
      # The attributes argument is documented as nil-able, so treat nil as "no extra attributes"
      [[{ name: first, data: second }.merge(third || {})], nil]
    end

    # Build a new Array (map, not map!) so the caller's collection is never
    # mutated and frozen collections can be published
    messages = messages.map { |message| Ably::Models::Message(message.dup) }

    max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
    if messages.sum(&:size) > max_message_size
      raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
    end

    payload = messages.map do |message|
      # Encode a copy so the Message objects built above are left untouched
      Ably::Models::Message(message.dup).tap do |msg|
        msg.encode client.encoders, options

        next if msg.client_id.nil?
        if msg.client_id == '*'
          raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
        end
        unless client.auth.can_assume_client_id?(msg.client_id)
          raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
        end
      end.as_json
    end

    # We cannot mutate for idempotent publishing if one or more messages already has an ID
    if client.idempotent_rest_publishing && payload.none? { |msg| msg['id'] }
      # Mutate the JSON to support idempotent publishing where a Message.id does not exist
      idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
      payload.each_with_index do |msg, idx|
        msg['id'] = "#{idempotent_publish_id}:#{idx}"
      end
    end

    # Named request_options so the channel #options reader is not shadowed
    request_options = qs_params ? { qs_params: qs_params } : {}
    response = client.post("#{base_path}/publish", payload.length == 1 ? payload.first : payload, request_options)

    [201, 204].include?(response.status)
  end

  # Return the message history of the channel
  #
  # @param [Hash] options   the options for the message history request
  # @option options [Integer,Time] :start      Ensure earliest time or millisecond since epoch for any messages retrieved is +:start+
  # @option options [Integer,Time] :end        Ensure latest time or millisecond since epoch for any messages retrieved is +:end+
  # @option options [Symbol]       :direction  +:forwards+ or +:backwards+, defaults to +:backwards+
  # @option options [Integer]      :limit      Maximum number of messages to retrieve up to 1,000, defaults to 100
  #
  # @return [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}.
  # @raise [ArgumentError] if +:start+ is later than +:end+
  #
  def history(options = {})
    url = "#{base_path}/messages"
    options = {
      :direction => :backwards,
      :limit     => 100
    }.merge(options)

    [:start, :end].each { |option| options[option] = as_since_epoch(options[option]) if options.has_key?(option) }
    raise ArgumentError, ":end must be equal to or after :start" if options[:start] && options[:end] && (options[:start] > options[:end])

    paginated_options = {
      coerce_into: 'Ably::Models::Message',
      # Removed from options so it is not sent as a request parameter
      async_blocking_operations: options.delete(:async_blocking_operations),
    }

    response = client.get(url, options)

    Ably::Models::PaginatedResult.new(response, url, client, paginated_options) do |message|
      message.tap do |msg|
        decode_message msg
      end
    end
  end

  # Return the {Ably::Rest::Presence} object
  #
  # @return [Ably::Rest::Presence]
  def presence
    @presence ||= Presence.new(client, self)
  end

  # Replace the channel options with a frozen shallow copy of +channel_options+
  #
  # @param channel_options [Hash] see {#initialize} for the supported options
  # @api private
  def update_options(channel_options)
    @options = channel_options.clone.freeze
  end
  alias set_options update_options # (RSL7)
  alias options= update_options

  private
  # Path of this channel's REST resource, with the channel name form-encoded
  def base_path
    "/channels/#{URI.encode_www_form_component(name)}"
  end

  # Decode +message+ using the client's encoders and the channel options.
  # Cipher and encoder errors are logged through client.logger and not re-raised.
  def decode_message(message)
    message.decode client.encoders, options
  rescue Ably::Exceptions::CipherError, Ably::Exceptions::EncoderError => e
    client.logger.error { "Decoding Error on channel '#{name}', message event name '#{message.name}'. #{e.class.name}: #{e.message}" }
  end
end

#push ⇒ Ably::Rest::Channel::PushChannel (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push channel used for push notification (client-side)



23
24
25
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 23

# Reader for the PushChannel built for this channel in #initialize
# (@push = PushChannel.new(self)). Part of the private API.
def push
  @push
end

Instance Method Details

#history(options = {}) ⇒ Ably::Models::PaginatedResult<Ably::Models::Message>

Return the message history of the channel

Parameters:

  • options (Hash) (defaults to: {})

    the options for the message history request

Options Hash (options):

  • :start (Integer, Time)

    Ensure earliest time or millisecond since epoch for any messages retrieved is :start

  • :end (Integer, Time)

    Ensure latest time or millisecond since epoch for any messages retrieved is :end

  • :direction (Symbol)

    :forwards or :backwards, defaults to :backwards

  • :limit (Integer)

    Maximum number of messages to retrieve up to 1,000, defaults to 100

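Returns:

  • (Ably::Models::PaginatedResult<Ably::Models::Message>)

    First page of Ably::Models::Message objects accessible with #items.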

Raises:

  • (ArgumentError)


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 135

# Fetch the first page of message history for this channel.
#
# @param options [Hash] request options; :direction defaults to :backwards and :limit to 100
# @option options [Integer,Time] :start  earliest bound, converted with as_since_epoch when the key is present
# @option options [Integer,Time] :end    latest bound, converted with as_since_epoch when the key is present
# @return [Ably::Models::PaginatedResult<Ably::Models::Message>] result whose items are passed through decode_message
# @raise [ArgumentError] if :start is later than :end
def history(options = {})
  endpoint = "#{base_path}/messages"
  options = { :direction => :backwards, :limit => 100 }.merge(options)

  [:start, :end].each do |boundary|
    next unless options.has_key?(boundary)

    options[boundary] = as_since_epoch(options[boundary])
  end

  earliest = options[:start]
  latest = options[:end]
  if earliest && latest && earliest > latest
    raise ArgumentError, ":end must be equal to or after :start"
  end

  # :async_blocking_operations is removed from the request params and handed to the paginator instead
  paginated_options = {
    coerce_into: 'Ably::Models::Message',
    async_blocking_operations: options.delete(:async_blocking_operations),
  }

  response = client.get(endpoint, options)

  Ably::Models::PaginatedResult.new(response, endpoint, client, paginated_options) do |message|
    decode_message message
    message
  end
end

#presence ⇒ Ably::Rest::Presence

Return the Presence object



162
163
164
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 162

# Return the Presence object for this channel, building it from the client
# and this channel on first use and reusing it afterwards.
def presence
  return @presence if @presence

  @presence = Presence.new(client, self)
end

#publish(first, second = nil, third = {}) ⇒ Boolean

Publish one or more messages to the channel. Three overloaded forms

Examples:

# Publish a single message with (name, data) form
channel.publish 'click', { x: 1, y: 2 }

# Publish an array of message Hashes
messages = [
  { name: 'click', data: { x: 1, y: 2 } },
  { name: 'click', data: { x: 2, y: 3 } }
]
channel.publish messages

# Publish an array of Ably::Models::Message objects
messages = [
  Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
]
channel.publish messages

# Publish a single Ably::Models::Message object, with a query params
# specifying quickAck: true
message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
channel.publish message, quickAck: 'true'

Parameters:

  • name (String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil)

    The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with :name and :data pairs, or a single Ably::Models::Message object

  • data (String, ByteArray, Hash, nil)

    The message payload unless an Array of [Ably::Models::Message] objects passed in the first argument, in which case an optional hash of query parameters

  • attributes (Hash, nil)

    Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when name attribute is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)

Returns:

  • (Boolean)

    true if the message was published, otherwise false



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 72

# Publish one or more messages to the channel. Three overloaded forms:
# +(name, data, attributes)+, +(messages, qs_params)+ and +(message, qs_params)+
#
# @param first [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]   The event name, an Array of messages (Message objects or Hashes), or a single Message
# @param second [String, ByteArray, Hash, nil]   The message payload, or the query params Hash when messages were passed as the first argument
# @param third [Hash, nil]   Optional additional message attributes, only used in the (name, data, attributes) form
# @return [Boolean]  true if the response status was 201 or 204, otherwise false
# @raise [Ably::Exceptions::MaxMessageSizeExceeded] if the combined size of the messages exceeds the maximum message size
# @raise [Ably::Exceptions::IncompatibleClientId] if a message client_id is the wildcard '*' or cannot be assumed by the client
def publish(first, second = nil, third = {})
  messages, qs_params = if first.kind_of?(Enumerable)
    # ([Message], qs_params) form
    [first, second]
  elsif first.kind_of?(Ably::Models::Message)
    # (Message, qs_params) form
    [[first], second]
  else
    # (name, data, attributes) form
    first = ensure_utf_8(:name, first, allow_nil: true)
    ensure_supported_payload second
    # RSL1h - attributes as an extra method parameter is extra-spec but need to
    # keep it for backcompat until version 2.
    # The attributes argument is documented as nil-able, so treat nil as "no extra attributes"
    [[{ name: first, data: second }.merge(third || {})], nil]
  end

  # Build a new Array (map, not map!) so the caller's collection is never
  # mutated and frozen collections can be published
  messages = messages.map { |message| Ably::Models::Message(message.dup) }

  max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
  if messages.sum(&:size) > max_message_size
    raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
  end

  payload = messages.map do |message|
    # Encode a copy so the Message objects built above are left untouched
    Ably::Models::Message(message.dup).tap do |msg|
      msg.encode client.encoders, options

      next if msg.client_id.nil?
      if msg.client_id == '*'
        raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
      end
      unless client.auth.can_assume_client_id?(msg.client_id)
        raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
      end
    end.as_json
  end

  # We cannot mutate for idempotent publishing if one or more messages already has an ID
  if client.idempotent_rest_publishing && payload.none? { |msg| msg['id'] }
    # Mutate the JSON to support idempotent publishing where a Message.id does not exist
    idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
    payload.each_with_index do |msg, idx|
      msg['id'] = "#{idempotent_publish_id}:#{idx}"
    end
  end

  # Named request_options so the channel options reader is not shadowed
  request_options = qs_params ? { qs_params: qs_params } : {}
  response = client.post("#{base_path}/publish", payload.length == 1 ? payload.first : payload, request_options)

  [201, 204].include?(response.status)
end

#update_options(channel_options) ⇒ Object Also known as: set_options, options=

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



167
168
169
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 167

# Store a frozen shallow copy of +channel_options+ as the channel options
# and return that copy; the object passed in is not frozen by this call.
def update_options(channel_options)
  frozen_copy = channel_options.clone
  frozen_copy.freeze
  @options = frozen_copy
end