Class: Google::Cloud::PubSub::ReceivedMessage

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/pubsub/received_message.rb

Overview

ReceivedMessage

Represents a Pub/Sub Message that can be acknowledged or delayed.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!

Instance Method Summary collapse

Instance Method Details

#ack_idObject

The acknowledgment ID for the message.



62
63
64
# File 'lib/google/cloud/pubsub/received_message.rb', line 62

def ack_id
  @grpc.ack_id
end

#acknowledge! {|callback| ... } ⇒ Object Also known as: ack!

Acknowledges receipt of the message.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  received_message.acknowledge! do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



210
211
212
213
214
215
216
217
218
# File 'lib/google/cloud/pubsub/received_message.rb', line 210

def acknowledge!(&)
  ensure_subscription!
  if subscription.respond_to?(:exactly_once_delivery_enabled) && subscription.exactly_once_delivery_enabled
    subscription.acknowledge ack_id, &
  else
    subscription.acknowledge ack_id
    yield AcknowledgeResult.new(AcknowledgeResult::SUCCESS) if block_given?
  end
end

#attributesObject

Optional attributes for the received message.



125
126
127
# File 'lib/google/cloud/pubsub/received_message.rb', line 125

def attributes
  message.attributes
end

#dataObject

The received message payload. This data is a list of bytes encoded as ASCII-8BIT.



119
120
121
# File 'lib/google/cloud/pubsub/received_message.rb', line 119

def data
  message.data
end

#delivery_attemptInteger?

Returns the delivery attempt counter for the message. If a dead letter policy is not set on the subscription, this will be nil.

The delivery attempt counter is 1 + (the sum of number of NACKs and number of ack_deadline exceeds) for the message.

A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline exceeds event is whenever a message is not acknowledged within ack_deadline. Note that ack_deadline is initially Subscription.ackDeadlineSeconds, but may get extended automatically by the client library.

The first delivery of a given message will have this value as 1. The value is calculated at best effort and is approximate.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path("my-topic-sub"),
  topic: pubsub.topic_path("my-topic"),
  dead_letter_policy: {
    dead_letter_topic: pubsub.topic_path("my-dead-letter-topic"),
    max_delivery_attempts: 10
  }

subscriber = pubsub.subscriber "my-topic-sub"

listener = subscriber.listen do |received_message|
  puts received_message.message.delivery_attempt
end

Returns:

  • (Integer, nil)

    A delivery attempt value of 1 or greater, or nil if a dead letter policy is not set on the subscription.



104
105
106
107
# File 'lib/google/cloud/pubsub/received_message.rb', line 104

def delivery_attempt
  return nil if @grpc.delivery_attempt && @grpc.delivery_attempt < 1
  @grpc.delivery_attempt
end

#messageObject Also known as: msg

The received message.



111
112
113
# File 'lib/google/cloud/pubsub/received_message.rb', line 111

def message
  Message.from_grpc @grpc.message
end

#message_idObject Also known as: msg_id

The ID of the received message, assigned by the server at publication time. Guaranteed to be unique within the topic.



132
133
134
# File 'lib/google/cloud/pubsub/received_message.rb', line 132

def message_id
  message.message_id
end

#modify_ack_deadline!(new_deadline) {|callback| ... } ⇒ Object

Modifies the acknowledge deadline for the message.

This indicates that more time is needed to process the message, or to make the message available for redelivery.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  # Delay for 2 minutes
  received_message.modify_ack_deadline! 120 do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  # Delay for 2 minutes
  received_message.modify_ack_deadline! 120
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



276
277
278
279
280
281
282
283
284
# File 'lib/google/cloud/pubsub/received_message.rb', line 276

def modify_ack_deadline!(new_deadline, &)
  ensure_subscription!
  if subscription.respond_to?(:exactly_once_delivery_enabled) && subscription.exactly_once_delivery_enabled
    subscription.modify_ack_deadline new_deadline, ack_id, &
  else
    subscription.modify_ack_deadline new_deadline, ack_id
    yield AcknowledgeResult.new(AcknowledgeResult::SUCCESS) if block_given?
  end
end

#ordering_keyString

Identifies related messages for which publish order should be respected.

Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.

See Publisher#publish_async and Subscriber#listen.

Returns:

  • (String)


155
156
157
# File 'lib/google/cloud/pubsub/received_message.rb', line 155

def ordering_key
  message.ordering_key
end

#published_atObject Also known as: publish_time

The time at which the message was published.



161
162
163
# File 'lib/google/cloud/pubsub/received_message.rb', line 161

def published_at
  message.published_at
end

#reject! {|callback| ... } ⇒ Object Also known as: nack!, ignore!

Resets the acknowledge deadline for the message without acknowledging it.

This will make the message available for redelivery.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  # Release message back to the API.
  received_message.reject! do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data

  # Release message back to the API.
  received_message.reject!
end

# Start background threads that will call block passed to listen.
listener.start

# Shut down the subscriber when ready to stop receiving messages.
listener.stop!

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



335
336
337
# File 'lib/google/cloud/pubsub/received_message.rb', line 335

def reject!(&)
  modify_ack_deadline! 0, &
end