Class: Ably::Rest::Channel

Inherits:
Object
  • Object
show all
Includes:
Modules::Conversions
Defined in:
lib/submodules/ably-ruby/lib/ably/rest/channel.rb,
lib/submodules/ably-ruby/lib/ably/rest/channel/push_channel.rb

Overview

The Ably Realtime service organises the traffic within any application into named channels. Channels are the “unit” of message distribution; clients attach to channels to subscribe to messages, and every message broadcast by the service is associated with a unique channel.

Defined Under Namespace

Classes: PushChannel

Constant Summary collapse

IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH =

See spec RSL1k1

9

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, name, channel_options = {}) ⇒ Channel

Initialize a new Channel object

Parameters:



33
34
35
36
37
38
39
40
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 33

# Build a channel bound to +client+.
#
# The name is passed through +ensure_utf_8+ before it is stored, the options
# are coerced with Ably::Models::ChannelOptions(), and a PushChannel wrapping
# this channel is created last.
#
# @param client [Ably::Rest::Client] stored as-is and exposed through #client
# @param name [String] the name of the channel
# @param channel_options [Hash, Ably::Models::ChannelOptions] a hash of options
#   or a ready-made ChannelOptions object
def initialize(client, name, channel_options = {})
  utf8_name = ensure_utf_8(:name, name)

  @options = Ably::Models::ChannelOptions(channel_options)
  @client  = client
  @name    = utf8_name
  @push    = PushChannel.new(self)
end

Instance Attribute Details

#client ⇒ Ably::Rest::Client (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ably client associated with this channel

Returns:

  • (Ably::Rest::Client)


16
17
18
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 16

# Ably client associated with this channel.
#
# Plain reader: hands back the @client instance variable unchanged.
#
# @return [Ably::Rest::Client] presumably the client given to #initialize, whose
#   +client+ parameter is documented as Ably::Rest::Client -- TODO confirm
# @api private
def client
  @client
end

#name ⇒ String (readonly)

Returns channel name.

Returns:

  • (String)

    channel name



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 10

