Google Cloud Pub/Sub
Google Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a "topic" and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows developers to communicate between independently written applications.
The goal of google-cloud is to provide an API that is comfortable to Rubyists. Your authentication credentials are detected automatically in Google Cloud Platform (GCP), including Google Compute Engine (GCE), Google Kubernetes Engine (GKE), Google App Engine (GAE), Google Cloud Functions (GCF) and Cloud Run. In other environments you can configure authentication easily, either directly in your code or via environment variables. Read more about the options for connecting in the Authentication Guide.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
publisher.publish "topic-message"
subscriber = pubsub.subscriber "my-topic-sub"
subscriber.listen do |received_message|
  puts "Message: #{received_message.message.data}"
  received_message.acknowledge!
end
This guide provides an overview of the client library's operations, which are categorized into Admin Operations and Data Plane Operations.
- Admin Operations: Used for creating, configuring, and managing Pub/Sub resources (topics, subscriptions, schemas).
- Data Plane Operations: For the core functionality of publishing and receiving messages.
To learn more about Pub/Sub, read the Google Cloud Pub/Sub Overview.
Admin Operations
Topic Admin Client
Manages topic resources.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new project_id: "my-project-id"
topic_admin = pubsub.topic_admin
Creating a Topic
A Topic is a named resource to which messages are sent by publishers. The resource must be created using a topic admin client before it can be used.
topic_path = pubsub.topic_path "my-topic"
topic = topic_admin.create_topic name: topic_path
puts "Topic #{topic.name} created."
Retrieving a Topic
A Topic is found by its full name.
topic_name = "my-topic"
topic_path = pubsub.topic_path topic_name # Format is `projects/#{project_id}/topics/#{topic_name}`
topic = topic_admin.get_topic topic: topic_path
puts "Topic: #{topic.name}."
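The full-name format noted in the comment above can be sketched in plain Ruby. The helpers build_topic_path and build_subscription_path below are hypothetical and exist only to illustrate the format; in application code the client's own topic_path and subscription_path methods are the better choice.

```ruby
# Hypothetical helpers that mirror the documented resource name formats.
# They are illustrations only, not part of the client library.
def build_topic_path(project_id, topic_name)
  "projects/#{project_id}/topics/#{topic_name}"
end

def build_subscription_path(project_id, subscription_name)
  "projects/#{project_id}/subscriptions/#{subscription_name}"
end

puts build_topic_path("my-project-id", "my-topic")
puts build_subscription_path("my-project-id", "my-topic-subscription")
```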
Subscription Admin Client
Manages subscription and snapshot resources.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new project_id: "my-project-id"
subscription_admin = pubsub.subscription_admin
Creating a Subscription
A Subscription is a named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
topic_path = pubsub.topic_path "my-topic" # Already created Topic resource
subscription_path = pubsub.subscription_path "my-topic-subscription"
subscription = subscription_admin.create_subscription name: subscription_path, topic: topic_path
A subscription can also be created with an acknowledgement deadline, in seconds, and an endpoint URL to which messages are pushed:
topic_path = pubsub.topic_path "my-topic" # Already created Topic resource
subscription_path = pubsub.subscription_path "my-topic-subscription"
push_config = Google::Cloud::PubSub::V1::PushConfig.new push_endpoint: "https://example.com/push"
subscription = subscription_admin.create_subscription name: subscription_path,
                                                       topic: topic_path,
                                                       push_config: push_config,
                                                       ack_deadline_seconds: 120
Retrieving Subscriptions
A Subscription is found by its name.
subscription_path = pubsub.subscription_path "my-topic-subscription"
subscription = subscription_admin.get_subscription subscription: subscription_path
Data Plane Operations
Publisher Client
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new project_id: "my-project-id"
publisher = pubsub.publisher "my-topic"
Publishing Messages
Messages are published to a topic. Any message published to a topic without a subscription will be lost. Ensure the topic has a subscription before publishing.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
msg = publisher.publish "task completed"
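One possible way to check that a topic has a subscription before publishing is to list the subscriptions attached to it. The sketch below assumes the topic admin client responds to list_topic_subscriptions(topic:) and returns an enumerable of subscription names; topic_has_subscription? is a hypothetical helper, not a library method.

```ruby
# Returns true when at least one subscription is attached to the topic.
# Assumption: `topic_admin` responds to list_topic_subscriptions(topic:)
# and returns an Enumerable of subscription names.
def topic_has_subscription?(topic_admin, topic_path)
  topic_admin.list_topic_subscriptions(topic: topic_path).any?
end

# Possible usage with the clients created earlier in this guide:
#   topic_path = pubsub.topic_path "my-topic"
#   if topic_has_subscription?(pubsub.topic_admin, topic_path)
#     pubsub.publisher("my-topic").publish "task completed"
#   end
```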
Messages can also be published with attributes:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
msg = publisher.publish "task completed",
                        foo: :bar,
                        this: :that
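Pub/Sub message attributes are string key/value pairs, so the symbols in the example above are best thought of as strings by the time a subscriber reads them. A minimal sketch of that normalization follows; normalize_attributes is a hypothetical helper for illustration and is not the library's own code.

```ruby
# Hypothetical helper: converts attribute keys and values to strings,
# which is the form subscribers should expect to read back.
def normalize_attributes(attributes)
  attributes.to_h { |key, value| [key.to_s, value.to_s] }
end

p normalize_attributes({ foo: :bar, this: :that })
```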
Messages can also be published in batches asynchronously using publish_async. (See Publisher#publish_async and AsyncPublisher)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
publisher.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end
publisher.async_publisher.stop!
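When several messages are published asynchronously, it can help to count the outcomes reported to the callbacks. The sketch below uses only the calls shown above (publish_async, succeeded?, async_publisher.stop!); publish_with_tally is a hypothetical helper, and it assumes stop! returns only after pending callbacks have run.

```ruby
# Hypothetical helper: publishes each payload asynchronously and counts
# how many callbacks reported success or failure.
# `publisher` is expected to be the object returned by pubsub.publisher.
def publish_with_tally(publisher, payloads)
  tally = { succeeded: 0, failed: 0 }
  lock = Mutex.new # callbacks may run on background threads

  payloads.each do |payload|
    publisher.publish_async payload do |result|
      lock.synchronize do
        tally[result.succeeded? ? :succeeded : :failed] += 1
      end
    end
  end

  # Flush pending batches, as in the example above.
  # Assumption: stop! blocks until the callbacks have been invoked.
  publisher.async_publisher.stop!
  tally
end
```

Because the helper stops the async publisher, it suits a one-shot job or a shutdown path rather than a long-running process.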
Or multiple messages can be published in batches at the same time by passing a
block to publish. (See BatchPublisher)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
msgs = publisher.publish do |batch|
  batch.publish "task 1 completed", foo: :bar
  batch.publish "task 2 completed", foo: :baz
  batch.publish "task 3 completed", foo: :bif
end
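The block form above also works when the messages come from a collection. In the sketch below, publish_tasks and the :data / :attributes hash layout are illustrative assumptions; only the publish block and batch.publish call are taken from the example above.

```ruby
# Hypothetical helper: publishes every task in a single batch.
# Each task is a hash such as { data: "...", attributes: { foo: :bar } }.
def publish_tasks(publisher, tasks)
  publisher.publish do |batch|
    tasks.each do |task|
      attributes = task.fetch(:attributes, {})
      batch.publish task[:data], **attributes
    end
  end
end

# Possible usage:
#   publish_tasks publisher, [
#     { data: "task 1 completed", attributes: { foo: :bar } },
#     { data: "task 2 completed" }
#   ]
```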
Subscriber Client
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new project_id: "my-project-id"
subscriber = pubsub.subscriber "my-topic-subscription"
Receiving Messages
Messages can be streamed from a subscription with a subscriber object that is
created using listen. (See Subscriber#listen and MessageListener)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
# Create a MessageListener to listen for available messages.
# By default, this block will be called on 8 concurrent threads
# but this can be tuned with the `threads` option.
# The `streams` and `inventory` parameters allow further tuning.
listener = subscriber.listen threads: { callback: 16 } do |received_message|
  # process message
  puts "Data: #{received_message.message.data}, published at #{received_message.message.published_at}"
  received_message.acknowledge!
end
# Handle exceptions from listener
listener.on_error do |exception|
  puts "Exception: #{exception.class} #{exception.message}"
end
# Gracefully shut down the subscriber on program exit, blocking until
# all received messages have been processed or 10 seconds have passed
at_exit do
  listener.stop!(10)
end
# Start background threads that will call the block passed to listen.
listener.start
# Block, letting processing threads continue in the background
sleep
Messages also can be pulled directly in a one-time operation. (See Subscriber#pull)
The immediate: false option is recommended to avoid adverse impacts on the
performance of pull operations.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
received_messages = subscriber.pull immediate: false
A maximum number of messages to pull can be specified:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
received_messages = subscriber.pull immediate: false, max: 10
Acknowledging a Message
Messages that are received can be acknowledged in Pub/Sub, signaling the server not to deliver them again.
A Message that can be acknowledged is called a ReceivedMessage. ReceivedMessages can be acknowledged one at a time: (See ReceivedMessage#acknowledge!)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  # process message
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
listener.start
# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
Or, multiple messages can be acknowledged in a single API call: (See Subscriber#acknowledge)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
received_messages = subscriber.pull immediate: false
subscriber.acknowledge received_messages
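Pulling and batch acknowledgement can be combined so that only messages that were processed successfully are acknowledged. The following is a sketch under that assumption; process_and_acknowledge is a hypothetical helper built on Subscriber#pull and Subscriber#acknowledge as used above.

```ruby
# Hypothetical helper: pulls up to `max` messages, yields each one to the
# caller's block, and acknowledges the ones for which the block returned
# a truthy value. Returns the number of acknowledged messages.
def process_and_acknowledge(subscriber, max: 10)
  received_messages = subscriber.pull immediate: false, max: max

  # Keep only the messages the block reported as processed.
  processed = received_messages.select { |received_message| yield received_message }

  # One API call acknowledges every successfully processed message.
  subscriber.acknowledge processed unless processed.empty?
  processed.size
end

# Possible usage:
#   count = process_and_acknowledge(subscriber, max: 10) do |received_message|
#     puts received_message.message.data
#     true
#   end
```

Messages that are left unacknowledged become eligible for redelivery once their acknowledgement deadline passes.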
Modifying a Deadline
A message must be acknowledged after it is pulled, or Pub/Sub will mark the message for redelivery. The acknowledgement deadline can be extended if more time is needed to process the message before it is marked for redelivery. (See ReceivedMessage#modify_ack_deadline!)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data
  # Delay for 2 minutes
  received_message.modify_ack_deadline! 120
end
# Start background threads that will call the block passed to listen.
listener.start
# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
The message can also be made available for immediate redelivery:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
listener = subscriber.listen do |received_message|
  puts received_message.message.data
  # Mark for redelivery
  received_message.reject!
end
# Start background threads that will call the block passed to listen.
listener.start
# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
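Acknowledgement and rejection are often combined in one handler: acknowledge when processing succeeds, reject when it raises. A minimal sketch of that pattern follows; handle_message is a hypothetical helper, and the decision to reject on every StandardError is an assumption that may not suit all applications.

```ruby
# Hypothetical helper: runs the caller's block for a received message,
# acknowledging on success and rejecting (for redelivery) on failure.
def handle_message(received_message)
  yield received_message
  received_message.acknowledge!
  :acknowledged
rescue StandardError => e
  warn "Processing failed: #{e.class}: #{e.message}"
  received_message.reject!
  :rejected
end

# Possible usage inside a listener:
#   listener = subscriber.listen do |received_message|
#     handle_message(received_message) { |m| puts m.message.data }
#   end
```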
Multiple messages can be delayed or made available for immediate redelivery: (See Subscriber#modify_ack_deadline)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber "my-topic-sub"
received_messages = subscriber.pull immediate: false
subscriber.modify_ack_deadline 120, received_messages
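The same call covers both cases mentioned above: a positive deadline delays redelivery, while a deadline of 0 makes the messages available for redelivery right away. The helper names in this sketch are hypothetical; only Subscriber#modify_ack_deadline comes from the library.

```ruby
# Hypothetical helper: gives the subscriber `seconds` more time to
# process the given messages.
def extend_deadline(subscriber, received_messages, seconds)
  subscriber.modify_ack_deadline seconds, received_messages
end

# Hypothetical helper: a deadline of 0 makes the messages available for
# immediate redelivery.
def release_for_redelivery(subscriber, received_messages)
  subscriber.modify_ack_deadline 0, received_messages
end

# Possible usage:
#   received_messages = subscriber.pull immediate: false
#   extend_deadline subscriber, received_messages, 120
#   release_for_redelivery subscriber, received_messages
```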
Using Ordering Keys
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
Note: At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.
Publishing Ordered Messages
To use ordering keys when publishing messages, a call to
Publisher#enable_message_ordering! must be made and the ordering_key argument
must be provided when calling Publisher#publish_async.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-ordered-topic"
# Ensure that message ordering is enabled.
publisher.enable_message_ordering!
# Publish an ordered message with an ordering key.
publisher.publish_async "task completed",
                        ordering_key: "task-key"
# Shut down the publisher when ready to stop publishing messages.
publisher.async_publisher.stop!
Handling Errors with Ordering Keys
Ordered messages that fail to publish to the Pub/Sub API due to an error will put
the ordering_key in a failed state, and future calls to Publisher#publish_async
with that ordering_key will raise OrderingKeyError. To allow future messages with
the ordering_key to be published, the ordering_key must be passed to
Publisher#resume_publish.
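One way to apply this is to resume the key from the publish_async callback when a publish fails. The sketch below is an assumption about how an application might wire this up, not the library's recommended policy; publish_ordered is a hypothetical helper, and resuming automatically may not be appropriate if strict ordering matters more than availability.

```ruby
# Hypothetical helper: publishes an ordered message and, if the publish
# fails, resumes the ordering key so later messages are not blocked.
def publish_ordered(publisher, data, ordering_key)
  publisher.publish_async data, ordering_key: ordering_key do |result|
    next if result.succeeded?

    warn "Publish failed for ordering key #{ordering_key}"
    # Clears the failed state described above for this key.
    publisher.resume_publish ordering_key
  end
end

# Possible usage:
#   publisher.enable_message_ordering!
#   publish_ordered publisher, "task completed", "task-key"
```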
Receiving Ordered Messages
To use ordering keys when subscribing to messages, the subscription must be
created with message ordering enabled before calling
Subscriber#listen. When enabled,
the subscriber will deliver messages with the same ordering_key in the order
they were published.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscription = ... # "my-ordered-topic-sub" subscription with message ordering enabled
puts subscription.enable_message_ordering #=> true
subscriber = pubsub.subscriber "my-ordered-topic-sub"
listener = subscriber.listen do |received_message|
  # Messages with the same ordering_key are received
  # in the order in which they were published.
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
listener.start
# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
Minimizing API calls before receiving and acknowledging messages
A subscriber object can be created without making any API calls by providing
the skip_lookup argument to Project#subscriber. A MessageListener object can also
be created without an API call by providing the optional deadline argument to
Subscriber#listen:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
# No API call is made to retrieve the subscription resource.
subscriber = pubsub.subscriber "my-topic-sub", skip_lookup: true
# No API call is made to retrieve the subscription deadline.
listener = subscriber.listen deadline: 60 do |received_message|
  # process message
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
listener.start
# Shut down the subscriber when ready to stop receiving messages.
listener.stop!
Skipping API calls may be used to avoid Google::Cloud::PermissionDeniedError
if your account has limited access to the Pub/Sub API. In particular, the role
roles/pubsub.subscriber does not have the permission
pubsub.subscriptions.get, which is required to retrieve a subscription
resource. See Access Control - Roles for the complete list of Pub/Sub roles and permissions.
Creating a snapshot and using seek
You can create a snapshot to retain the existing backlog on a subscription. The
snapshot will hold the messages in the subscription's backlog that are
unacknowledged upon the successful completion of the create_snapshot
operation.
Later, you can use seek to reset the subscription's backlog to the snapshot.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin
snapshot_path = pubsub.snapshot_path "my-snapshot"
subscription = ... # Already created Google::Cloud::PubSub::V1::Subscription
snapshot = subscription_admin.create_snapshot name: snapshot_path, subscription: subscription.name
subscriber = pubsub.subscriber "my-topic-sub"
received_messages = subscriber.pull immediate: false
subscriber.acknowledge received_messages
subscription_admin.seek subscription: subscription.name, snapshot: snapshot.name
Working Across Projects
All calls to the Pub/Sub service use the same project and credentials provided
to the PubSub.new method. However, it is common to
reference publishers or subscribers in other projects, which can be achieved by
using the project option. The main credentials must have permissions to the
topics and subscriptions in other projects.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new # my-project
# Get a Publisher for a topic in the current project
publisher = pubsub.publisher "my-topic"
publisher.name #=> "projects/my-project/topics/my-topic"
# Get a Publisher for a topic in another project
other_publisher = pubsub.publisher "other-topic", project: "other-project-id"
other_publisher.name #=> "projects/other-project-id/topics/other-topic"
It is possible to create a subscription in the current project that pulls from a topic in another project:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new # my-project
subscription_admin = pubsub.subscription_admin
# Get a Publisher for a topic in another project
publisher = pubsub.publisher "other-topic", project: "other-project-id"
# Create a subscription in the current project that pulls from
# the topic in another project
subscription_path = pubsub.subscription_path "my-sub"
subscription = subscription_admin.create_subscription name: subscription_path, topic: publisher.name
subscription.name #=> "projects/my-project/subscriptions/my-sub"
publisher.name #=> "projects/other-project-id/topics/other-topic"
Additional information
Google Cloud Pub/Sub can be configured to use an emulator or to enable gRPC's logging. To learn more, see the Emulator guide and Logging guide.