Class: Google::Cloud::PubSub::Publisher

Inherits:
Object
Defined in:
lib/google/cloud/pubsub/publisher.rb

Overview

Publisher

A Publisher is the primary interface for data plane operations on a topic, including publishing messages, batching messages for higher throughput, and managing ordering keys.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic-only"

publisher.publish "task completed"

Instance Method Details

#async_publisher ⇒ AsyncPublisher

AsyncPublisher object used to publish multiple messages in batches.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
publisher.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

publisher.async_publisher.stop!

Returns:

  • (AsyncPublisher)

# File 'lib/google/cloud/pubsub/publisher.rb', line 79

def async_publisher
  @async_publisher
end
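
The reader above returns @async_publisher as-is, while #publish_async, #enable_message_ordering!, #message_ordering? and #resume_publish create it on first use with `||=`. The following is a minimal sketch of that pattern, not the library's implementation: LazySketchPublisher and FakeAsyncPublisher are hypothetical stand-ins that never contact the Pub/Sub service. It suggests that the reader can return nil until one of the creating methods has been called.

```ruby
# Hypothetical stand-ins for illustration; not part of google-cloud-pubsub.
class FakeAsyncPublisher
  attr_reader :published

  def initialize
    @published = []
  end

  def publish data
    @published << data
  end
end

class LazySketchPublisher
  # Plain reader, mirroring the listing above: no object is created here.
  def async_publisher
    @async_publisher
  end

  # Creates the async publisher on first use, then reuses the same object.
  def publish_async data
    @async_publisher ||= FakeAsyncPublisher.new
    @async_publisher.publish data
  end
end

publisher = LazySketchPublisher.new
before = publisher.async_publisher        # nothing has been created yet
publisher.publish_async "task completed"
first = publisher.async_publisher
publisher.publish_async "task 2 completed"
second = publisher.async_publisher        # same object as `first`
```

Under this assumption, code that may run before any asynchronous publish could use `publisher.async_publisher&.stop!` rather than calling stop! unconditionally.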

#enable_message_ordering! ⇒ Object

Note:

At the time of this release, ordering keys are not yet publicly enabled and require special project enablements.

Enables message ordering for messages with ordering keys on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published.

See #message_ordering?. See #publish_async, Subscriber#listen, and Message#ordering_key.



# File 'lib/google/cloud/pubsub/publisher.rb', line 357

def enable_message_ordering!
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.enable_message_ordering!
end

#message_ordering? ⇒ Boolean

Whether message ordering for messages with ordering keys has been enabled on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See #enable_message_ordering!. See #publish_async, Subscriber#listen, and Message#ordering_key.

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/publisher.rb', line 373

def message_ordering?
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.message_ordering?
end

#name ⇒ String

The name of the publisher.

Returns:

  • (String)

    A fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.



# File 'lib/google/cloud/pubsub/publisher.rb', line 89

def name
  return @resource_name if reference?
  @grpc.name
end
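
The fully-qualified form projects/{project_id}/topics/{topic_id} can be taken apart with a small helper. The sketch below is illustrative only: split_topic_name is a hypothetical function, not part of the library, and it assumes that neither ID contains a slash.

```ruby
# Hypothetical helper for illustration; not part of google-cloud-pubsub.
# Returns the project and topic IDs, or nil when the name is not in the
# fully-qualified form.
def split_topic_name name
  pattern = %r{\Aprojects/(?<project_id>[^/]+)/topics/(?<topic_id>[^/]+)\z}
  match = pattern.match name
  return nil unless match
  { project_id: match[:project_id], topic_id: match[:topic_id] }
end

parts = split_topic_name "projects/my-project/topics/my-topic"
short = split_topic_name "my-topic" # a bare topic ID does not match
```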

#publish(data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil, **extra_attrs) {|batch| ... } ⇒ Message+

Publishes one or more messages to the publisher.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
msg = publisher.publish "task completed"

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
file = File.open "message.txt", mode: "rb"
msg = publisher.publish file

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
msg = publisher.publish "task completed",
                    foo: :bar,
                    this: :that

Multiple messages can be sent at the same time using a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"

msgs = publisher.publish do |p|
  p.publish "task 1 completed", foo: :bar
  p.publish "task 2 completed", foo: :baz
  p.publish "task 3 completed", foo: :bif
end

Ordered messages are supported using ordering_key:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-ordered-topic"

# Ensure that message ordering is enabled.
publisher.enable_message_ordering!

# Publish an ordered message with an ordering key.
publisher.publish "task completed",
              ordering_key: "task-key"

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: nil)

    Optional attributes for the message.

  • ordering_key (String) (defaults to: nil)

    Identifies related messages for which publish order should be respected.

Yields:

  • (batch)

    a block for publishing multiple messages in one request

Yield Parameters:

  • batch (BatchPublisher)

    the batch publisher object on which publish is called for each message

Returns:

  • (Message, Array<Message>)

    Returns the published message when called without a block, or an array of messages when called with a block.



# File 'lib/google/cloud/pubsub/publisher.rb', line 214

def publish data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil,
            **extra_attrs, &block
  ensure_service!
  batch = BatchPublisher.new data,
                             attributes,
                             ordering_key,
                             extra_attrs,
                             compress: compress,
                             compression_bytes_threshold: compression_bytes_threshold

  block&.call batch
  return nil if batch.messages.count.zero?
  reason = block_given? ? "synchronous publish multiple" : "synchronous publish single"
  batch.publish_batch_messages name, service, reason: reason
end
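
The control flow of the listing above (build a batch, let the block add to it, return nil when nothing was added) can be tried without the service. This is a rough sketch under stated assumptions: SketchBatch and sketch_publish are hypothetical stand-ins, messages are plain hashes, and the choice between a single message and an array follows the documented return value rather than the library's internals.

```ruby
# Hypothetical stand-ins that mimic the shape of the flow above; nothing here
# calls the Pub/Sub service.
class SketchBatch
  attr_reader :messages

  def initialize data = nil, attributes = nil
    @messages = []
    publish data, attributes if data || attributes
  end

  def publish data, attributes = nil
    @messages << { data: data, attributes: attributes || {} }
  end
end

def sketch_publish data = nil, attributes = nil, &block
  batch = SketchBatch.new data, attributes
  block&.call batch
  return nil if batch.messages.count.zero?
  # Assumption based on the documented return value: one message without a
  # block, an array of messages with a block.
  block ? batch.messages : batch.messages.first
end

single = sketch_publish "task completed", { "foo" => "bar" }
many = sketch_publish do |b|
  b.publish "task 1 completed"
  b.publish "task 2 completed"
end
empty = sketch_publish { |_b| } # the block adds nothing, so nil is returned
```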

#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object

Note:

At the time of this release, ordering keys are not yet publicly enabled and require special project enablements.

Publishes a message asynchronously to the topic using #async_publisher.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.

To use ordering keys, specify ordering_key. #enable_message_ordering! must be called before a message is published with an ordering_key; otherwise an error is raised.

Publisher flow control limits the number of outstanding messages that are allowed to wait to be published.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
publisher.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

# Shut down the publisher when ready to stop publishing messages.
publisher.async_publisher.stop!

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
file = File.open "message.txt", mode: "rb"
publisher.publish_async file

# Shut down the publisher when ready to stop publishing messages.
publisher.async_publisher.stop!

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic"
publisher.publish_async "task completed",
                    foo: :bar, this: :that

# Shut down the publisher when ready to stop publishing messages.
publisher.async_publisher.stop!

Ordered messages are supported using ordering_key:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-ordered-topic"

# Ensure that message ordering is enabled.
publisher.enable_message_ordering!

# Publish an ordered message with an ordering key.
publisher.publish_async "task completed",
                    ordering_key: "task-key"

# Shut down the publisher when ready to stop publishing messages.
publisher.async_publisher.stop!

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: nil)

    Optional attributes for the message.

  • ordering_key (String) (defaults to: nil)

    Identifies related messages for which publish order should be respected.

Yields:

  • (result)

    the callback for when the message has been published

Yield Parameters:

  • result (PublishResult)

    the result of the asynchronous publish

Raises:



# File 'lib/google/cloud/pubsub/publisher.rb', line 340

def publish_async data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  ensure_service!
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.publish data, attributes, ordering_key: ordering_key, **extra_attrs, &callback
end
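
The requirement that message ordering be enabled before an ordering_key is used can be illustrated with a stand-in. This is a sketch only: SketchAsyncPublisher and SketchOrderingDisabled are hypothetical names, and the real AsyncPublisher batches and sends messages in the background, which this sketch does not attempt.

```ruby
# Hypothetical stand-ins for illustration; not the library's classes or errors.
class SketchOrderingDisabled < StandardError; end

class SketchAsyncPublisher
  def initialize
    @ordered = false
    @queue = []
  end

  def enable_message_ordering!
    @ordered = true
  end

  def message_ordering?
    @ordered
  end

  # Rejects an ordering key unless ordering has been enabled first.
  def publish data, ordering_key: nil
    if ordering_key && !@ordered
      raise SketchOrderingDisabled, "call enable_message_ordering! first"
    end
    @queue << [ordering_key, data]
  end

  # Messages queued for one ordering key, in publish order.
  def queued_for ordering_key
    @queue.select { |key, _| key == ordering_key }.map { |_, data| data }
  end
end

async = SketchAsyncPublisher.new

rejected = begin
  async.publish "task completed", ordering_key: "task-key"
  false
rescue SketchOrderingDisabled
  true
end

async.enable_message_ordering!
async.publish "task 1 completed", ordering_key: "task-key"
async.publish "task 2 completed", ordering_key: "task-key"
```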

#reference? ⇒ Boolean

Determines whether the publisher object was created without retrieving the resource representation from the Pub/Sub service.

Returns:

  • (Boolean)

    true when the publisher was created without a resource representation, false otherwise.



# File 'lib/google/cloud/pubsub/publisher.rb', line 101

def reference?
  @grpc.nil?
end

#reload! ⇒ Google::Cloud::PubSub::Publisher

Reloads the publisher with current data from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

publisher = pubsub.publisher "my-topic", skip_lookup: true

publisher.reload!

Returns:

  • (Google::Cloud::PubSub::Publisher)

    Returns the reloaded publisher.

# File 'lib/google/cloud/pubsub/publisher.rb', line 132

def reload!
  ensure_service!
  topic_path = service.topic_path name
  @grpc = service.topic_admin.get_topic topic: topic_path
  @resource_name = nil
  self
end

#resource? ⇒ Boolean

Determines whether the publisher object was created with a resource representation from the Pub/Sub service.

Returns:

  • (Boolean)

    true when the publisher was created with a resource representation, false otherwise.



# File 'lib/google/cloud/pubsub/publisher.rb', line 112

def resource?
  !@grpc.nil?
end
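
Judging by the listings, #reference? and #resource? are complements that both test @grpc, and #reload! flips them by storing a resource representation. The sketch below mirrors that logic with hypothetical stand-ins: ReferenceSketchPublisher is not the library's class, and FakeTopic takes the place of the object that the service's get_topic call would return.

```ruby
# Hypothetical stand-ins for illustration; no service call is made.
FakeTopic = Struct.new :name

class ReferenceSketchPublisher
  def initialize resource_name
    @resource_name = resource_name
    @grpc = nil
  end

  def name
    return @resource_name if reference?
    @grpc.name
  end

  def reference?
    @grpc.nil?
  end

  def resource?
    !@grpc.nil?
  end

  # Stands in for the get_topic request made by the real reload!.
  def reload!
    @grpc = FakeTopic.new name
    @resource_name = nil
    self
  end
end

publisher = ReferenceSketchPublisher.new "projects/my-project/topics/my-topic"
before = [publisher.reference?, publisher.resource?]
publisher.reload!
after = [publisher.reference?, publisher.resource?]
```

In this sketch the name is unchanged by the reload; only its source moves from the stored string to the resource representation.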

#resume_publish(ordering_key) ⇒ boolean

Resume publishing ordered messages for the provided ordering key.

Parameters:

  • ordering_key (String)

    Identifies related messages for which publish order should be respected.

Returns:

  • (boolean)

    true when resumed, false otherwise.



# File 'lib/google/cloud/pubsub/publisher.rb', line 386

def resume_publish ordering_key
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.resume_publish ordering_key
end
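
The true/false return value can be pictured with a small stand-in. This sketch rests on an assumption that the text above does not state: that an ordering key is paused after a failed publish and stays paused until #resume_publish is called for it. SketchOrderedQueue and mark_failed are hypothetical names, not part of the library.

```ruby
# Hypothetical stand-in. Assumption: a key is paused after a failed publish
# and remains paused until resume_publish is called for that key.
class SketchOrderedQueue
  def initialize
    @paused = {}
  end

  # Simulates a publish failure for the given key.
  def mark_failed ordering_key
    @paused[ordering_key] = true
  end

  def paused? ordering_key
    @paused.key? ordering_key
  end

  # Returns true when the key was paused and is now resumed, false otherwise.
  def resume_publish ordering_key
    !@paused.delete(ordering_key).nil?
  end
end

queue = SketchOrderedQueue.new
queue.mark_failed "task-key"
resumed = queue.resume_publish "task-key" # the key was paused
again = queue.resume_publish "task-key"   # nothing left to resume
```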