class Channel
  include Ably::Modules::Conversions

  # Ably client associated with this channel
  # @return [Ably::Rest::Client]
  # @api private
  attr_reader :client

  attr_reader :name, :options

  # Push channel used for push notification (client-side)
  # @return [Ably::Rest::Channel::PushChannel]
  # @api private
  attr_reader :push

  IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1

  # Initialize a new Channel object
  #
  # @param client [Ably::Rest::Client]
  # @param name [String] The name of the channel
  # @param channel_options [Hash, Ably::Models::ChannelOptions]     A hash of options or a {Ably::Models::ChannelOptions}
  #
  def initialize(client, name, channel_options = {})
    name = (ensure_utf_8 :name, name)

    @options = Ably::Models::ChannelOptions(channel_options)
    @client  = client
    @name    = name
    @push    = PushChannel.new(self)
  end

  # Publish one or more messages to the channel. Five overloaded forms
  # @param name [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs, or a single Ably::Models::Message object
  # @param data [String, Array, Hash, nil]   The message payload unless an Array of [Ably::Models::Message] objects passed in the first argument, in which case an optional hash of query parameters
  # @param attributes [Hash, nil]   Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when name attribute is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)
  # @return [Boolean]  true if the message was published, otherwise false
  # @raise [Ably::Exceptions::MaxMessageSizeExceeded] when the summed message sizes exceed the client's maximum message size
  # @raise [Ably::Exceptions::IncompatibleClientId] when a message uses the wildcard client_id or one the client cannot assume
  #
  # @example
  #   # Publish a single message with (name, data) form
  #   channel.publish 'click', { x: 1, y: 2 }
  #
  #   # Publish a single message with single Hash form
  #   message = { name: 'click', data: { x: 1, y: 2 } }
  #   channel.publish message
  #
  #   # Publish an array of message Hashes form
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   channel.publish messages
  #
  #   # Publish an array of Ably::Models::Message objects form
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   channel.publish messages
  #
  #   # Publish a single Ably::Models::Message object form
  #   message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
  #   channel.publish message
  #
  def publish(name, data = nil, attributes = {})
    # When the first argument already carries the message(s), the second
    # argument is treated as query string params rather than a payload
    qs_params = data if name.kind_of?(Enumerable) || name.kind_of?(Ably::Models::Message)

    messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

    total_size = messages.sum(&:size)
    max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
    if total_size > max_message_size
      raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
    end

    payload = messages.map do |message|
      # Work on a copy so that encoding does not touch the caller's message
      Ably::Models::Message(message.dup).tap do |msg|
        msg.encode client.encoders, options

        next if msg.client_id.nil?
        if msg.client_id == '*'
          raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
        end
        unless client.auth.can_assume_client_id?(msg.client_id)
          raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
        end
      end.as_json
    end

    # We cannot mutate for idempotent publishing if one or more messages already has an ID
    if client.idempotent_rest_publishing && payload.none? { |json| json['id'] }
      # Mutate the JSON to support idempotent publishing where a Message.id does not exist
      idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
      payload.each_with_index do |json, idx|
        json['id'] = "#{idempotent_publish_id}:#{idx}"
      end
    end

    # Named request_options so the #options reader used above is never masked by a local
    request_options = qs_params ? { qs_params: qs_params } : {}
    # A single message is posted as an object, several as an array
    body = payload.length == 1 ? payload.first : payload
    response = client.post("#{base_path}/publish", body, request_options)

    [201, 204].include?(response.status)
  end

  # Return the message history of the channel
  #
  # @param [Hash] options   the options for the message history request
  # @option options [Integer,Time] :start      Ensure earliest time or millisecond since epoch for any messages retrieved is +:start+
  # @option options [Integer,Time] :end        Ensure latest time or millisecond since epoch for any messages retrieved is +:end+
  # @option options [Symbol]       :direction  +:forwards+ or +:backwards+, defaults to +:backwards+
  # @option options [Integer]      :limit      Maximum number of messages to retrieve up to 1,000, defaults to 100
  #
  # @return [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}.
  # @raise [ArgumentError] when both +:start+ and +:end+ are given and +:start+ is after +:end+
  #
  def history(options = {})
    url = "#{base_path}/messages"
    options = {
      :direction => :backwards,
      :limit     => 100
    }.merge(options)

    [:start, :end].each { |option| options[option] = as_since_epoch(options[option]) if options.has_key?(option) }
    raise ArgumentError, ":end must be equal to or after :start" if options[:start] && options[:end] && (options[:start] > options[:end])

    # :async_blocking_operations is removed from the request params and handed to the paginated result instead
    paginated_options = {
      coerce_into: 'Ably::Models::Message',
      async_blocking_operations: options.delete(:async_blocking_operations),
    }

    response = client.get(url, options)

    Ably::Models::PaginatedResult.new(response, url, client, paginated_options) do |message|
      message.tap do |msg|
        decode_message msg
      end
    end
  end

  # Return the {Ably::Rest::Presence} object
  #
  # @return [Ably::Rest::Presence]
  def presence
    @presence ||= Presence.new(client, self)
  end

  # Sets or updates the stored channel options. (#RSL7)
  # @param channel_options [Hash, Ably::Models::ChannelOptions]  A hash of options or a {Ably::Models::ChannelOptions}
  # @return [Ably::Models::ChannelOptions]
  def set_options(channel_options)
    @options = Ably::Models::ChannelOptions(channel_options)
  end
  alias options= set_options

  # Makes GET request for channel details (#RSL8, #RSL8a)
  #
  # @return [Ably::Models::ChannelDetails]
  def status
    Ably::Models::ChannelDetails.new(client.get(base_path).body)
  end

  private

  # REST path of this channel, with the channel name form-encoded
  def base_path
    "/channels/#{URI.encode_www_form_component(name)}"
  end

  # Decode a message with the client's encoders and this channel's options.
  # Cipher and encoder errors are logged through the client logger, not raised.
  def decode_message(message)
    message.decode client.encoders, options
  rescue Ably::Exceptions::CipherError, Ably::Exceptions::EncoderError => e
    client.logger.error { "Decoding Error on channel '#{name}', message event name '#{message.name}'. #{e.class.name}: #{e.message}" }
  end
