Class: Google::Cloud::PubSub::Subscriber
- Inherits: Object
  - Object
  - Google::Cloud::PubSub::Subscriber
- Defined in: lib/google/cloud/pubsub/subscriber.rb
Overview
A Subscriber is the primary interface for data plane operations, enabling you to receive messages from a subscription, either by streaming with a MessageListener or by pulling them directly.
Instance Method Summary
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Integer
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#listen(deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ MessageListener
Create a MessageListener object that receives and processes messages using the code provided in the callback.
-
#message_ordering? ⇒ Boolean
Whether message ordering has been enabled.
-
#modify_ack_deadline(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#name ⇒ String
The name of the subscription.
-
#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls messages from the server, blocking until messages are available when called with the immediate: false option, which is recommended to avoid adverse impacts on the performance of pull operations.
-
#reference? ⇒ Boolean
Determines whether the subscription object was created without retrieving the resource representation from the Pub/Sub service.
-
#reload! ⇒ Google::Cloud::PubSub::Subscription
Reloads the subscription with current data from the Pub/Sub service.
-
#resource? ⇒ Boolean
Determines whether the subscription object was created with a resource representation from the Pub/Sub service.
-
#subscription_resource ⇒ Google::Cloud::PubSub::V1::Subscription
The underlying Subscription resource.
-
#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
See also ReceivedMessage#acknowledge!.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 442
def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end
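The method body relies on coerce_ack_ids, a helper that is not shown in this section. As a rough sketch only, assuming the helper accepts either received messages (anything that responds to ack_id) or raw ack ID strings, the coercion could look like the following. FakeMessage is a hypothetical stand-in so the snippet runs without the Pub/Sub gem, and the helper body is an assumption rather than the library's confirmed implementation.

```ruby
# Hypothetical stand-in for a received message; only ack_id matters here.
FakeMessage = Struct.new(:ack_id)

# Assumed behavior of the private helper: flatten the splat arguments and
# turn each entry into an ack ID string.
def coerce_ack_ids messages
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

ids = coerce_ack_ids [FakeMessage.new("ack-1"), ["ack-2", FakeMessage.new("ack-3")]]
puts ids.inspect

# With no messages the list is empty, which is the case where #acknowledge
# returns true without calling the service.
puts coerce_ack_ids([]).empty?
```

Under this assumption, messages and plain ack ID strings can be mixed freely in one call, and an empty argument list never reaches the service.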
#deadline ⇒ Integer
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
Makes an API call to retrieve the deadline value when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 110
def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
Makes an API call to determine whether the subscription resource exists when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 152
def exists?
  # Always true if the object is not set as reference
  return true unless reference?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
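The caching behavior of #exists? can be illustrated with a small, self-contained sketch. Everything below (NotFoundError, FakeService, SketchSubscriber) is a hypothetical stand-in written for this example, not part of the library. It only mirrors the documented control flow: a single lookup whose result, found or not found, is remembered.

```ruby
# Hypothetical error class and service, defined here only so the sketch runs
# without the Pub/Sub gem.
class NotFoundError < StandardError; end

class FakeService
  attr_reader :calls

  def initialize known_names
    @known_names = known_names
    @calls = 0
  end

  # Counts lookups and raises when the subscription is unknown.
  def get_subscription name
    @calls += 1
    raise NotFoundError, name unless @known_names.include? name
    { name: name }
  end
end

class SketchSubscriber
  def initialize name, service
    @resource_name = name
    @service = service
    @grpc = nil
    @exists = nil
  end

  def reference?
    @grpc.nil?
  end

  def exists?
    return true unless reference?      # a loaded resource always exists
    return @exists unless @exists.nil? # reuse the cached answer
    @grpc = @service.get_subscription @resource_name
    @exists = true
  rescue NotFoundError
    @exists = false
  end
end

service = FakeService.new ["my-sub"]
missing = SketchSubscriber.new "no-such-sub", service
puts missing.exists?
puts missing.exists?
puts service.calls # the second call above did not trigger another lookup
```

Because a negative answer is cached as well, a reference object created before the subscription exists would keep reporting false in this sketch until a new object is built.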
#listen(deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ MessageListener
At the time of this release, ordering keys are not yet publicly enabled and require special project enablements.
Create a MessageListener object that receives and processes messages using the code provided in the callback. The callback should acknowledge (ReceivedMessage#acknowledge!) or reject (ReceivedMessage#reject!) each message it receives. If no action is taken, the message will be removed from the subscriber and made available for redelivery after the callback is completed.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, the subscription must be created with message
ordering enabled before calling #listen. When enabled, the subscriber
will deliver messages with the same ordering_key in the order they were
published.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 411
def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block
  ensure_service!
  deadline ||= self.deadline
  message_ordering = message_ordering? if message_ordering.nil?
  MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory,
                      message_ordering: message_ordering, threads: threads, service: service
end
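The acknowledge-or-reject contract for the callback can be sketched without a live subscription. FakeReceivedMessage and HANDLER below are hypothetical names introduced for illustration. The stand-in message only records which of the two documented actions the handler chose, and the Integer conversion is a placeholder for real processing.

```ruby
# Hypothetical stand-in for a received message; it records what the handler
# decided so the outcome can be inspected.
class FakeReceivedMessage
  attr_reader :data, :outcome

  def initialize data
    @data = data
    @outcome = nil
  end

  def acknowledge!
    @outcome = :acknowledged
  end

  def reject!
    @outcome = :rejected
  end
end

# Handler with the same shape as the block given to #listen: acknowledge on
# success, reject on failure so the message can be redelivered.
HANDLER = lambda do |received_message|
  begin
    Integer(received_message.data) # hypothetical processing step
    received_message.acknowledge!
  rescue ArgumentError
    received_message.reject!
  end
end

messages = [FakeReceivedMessage.new("42"), FakeReceivedMessage.new("not a number")]
messages.each(&HANDLER)
messages.each { |m| puts "#{m.data}: #{m.outcome}" }
```

With the real library, a handler of this shape would be passed as the block to #listen, for example subscriber.listen(&HANDLER). Handling every message explicitly avoids relying on the redelivery that happens when the callback takes no action.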
#message_ordering? ⇒ Boolean
At the time of this release, ordering keys are not yet publicly enabled and require special project enablements.
Whether message ordering has been enabled. When enabled, messages
published with the same ordering_key will be delivered in the order
they were published. When disabled, messages may be delivered in any
order.
See Publisher#publish_async, #listen, and Message#ordering_key.
Makes an API call to retrieve the enable_message_ordering value when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 131
def message_ordering?
  ensure_grpc!
  @grpc.enable_message_ordering
end
#modify_ack_deadline(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
See also ReceivedMessage#modify_ack_deadline!.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 477
def modify_ack_deadline new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end
#name ⇒ String
The name of the subscription.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 96
def name
  return @resource_name if reference?
  @grpc.name
end
#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls messages from the server, blocking until messages are available
when called with the immediate: false option, which is recommended
to avoid adverse impacts on the performance of pull operations.
Raises an API error with status UNAVAILABLE if there are too many
concurrent pull requests pending for the given subscription.
See also #listen for the preferred way to process messages as they become available.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 217
def pull immediate: true, max: 100
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
rescue Google::Cloud::DeadlineExceededError
  []
end
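A typical pull workflow is to fetch a batch, process it, acknowledge it, and stop when an empty array comes back. The sketch below shows that loop against InMemorySubscriber, a hypothetical in-memory class that only mimics the documented signatures of #pull and #acknowledge. PulledMessage and drain are likewise illustrative names, not library APIs.

```ruby
# Hypothetical message type carrying only what the loop needs.
PulledMessage = Struct.new(:ack_id, :data)

# Hypothetical in-memory subscriber that mimics the documented shape of #pull
# and #acknowledge so the loop runs without the Pub/Sub gem.
class InMemorySubscriber
  attr_reader :acked

  def initialize payloads
    @pending = payloads.each_with_index.map { |data, i| PulledMessage.new("ack-#{i}", data) }
    @acked = []
  end

  # Returns up to max pending messages, or an empty array when none are left.
  def pull immediate: true, max: 100
    @pending.first max
  end

  # Removes acknowledged messages from the pending list.
  def acknowledge *messages
    ids = messages.flatten.map(&:ack_id)
    @acked.concat ids
    @pending.reject! { |msg| ids.include? msg.ack_id }
    true
  end
end

# Drains the subscription in batches and returns the processed payloads.
def drain subscriber, batch_size: 2
  processed = []
  loop do
    batch = subscriber.pull immediate: false, max: batch_size
    break if batch.empty?
    batch.each { |msg| processed << msg.data.upcase }
    subscriber.acknowledge batch
  end
  processed
end

subscriber = InMemorySubscriber.new %w[alpha beta gamma]
puts drain(subscriber).inspect
puts subscriber.acked.inspect
```

Against the real service, the empty array corresponds to the rescued DeadlineExceededError shown in the method body, so a long-running consumer would usually retry rather than exit the loop.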
#reference? ⇒ Boolean
Determines whether the subscription object was created without retrieving the resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 500
def reference?
  @grpc.nil?
end
#reload! ⇒ Google::Cloud::PubSub::Subscription
Reloads the subscription with current data from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 539
def reload!
  ensure_service!
  subscription_path = service.subscription_path name
  @grpc = service.subscription_admin.get_subscription subscription: subscription_path
  @resource_name = nil
  self
end
#resource? ⇒ Boolean
Determines whether the subscription object was created with a resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 520
def resource?
  !@grpc.nil?
end
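The relationship between #reference?, #resource?, and #reload! can be sketched as a lazy-loading object. LazySubscription and its fetcher lambda are hypothetical stand-ins, and the assumption that attribute readers load the resource on first use is inferred from the "makes an API call ... when called on a reference object" notes on this page, not from code shown here. A plain hash stands in for the subscription resource.

```ruby
# Hypothetical sketch class; the real Subscriber gets its resource from the
# Pub/Sub service rather than from a lambda.
class LazySubscription
  def initialize name, fetcher
    @resource_name = name
    @fetcher = fetcher # callable standing in for the service lookup
    @grpc = nil
  end

  def reference?
    @grpc.nil?
  end

  def resource?
    !@grpc.nil?
  end

  # Answered locally while the object is still a reference.
  def name
    return @resource_name if reference?
    @grpc[:name]
  end

  # Needs the resource, so it triggers the lookup on first use (assumed).
  def deadline
    reload! if reference?
    @grpc[:ack_deadline_seconds]
  end

  # Replaces the cached resource with fresh data and returns the object.
  def reload!
    @grpc = @fetcher.call name
    @resource_name = nil
    self
  end
end

fetcher = ->(name) { { name: name, ack_deadline_seconds: 60 } }
sub = LazySubscription.new "my-sub", fetcher
puts sub.reference? # still a reference, nothing fetched yet
puts sub.deadline   # first attribute access loads the resource
puts sub.resource?  # the object now holds a resource
```

In this sketch, reading the name never costs a lookup, while any attribute stored on the resource does so exactly once until reload! is called again.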
#subscription_resource ⇒ Google::Cloud::PubSub::V1::Subscription
The underlying Subscription resource.
Provides access to the Google::Cloud::PubSub::V1::Subscription
resource managed by this subscriber.
Makes an API call to retrieve the actual subscription when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 85
def subscription_resource
  ensure_grpc!
  @grpc
end
#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscriber.pull immediate: false
See also #listen for the preferred way to process messages as they become available.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 254
def wait_for_messages max: 100
  pull immediate: false, max: max
end