end

#options ⇒ Hash (readonly)

Returns channel options configured for this channel, see #initialize for channel_options.

Returns:

  • (Hash)

    channel options configured for this channel, see #initialize for channel_options



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 10

class Channel
  include Ably::Modules::Conversions

  # Ably client associated with this channel
  # @return [Ably::Rest::Client]
  # @api private
  attr_reader :client

  attr_reader :name, :options

  # Push channel used for push notification (client-side)
  # @return [Ably::Rest::Channel::PushChannel]
  # @api private
  attr_reader :push

  IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH = 9 # See spec RSL1k1

  # Initialize a new Channel object
  #
  # @param client [Ably::Rest::Client]
  # @param name [String] The name of the channel
  # @param channel_options [Hash, Ably::Models::ChannelOptions]     A hash of options or a {Ably::Models::ChannelOptions}
  #
  def initialize(client, name, channel_options = {})
    name = (ensure_utf_8 :name, name)

    @options = Ably::Models::ChannelOptions(channel_options)
    @client  = client
    @name    = name
    @push    = PushChannel.new(self)
  end

  # Publish one or more messages to the channel. Five overloaded forms
  # @param name [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]   The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with +:name+ and +:data+ pairs, or a single Ably::Models::Message object
  # @param data [String, Array, Hash, nil]   The message payload unless an Array of [Ably::Models::Message] objects passed in the first argument, in which case an optional hash of query parameters
  # @param attributes [Hash, nil]   Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when name attribute is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)
  # @return [Boolean]  true if the message was published, otherwise false
  # @raise [Ably::Exceptions::MaxMessageSizeExceeded] when the summed message sizes exceed the client's maximum message size
  # @raise [Ably::Exceptions::IncompatibleClientId] when a message uses the wildcard client_id or one the client cannot assume
  #
  # @example
  #   # Publish a single message with (name, data) form
  #   channel.publish 'click', { x: 1, y: 2 }
  #
  #   # Publish a single message with single Hash form
  #   message = { name: 'click', data: { x: 1, y: 2 } }
  #   channel.publish message
  #
  #   # Publish an array of message Hashes form
  #   messages = [
  #     { name: 'click', data: { x: 1, y: 2 } },
  #     { name: 'click', data: { x: 2, y: 3 } }
  #   ]
  #   channel.publish messages
  #
  #   # Publish an array of Ably::Models::Message objects form
  #   messages = [
  #     Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  #     Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
  #   ]
  #   channel.publish messages
  #
  #   # Publish a single Ably::Models::Message object form
  #   message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
  #   channel.publish message
  #
  def publish(name, data = nil, attributes = {})
    # When the first argument already carries the message(s), the second
    # argument is treated as query string params rather than a payload
    qs_params = data if name.kind_of?(Enumerable) || name.kind_of?(Ably::Models::Message)

    messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

    total_size = messages.sum(&:size)
    max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
    if total_size > max_message_size
      raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
    end

    payload = messages.map do |message|
      # Work on a copy so that encoding does not touch the caller's message
      Ably::Models::Message(message.dup).tap do |msg|
        msg.encode client.encoders, options

        next if msg.client_id.nil?
        if msg.client_id == '*'
          raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
        end
        unless client.auth.can_assume_client_id?(msg.client_id)
          raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
        end
      end.as_json
    end

    # We cannot mutate for idempotent publishing if one or more messages already has an ID
    if client.idempotent_rest_publishing && payload.none? { |json| json['id'] }
      # Mutate the JSON to support idempotent publishing where a Message.id does not exist
      idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
      payload.each_with_index do |json, idx|
        json['id'] = "#{idempotent_publish_id}:#{idx}"
      end
    end

    # Named request_options so the #options reader used above is never masked by a local
    request_options = qs_params ? { qs_params: qs_params } : {}
    # A single message is posted as an object, several as an array
    body = payload.length == 1 ? payload.first : payload
    response = client.post("#{base_path}/publish", body, request_options)

    [201, 204].include?(response.status)
  end

  # Return the message history of the channel
  #
  # @param [Hash] options   the options for the message history request
  # @option options [Integer,Time] :start      Ensure earliest time or millisecond since epoch for any messages retrieved is +:start+
  # @option options [Integer,Time] :end        Ensure latest time or millisecond since epoch for any messages retrieved is +:end+
  # @option options [Symbol]       :direction  +:forwards+ or +:backwards+, defaults to +:backwards+
  # @option options [Integer]      :limit      Maximum number of messages to retrieve up to 1,000, defaults to 100
  #
  # @return [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}.
  # @raise [ArgumentError] when both +:start+ and +:end+ are given and +:start+ is after +:end+
  #
  def history(options = {})
    url = "#{base_path}/messages"
    options = {
      :direction => :backwards,
      :limit     => 100
    }.merge(options)

    [:start, :end].each { |option| options[option] = as_since_epoch(options[option]) if options.has_key?(option) }
    raise ArgumentError, ":end must be equal to or after :start" if options[:start] && options[:end] && (options[:start] > options[:end])

    # :async_blocking_operations is removed from the request params and handed to the paginated result instead
    paginated_options = {
      coerce_into: 'Ably::Models::Message',
      async_blocking_operations: options.delete(:async_blocking_operations),
    }

    response = client.get(url, options)

    Ably::Models::PaginatedResult.new(response, url, client, paginated_options) do |message|
      message.tap do |msg|
        decode_message msg
      end
    end
  end

  # Return the {Ably::Rest::Presence} object
  #
  # @return [Ably::Rest::Presence]
  def presence
    @presence ||= Presence.new(client, self)
  end

  # Sets or updates the stored channel options. (#RSL7)
  # @param channel_options [Hash, Ably::Models::ChannelOptions]  A hash of options or a {Ably::Models::ChannelOptions}
  # @return [Ably::Models::ChannelOptions]
  def set_options(channel_options)
    @options = Ably::Models::ChannelOptions(channel_options)
  end
  alias options= set_options

  # Makes GET request for channel details (#RSL8, #RSL8a)
  #
  # @return [Ably::Models::ChannelDetails]
  def status
    Ably::Models::ChannelDetails.new(client.get(base_path).body)
  end

  private

  # REST path of this channel, with the channel name form-encoded
  def base_path
    "/channels/#{URI.encode_www_form_component(name)}"
  end

  # Decode a message with the client's encoders and this channel's options.
  # Cipher and encoder errors are logged through the client logger, not raised.
  def decode_message(message)
    message.decode client.encoders, options
  rescue Ably::Exceptions::CipherError, Ably::Exceptions::EncoderError => e
    client.logger.error { "Decoding Error on channel '#{name}', message event name '#{message.name}'. #{e.class.name}: #{e.message}" }
  end
end

#push ⇒ Ably::Rest::Channel::PushChannel (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push channel used for push notification (client-side)



23
24
25
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 23

# Push channel used for push notification (client-side).
#
# Plain reader: hands back the @push instance variable unchanged.
#
# @return [Ably::Rest::Channel::PushChannel]
# @api private
def push
  @push
end

Instance Method Details

#history(options = {}) ⇒ Ably::Models::PaginatedResult<Ably::Models::Message>

Return the message history of the channel

Parameters:

  • options (Hash) (defaults to: {})

    the options for the message history request

Options Hash (options):

  • :start (Integer, Time)

    Ensure earliest time or millisecond since epoch for any messages retrieved is :start

  • :end (Integer, Time)

    Ensure latest time or millisecond since epoch for any messages retrieved is :end

  • :direction (Symbol)

    :forwards or :backwards, defaults to :backwards

  • :limit (Integer)

    Maximum number of messages to retrieve up to 1,000, defaults to 100

Returns:

Raises:

  • (ArgumentError)


125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 125

# Fetch the first page of message history for this channel.
#
# Caller-supplied options are layered over the defaults
# (+:direction+ => +:backwards+, +:limit+ => 100). Any +:start+ / +:end+
# value present is converted with +as_since_epoch+ before the request is made.
#
# @param options [Hash] the options for the message history request
# @option options [Integer,Time] :start      earliest time (or milliseconds since epoch) of messages to retrieve
# @option options [Integer,Time] :end        latest time (or milliseconds since epoch) of messages to retrieve
# @option options [Symbol]       :direction  +:forwards+ or +:backwards+, defaults to +:backwards+
# @option options [Integer]      :limit      maximum number of messages to retrieve, defaults to 100
#
# @return [Ably::Models::PaginatedResult<Ably::Models::Message>] first page of results
# @raise [ArgumentError] when both bounds are given and +:start+ is after +:end+
def history(options = {})
  url = "#{base_path}/messages"
  request_params = { direction: :backwards, limit: 100 }.merge(options)

  %i[start end].each do |bound|
    next unless request_params.key?(bound)

    request_params[bound] = as_since_epoch(request_params[bound])
  end

  earliest = request_params[:start]
  latest   = request_params[:end]
  if earliest && latest && earliest > latest
    raise ArgumentError, ":end must be equal to or after :start"
  end

  # This flag is not a request parameter: pull it out and pass it to the paginated result
  async_flag = request_params.delete(:async_blocking_operations)
  paginated_options = {
    coerce_into: 'Ably::Models::Message',
    async_blocking_operations: async_flag,
  }

  response = client.get(url, request_params)

  Ably::Models::PaginatedResult.new(response, url, client, paginated_options) do |message|
    decode_message message
    message
  end
end

#presence ⇒ Ably::Rest::Presence

Return the Presence object



152
153
154
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 152

# Return the Presence object for this channel.
#
# The object is built on first use from this channel and its client, then
# cached in @presence so later calls hand back the same instance.
#
# @return [Ably::Rest::Presence]
def presence
  return @presence if @presence

  @presence = Presence.new(client, self)
end

#publish(name, data = nil, attributes = {}) ⇒ Boolean

Publish one or more messages to the channel. Five overloaded forms

Examples:

# Publish a single message with (name, data) form
channel.publish 'click', { x: 1, y: 2 }

# Publish a single message with single Hash form
message = { name: 'click', data: { x: 1, y: 2 } }
channel.publish message

# Publish an array of message Hashes form
messages = [
  { name: 'click', data: { x: 1, y: 2 } },
  { name: 'click', data: { x: 2, y: 3 } }
]
channel.publish messages

# Publish an array of Ably::Models::Message objects form
messages = [
  Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
]
channel.publish messages

# Publish a single Ably::Models::Message object form
message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
channel.publish message

Parameters:

  • name (String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil)

    The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with :name and :data pairs, or a single Ably::Model::Message object

  • data (String, Array, Hash, nil) (defaults to: nil)

    The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument, in which case an optional hash of query parameters

  • attributes (Hash, nil) (defaults to: {})

    Optional additional message attributes such as :extras, :id, :client_id or :connection_id, applied when name attribute is nil or a string (Deprecated, will be removed in 2.0 in favour of constructing a Message object)

Returns:

  • (Boolean)

    true if the message was published, otherwise false



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 74

# Publish one or more messages to the channel.
#
# The first argument may be an event name (paired with +data+), a single
# Hash or Ably::Models::Message, or an Array of either. In the latter forms
# +data+ is treated as an optional hash of query string params.
#
# @param name [String, Array<Ably::Models::Message|Hash>, Ably::Models::Message, nil]  event name, or the message(s) to publish
# @param data [String, Array, Hash, nil]  message payload, or query params when +name+ carries the message(s)
# @param attributes [Hash, nil]  optional additional message attributes, passed on to +build_messages+
# @return [Boolean] true if the response status is 201 or 204, otherwise false
# @raise [Ably::Exceptions::MaxMessageSizeExceeded] when the summed message sizes exceed the client's maximum message size
# @raise [Ably::Exceptions::IncompatibleClientId] when a message uses the wildcard client_id or one the client cannot assume
def publish(name, data = nil, attributes = {})
  # When the first argument already carries the message(s), the second
  # argument is treated as query string params rather than a payload
  qs_params = data if name.kind_of?(Enumerable) || name.kind_of?(Ably::Models::Message)

  messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

  total_size = messages.sum(&:size)
  max_message_size = client.max_message_size || Ably::Rest::Client::MAX_MESSAGE_SIZE
  if total_size > max_message_size
    raise Ably::Exceptions::MaxMessageSizeExceeded.new("Maximum message size exceeded #{max_message_size} bytes.")
  end

  payload = messages.map do |message|
    # Work on a copy so that encoding does not touch the caller's message
    Ably::Models::Message(message.dup).tap do |msg|
      msg.encode client.encoders, options

      next if msg.client_id.nil?
      if msg.client_id == '*'
        raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
      end
      unless client.auth.can_assume_client_id?(msg.client_id)
        raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{msg.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
      end
    end.as_json
  end

  # We cannot mutate for idempotent publishing if one or more messages already has an ID
  if client.idempotent_rest_publishing && payload.none? { |json| json['id'] }
    # Mutate the JSON to support idempotent publishing where a Message.id does not exist
    idempotent_publish_id = SecureRandom.base64(IDEMPOTENT_LIBRARY_GENERATED_ID_LENGTH)
    payload.each_with_index do |json, idx|
      json['id'] = "#{idempotent_publish_id}:#{idx}"
    end
  end

  # Named request_options so the options reader used above is never masked by a local
  request_options = qs_params ? { qs_params: qs_params } : {}
  # A single message is posted as an object, several as an array
  body = payload.length == 1 ? payload.first : payload
  response = client.post("#{base_path}/publish", body, request_options)

  [201, 204].include?(response.status)
end

#set_options(channel_options) ⇒ Ably::Models::ChannelOptions Also known as: options=

Sets or updates the stored channel options. (#RSL7)

Parameters:

Returns:



159
160
161
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 159

# Sets or updates the stored channel options. (#RSL7)
#
# The argument is coerced with Ably::Models::ChannelOptions() and the result
# replaces whatever was previously held in @options.
#
# @param channel_options [Hash, Ably::Models::ChannelOptions] a hash of options or a ChannelOptions object
# @return [Ably::Models::ChannelOptions] the coerced options now stored on the channel
def set_options(channel_options)
  coerced = Ably::Models::ChannelOptions(channel_options)
  @options = coerced
end

#status ⇒ Ably::Models::ChannelDetails

Makes GET request for channel details (#RSL8, #RSL8a)



167
168
169
# File 'lib/submodules/ably-ruby/lib/ably/rest/channel.rb', line 167

# Makes GET request for channel details (#RSL8, #RSL8a)
#
# Issues a GET against this channel's base path and wraps the response body
# in an Ably::Models::ChannelDetails.
#
# @return [Ably::Models::ChannelDetails]
def status
  response = client.get(base_path)
  Ably::Models::ChannelDetails.new(response.body)
end