Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::Subscription.
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Inherits
- Object
Example
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Handle exceptions from listener
subscriber.on_error do |exception|
  puts "Exception: #{exception.class} #{exception.message}"
end

# Gracefully shut down the subscriber
at_exit do
  subscriber.stop!
end

# Start background threads that will call the block passed to listen.
subscriber.start
sleep
Methods
#ack
def ack(*messages)
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
See also ReceivedMessage#acknowledge!.
- messages (ReceivedMessage, String) — One or more ReceivedMessage objects or ack_id values.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
sub.acknowledge received_messages
#acknowledge
def acknowledge(*messages)
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
See also ReceivedMessage#acknowledge!.
- messages (ReceivedMessage, String) — One or more ReceivedMessage objects or ack_id values.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
sub.acknowledge received_messages
#bigquery_config
def bigquery_config() { |bigquery_config| ... } -> Google::Cloud::PubSub::V1::BigQueryConfig
Inspect the Subscription's bigquery configuration settings. The configuration can be changed by modifying the values in the method's block.
- (bigquery_config) — a block for modifying the bigquery configuration
- bigquery_config (Google::Cloud::PubSub::V1::BigQueryConfig)
- (Google::Cloud::PubSub::V1::BigQueryConfig)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.bigquery_config.table #=> "my-project:dataset-id.table-id"
sub.bigquery_config.use_topic_schema #=> true
sub.bigquery_config.write_metadata #=> false
Update the bigquery configuration by passing a block:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-subscription"
sub.bigquery_config do |bc|
  bc.write_metadata = true
  bc.use_topic_schema = false
end
#create_snapshot
def create_snapshot(snapshot_name = nil, labels: nil) -> Google::Cloud::PubSub::Snapshot
Creates a new Snapshot from the subscription. The created snapshot is guaranteed to retain:
- The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
- Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.
- snapshot_name (String, nil) — Name of the new snapshot. Optional. If the name is not provided, the server will assign a random name for this snapshot on the same project as the subscription. The value can be a simple snapshot ID (relative name), in which case the current project ID will be supplied, or a fully-qualified snapshot name in the form projects/{project_id}/snapshots/{snapshot_id}. The snapshot ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.
- labels (Hash) (defaults to: nil) — A hash of user-provided labels associated with the snapshot. You can use these to organize and group your snapshots. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"
Without providing a name:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."
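The naming rules for snapshot_name can be checked locally before calling create_snapshot. The following is a minimal sketch and is not part of the google-cloud-pubsub gem: valid_snapshot_id? and SNAPSHOT_ID_PATTERN are hypothetical names, and the regular expression is only a transcription of the rules listed for the relative snapshot ID. The service remains the authority on which names it accepts.

```ruby
# Hypothetical helper (not a library API): checks a relative snapshot ID
# against the documented naming rules.
# One leading letter plus 2..254 allowed characters gives a length of 3..255.
SNAPSHOT_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

def valid_snapshot_id?(id)
  return false unless id.is_a?(String)
  # IDs starting with "goog" are not allowed.
  return false if id.start_with?("goog")

  SNAPSHOT_ID_PATTERN.match?(id)
end

valid_snapshot_id? "my-snapshot"   # satisfies every rule
valid_snapshot_id? "goog-snapshot" # rejected: starts with "goog"
valid_snapshot_id? "1snapshot"     # rejected: does not start with a letter
valid_snapshot_id? "ab"            # rejected: shorter than 3 characters
```

A check like this only catches malformed names early; a name that passes can still be refused by the service, for example if a snapshot with that name already exists.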
#dead_letter_max_delivery_attempts
def dead_letter_max_delivery_attempts() -> Integer, nil
Returns the maximum number of delivery attempts for any message in the subscription's dead letter policy if a dead letter policy is configured, otherwise nil. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100.
The number of delivery attempts is defined as 1 + (the sum of number of NACKs and number of times the acknowledgement deadline has been exceeded for the message). A NACK is any call to ModifyAckDeadline with a 0 deadline. Note that client libraries may automatically extend ack_deadlines.
This field will be honored on a best effort basis. If this parameter is nil or 0, a default value of 5 is used.
See also #dead_letter_max_delivery_attempts=, #dead_letter_topic=, #dead_letter_topic and #remove_dead_letter_policy.
Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.
- (Integer, nil) — A value between 5 and 100, or nil if no dead letter policy is configured.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10
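The delivery-attempt arithmetic and the default described above can be written out directly. This is an illustrative sketch only: delivery_attempts and effective_max_delivery_attempts are hypothetical helpers, not methods of the library, and the service (not the client) is what actually counts attempts.

```ruby
# Hypothetical helpers illustrating the documented rules; not library APIs.
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
MAX_DELIVERY_ATTEMPTS_RANGE = (5..100).freeze

# Attempts = 1 + (number of NACKs + number of expired ack deadlines).
def delivery_attempts(nacks:, deadline_expirations:)
  1 + nacks + deadline_expirations
end

# nil or 0 falls back to the default of 5; other values must be in 5..100.
def effective_max_delivery_attempts(value)
  return DEFAULT_MAX_DELIVERY_ATTEMPTS if value.nil? || value.zero?
  unless MAX_DELIVERY_ATTEMPTS_RANGE.cover?(value)
    raise ArgumentError, "value must be between 5 and 100"
  end

  value
end

delivery_attempts nacks: 2, deadline_expirations: 1 # 1 + 2 + 1 = 4
effective_max_delivery_attempts nil                 # falls back to 5
effective_max_delivery_attempts 20                  # kept as 20
```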
#dead_letter_max_delivery_attempts=
def dead_letter_max_delivery_attempts=(new_dead_letter_max_delivery_attempts)
Sets the maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100.
The number of delivery attempts is defined as 1 + (the sum of number of NACKs and number of times the acknowledgement deadline has been exceeded for the message). A NACK is any call to ModifyAckDeadline with a 0 deadline. Note that client libraries may automatically extend ack_deadlines.
This field will be honored on a best effort basis. If this parameter is 0, a default value of 5 is used.
Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.
The dead letter topic must be set first. See #dead_letter_topic=, #dead_letter_topic and #remove_dead_letter_policy.
- new_dead_letter_max_delivery_attempts (Integer, nil) — A value between 5 and 100. If this parameter is nil or 0, a default value of 5 is used.
- (ArgumentError) — if the dead letter topic has not been set. See #dead_letter_topic=.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"

sub.dead_letter_max_delivery_attempts = 20
#dead_letter_topic
def dead_letter_topic() -> Topic, nil
Returns the Topic to which dead letter messages should be published if a dead letter policy is configured, otherwise nil. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times.
See also #dead_letter_topic=, #dead_letter_max_delivery_attempts=, #dead_letter_max_delivery_attempts and #remove_dead_letter_policy.
Makes an API call to retrieve the topic name when called on a reference object. See #reference?.
- (Topic, nil)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10
#dead_letter_topic=
def dead_letter_topic=(new_dead_letter_topic)
Sets the Topic to which dead letter messages for the subscription should be published. Dead lettering is
done on a best effort basis. The same message might be dead lettered multiple times.
The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic.
The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.
Makes an API call to retrieve the dead_letter_policy value when called on a reference object. See #reference?.
See also #dead_letter_topic, #dead_letter_max_delivery_attempts=, #dead_letter_max_delivery_attempts and #remove_dead_letter_policy.
- new_dead_letter_topic (Topic) — The topic to which dead letter messages for the subscription should be published.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
dead_letter_topic = pubsub.topic "my-dead-letter-topic", skip_lookup: true
sub.dead_letter_topic = dead_letter_topic
#deadline
def deadline() -> Integer
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
Makes an API call to retrieve the deadline value when called on a reference object. See #reference?.
- (Integer)
#deadline=
def deadline=(new_deadline)
Sets the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- new_deadline (Integer) — The new deadline value.
#delete
def delete() -> Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
- (Boolean) — Returns true if the subscription was deleted.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.delete
#detach
def detach() -> Boolean
Detaches a subscription from its topic. All messages retained in the subscription are dropped. Detached subscriptions don't receive messages from their topic and don't retain any backlog. Subsequent #pull and #listen (pull and streaming pull) operations will raise FAILED_PRECONDITION. If the subscription is a push subscription (see #push_config), pushes to the endpoint will stop. It may take a few minutes for the subscription's detached state to be reflected in subsequent calls to #detached?.
- (Boolean) — Returns true if the detach operation was successful.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.detach

# sleep 120
sub.detached? #=> true
#detached?
def detached?() -> Boolean
Whether the subscription is detached from its topic. Detached subscriptions don't receive messages from their topic and don't retain any backlog. #pull and #listen (pull and streaming pull) operations will raise FAILED_PRECONDITION. If the subscription is a push subscription (see #push_config), pushes to the endpoint will not be made. The default value is false.
See Topic#subscribe and #detach.
Makes an API call to retrieve the detached value when called on a reference object. See #reference?.
- (Boolean)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.detach

# sleep 120
sub.detached? #=> true
#endpoint
def endpoint() -> String
Returns the URL locating the endpoint to which messages should be pushed. For example, a Webhook endpoint might use https://example.com/push.
Makes an API call to retrieve the endpoint value when called on a reference object. See #reference?.
- (String)
#endpoint=
def endpoint=(new_endpoint)
Sets the URL locating the endpoint to which messages should be pushed. For example, a Webhook endpoint might use https://example.com/push.
- new_endpoint (String) — The new endpoint value.
#exists?
def exists?() -> Boolean
Determines whether the subscription exists in the Pub/Sub service.
Makes an API call to determine whether the subscription resource exists when called on a reference object. See #reference?.
- (Boolean)
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true
#expires_in
def expires_in() -> Numeric, nil
The duration (in seconds) for when a subscription expires after the subscription goes inactive. A subscription is considered active as long as any connected subscriber is successfully consuming messages from the subscription or is issuing operations on the subscription.
If #expires_in= is not set, a default value of 31 days will be used. The minimum allowed value is 1 day.
Makes an API call to retrieve the expires_in value when called on a reference object. See #reference?.
- (Numeric, nil) — The expiration duration, or nil if unset.
#expires_in=
def expires_in=(ttl)
Sets the duration (in seconds) for when a subscription expires after the subscription goes inactive.
See also #expires_in.
- ttl (Numeric, nil) — The expiration duration in seconds, or nil to unset.
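Because the expiration is expressed in seconds while the documented default and minimum are given in days, a small conversion helps avoid unit mistakes. The sketch below is illustrative: the constants, days_to_seconds and valid_expires_in? are hypothetical names, not part of the library, and the actual validation is performed by the service.

```ruby
# Hypothetical helpers for working with expiration durations in seconds.
SECONDS_PER_DAY = 24 * 60 * 60                      # 86_400
DEFAULT_EXPIRES_IN_SECONDS = 31 * SECONDS_PER_DAY   # 2_678_400 (31 days)
MIN_EXPIRES_IN_SECONDS = 1 * SECONDS_PER_DAY        # 86_400 (1 day)

def days_to_seconds(days)
  days * SECONDS_PER_DAY
end

# nil means "unset"; any other value must be at least one day.
def valid_expires_in?(ttl)
  ttl.nil? || (ttl.is_a?(Numeric) && ttl >= MIN_EXPIRES_IN_SECONDS)
end

days_to_seconds 2       # two days expressed in seconds
valid_expires_in? 3_600 # one hour is below the one-day minimum
```

The result of days_to_seconds is the kind of value that could then be assigned with #expires_in=.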
#filter
def filter() -> String
An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out.
Makes an API call to retrieve the filter value when called on a reference object. See #reference?.
- (String) — The frozen filter string.
#labels
def labels() -> Hash
A hash of user-provided labels associated with this subscription. Labels can be used to organize and group subscriptions. See Creating and Managing Labels.
The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this subscription.
Makes an API call to retrieve the labels value when called on a reference object. See #reference?.
- (Hash) — The frozen labels hash.
#labels=
def labels=(new_labels)
Sets the hash of user-provided labels associated with this subscription. Labels can be used to organize and group subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
- new_labels (Hash) — The new labels hash.
- (ArgumentError)
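The label constraints can be approximated with two regular expressions. This is a hedged sketch, not the library's validation: valid_labels? and both patterns are hypothetical, and reading "international characters" as Unicode lowercase letters (\p{Ll}) and caseless letters (\p{Lo}) is an assumption. Uniqueness of keys needs no check here because a Ruby Hash cannot hold duplicate keys.

```ruby
# Hypothetical patterns transcribing the documented label rules.
# Key: starts with a letter, at most 63 characters in total.
LABEL_KEY_PATTERN = /\A[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_\-]{0,62}\z/
# Value: optional (may be empty), at most 63 characters.
LABEL_VALUE_PATTERN = /\A[\p{Ll}\p{Lo}\p{N}_\-]{0,63}\z/

def valid_labels?(labels)
  labels.all? do |key, value|
    LABEL_KEY_PATTERN.match?(key.to_s) && LABEL_VALUE_PATTERN.match?(value.to_s)
  end
end

valid_labels?("env" => "prod", "team" => "") # empty values are allowed
valid_labels?("Env" => "prod")               # rejected: uppercase letter in key
valid_labels?("1env" => "prod")              # rejected: key starts with a digit
```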
#listen
def listen(deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block) { |received_message| ... } -> Subscriber
Create a Subscriber object that receives and processes messages using the code provided in the callback. The callback should acknowledge (ReceivedMessage#acknowledge!) or reject (ReceivedMessage#reject!) each message it receives. If no action is taken, the message will be removed from the subscriber and made available for redelivery after the callback is completed.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, the subscription must be created with message ordering enabled (See Topic#subscribe and #message_ordering?) before calling #listen. When enabled, the subscriber will deliver messages with the same ordering_key in the order they were published.
- deadline (Numeric) (defaults to: nil) — The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is #deadline. Optional.
When using a reference object an API call will be made to retrieve the default deadline value for the subscription when this argument is not provided. See #reference?.
- message_ordering (Boolean) (defaults to: nil) — Whether message ordering has been enabled. The value provided must match the value set on the Pub/Sub service. See #message_ordering?. Optional.
When using a reference object an API call will be made to retrieve the default message_ordering value for the subscription when this argument is not provided. See #reference?.
- streams (Integer) (defaults to: nil) — The number of concurrent streams to open to pull messages from the subscription. Default is 4. Optional.
- inventory (Hash, Integer) (defaults to: nil) — The settings to control how received messages are to be handled by the subscriber. When provided as an Integer instead of a Hash only max_outstanding_messages will be set. Optional.
Hash keys and values may include the following:
  - :max_outstanding_messages [Integer] The number of received messages to be collected by subscriber. Default is 1,000. (Note: replaces :limit, which is deprecated.)
  - :max_outstanding_bytes [Integer] The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB). (Note: replaces :bytesize, which is deprecated.)
  - :use_legacy_flow_control [Boolean] Disables enforcing flow control settings at the Cloud Pub/Sub server; the less accurate method of enforcing flow control only at the client side is used instead. Default is false.
  - :max_total_lease_duration [Integer] The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour). (Note: replaces :extension, which is deprecated.)
  - :max_duration_per_lease_extension [Integer] The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
- threads (Hash) (defaults to: {}) — The number of threads to create to handle concurrent calls by each stream opened by the subscriber. Optional.
Hash keys and values may include the following:
  - :callback (Integer) The number of threads used to handle the received messages. Default is 8.
  - :push (Integer) The number of threads to handle acknowledgement (ReceivedMessage#ack!) and modify ack deadline messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
- (received_message) — a block for processing new messages
- received_message (ReceivedMessage) — the newly received message
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  puts "Data: #{received_message.message.data}, published at #{received_message.message.published_at}"
  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
Configuring to increase concurrent callbacks:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen threads: { callback: 16 } do |rec_message|
  # store the message somewhere before acknowledging
  store_in_backend rec_message.data # takes a few seconds
  rec_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
Ordered messages are supported using ordering_key:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-ordered-topic-sub"
sub.message_ordering? #=> true

subscriber = sub.listen do |received_message|
  # messages with the same ordering_key are received
  # in the order in which they were published.
  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
Set the maximum amount of time before redelivery if the subscriber fails to extend the deadline:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen inventory: { max_duration_per_lease_extension: 20 } do |received_message|
  # Process message very slowly with possibility of failure.
  process received_message.data # takes minutes
  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
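The way the inventory argument accepts either an Integer or a Hash, together with the documented defaults, can be pictured as a merge over a table of default values. The following is a sketch for illustration only: DEFAULT_INVENTORY and normalize_inventory are hypothetical names, and the library's internal handling may differ.

```ruby
# Hypothetical illustration of the documented inventory defaults.
DEFAULT_INVENTORY = {
  max_outstanding_messages: 1_000,
  max_outstanding_bytes: 100_000_000,   # 100MB
  use_legacy_flow_control: false,
  max_total_lease_duration: 3_600,      # 1 hour
  max_duration_per_lease_extension: 0   # disabled
}.freeze

# An Integer sets only max_outstanding_messages; a Hash overrides any key;
# nil keeps every default.
def normalize_inventory(inventory)
  overrides =
    case inventory
    when nil then {}
    when Integer then { max_outstanding_messages: inventory }
    when Hash then inventory
    else raise ArgumentError, "inventory must be a Hash or an Integer"
    end

  DEFAULT_INVENTORY.merge overrides
end

normalize_inventory 500
normalize_inventory max_duration_per_lease_extension: 20
```

In both calls every setting that is not mentioned keeps its documented default.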
#message_ordering?
def message_ordering?() -> Boolean
Whether message ordering has been enabled. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.
See Topic#publish_async, #listen, and Message#ordering_key.
Makes an API call to retrieve the enable_message_ordering value when called on a reference object. See #reference?.
- (Boolean)
#modify_ack_deadline
def modify_ack_deadline(new_deadline, *messages)
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
See also ReceivedMessage#modify_ack_deadline!.
- new_deadline (Integer) — The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.
- messages (ReceivedMessage, String) — One or more ReceivedMessage objects or ack_id values.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
sub.modify_ack_deadline 120, received_messages
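The new deadline is measured from the moment the request is sent, not from when the message was first received. A minimal sketch of that arithmetic follows; ack_deadline_expiry is a hypothetical helper, not a library method, and it ignores network latency and server-side timing.

```ruby
# Hypothetical helper: when would the new ack deadline expire?
def ack_deadline_expiry(request_time, new_deadline)
  raise ArgumentError, "new_deadline must be >= 0" if new_deadline.negative?

  # Time + Integer adds seconds.
  request_time + new_deadline
end

sent_at = Time.now
ack_deadline_expiry sent_at, 10 # ten seconds after the request was sent
ack_deadline_expiry sent_at, 0  # expires at once: eligible for redelivery
```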
#name
def name() -> String
The name of the subscription.
- (String) — A fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.
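When only the project or the short subscription ID is needed, the fully-qualified name can be split on its fixed segments. This is a small sketch under the assumption that the name follows the documented form exactly; parse_subscription_name and SUBSCRIPTION_NAME_PATTERN are hypothetical and not provided by the library.

```ruby
# Hypothetical helper: split a fully-qualified subscription name.
SUBSCRIPTION_NAME_PATTERN = %r{\Aprojects/([^/]+)/subscriptions/([^/]+)\z}

def parse_subscription_name(name)
  match = SUBSCRIPTION_NAME_PATTERN.match(name)
  raise ArgumentError, "not a fully-qualified subscription name: #{name}" unless match

  { project_id: match[1], subscription_id: match[2] }
end

parts = parse_subscription_name "projects/my-project/subscriptions/my-topic-sub"
parts[:project_id]      # the "my-project" segment
parts[:subscription_id] # the "my-topic-sub" segment
```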
#new_snapshot
def new_snapshot(snapshot_name = nil, labels: nil) -> Google::Cloud::PubSub::Snapshot
Creates a new Snapshot from the subscription. The created snapshot is guaranteed to retain:
- The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
- Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.
- snapshot_name (String, nil) — Name of the new snapshot. Optional. If the name is not provided, the server will assign a random name for this snapshot on the same project as the subscription. The value can be a simple snapshot ID (relative name), in which case the current project ID will be supplied, or a fully-qualified snapshot name in the form projects/{project_id}/snapshots/{snapshot_id}. The snapshot ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.
- labels (Hash) (defaults to: nil) — A hash of user-provided labels associated with the snapshot. You can use these to organize and group your snapshots. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"
Without providing a name:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."
#policy
def policy() { |policy| ... } -> Policy
Gets the Cloud IAM access control policy for this subscription.
- (policy) — A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.
- policy (Policy) — the current Cloud IAM Policy for this subscription
- (Policy) — the current Cloud IAM Policy for this subscription
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy
Update the policy by passing a block:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

sub.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end
#policy=
def policy=(new_policy) -> Policy
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
- new_policy (Policy) — a new or modified Cloud IAM Policy for this subscription
- (Policy) — the policy returned by the API update operation
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy # API call

policy.add "roles/owner", "user:owner@example.com"

sub.update_policy policy # API call
#pull
def pull(immediate: true, max: 100) -> Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls messages from the server, blocking until messages are available when called with the immediate: false option, which is recommended to avoid adverse impacts on the performance of pull operations.
Raises an API error with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
See also #listen for the preferred way to process messages as they become available.
- immediate (Boolean) (defaults to: true) — Whether to return immediately or block until messages are available.
Warning: The default value of this field is true. However, sending true is discouraged because it adversely impacts the performance of pull operations. We recommend that users always explicitly set this field to false.
If this field is set to true, the system will respond immediately even if there are no messages available to return in the pull response. Otherwise, the system may wait (for a bounded amount of time) until at least one message is available, rather than returning no messages.
See also #listen for the preferred way to process messages as they become available.
- max (Integer) (defaults to: 100) — The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.
The immediate: false option is now recommended to avoid adverse impacts on pull operations:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
received_messages.each do |received_message|
  received_message.acknowledge!
end
A maximum number of messages returned can also be specified:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false, max: 10
received_messages.each do |received_message|
  received_message.acknowledge!
end
#push_config
def push_config() { |push_config| ... } -> Subscription::PushConfig
Inspect the Subscription's push configuration settings. The configuration can be changed by modifying the values in the method's block.
Subscription objects that are reference only will return an empty PushConfig object, which can be configured and saved using the method's block. This differs from #endpoint, which retrieves the full resource from the API before returning. To get the actual values for a reference object, call #reload! before calling #push_config.
- (push_config) — a block for modifying the push configuration
- push_config (Subscription::PushConfig) — the push configuration
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.push_config.endpoint #=> "http://example.com/callback"
sub.push_config.authentication.email #=> "user@example.com"
sub.push_config.authentication.audience #=> "client-12345"
Update the push configuration by passing a block:
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

sub.push_config do |pc|
  pc.endpoint = "http://example.net/callback"
  pc.set_oidc_token "user@example.net", "client-67890"
end
#reference?
def reference?() -> Boolean
Determines whether the subscription object was created without retrieving the resource representation from the Pub/Sub service.
- (Boolean) — true when the subscription was created without a resource representation, false otherwise.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub", skip_lookup: true

sub.reference? #=> true
#refresh!
def refresh!() -> Google::Cloud::PubSub::Subscription
Reloads the subscription with current data from the Pub/Sub service.
- (Google::Cloud::PubSub::Subscription) — Returns the reloaded subscription
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub"
sub.reload!
#reload!
def reload!() -> Google::Cloud::PubSub::Subscription
Reloads the subscription with current data from the Pub/Sub service.
- (Google::Cloud::PubSub::Subscription) — Returns the reloaded subscription
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub"
sub.reload!
#remove_dead_letter_policy
def remove_dead_letter_policy() -> Boolean
Removes an existing dead letter policy. A dead letter policy specifies the conditions for dead lettering messages in the subscription. If a dead letter policy is not set, dead lettering is disabled.
Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.
See #dead_letter_topic, #dead_letter_topic=, #dead_letter_max_delivery_attempts and #dead_letter_max_delivery_attempts=.
- (Boolean) — true if an existing dead letter policy was removed, false if no existing dead letter policy was present.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10

sub.remove_dead_letter_policy

sub.dead_letter_topic #=> nil
sub.dead_letter_max_delivery_attempts #=> nil
#resource?
def resource?() -> Boolean
Determines whether the subscription object was created with a resource representation from the Pub/Sub service.
- (Boolean) — true when the subscription was created with a resource representation, false otherwise.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.get_subscription "my-topic-sub"
sub.resource? #=> true
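Since #reference? and #resource? describe the two states a subscription object can be in, a small guard can make sure the resource data is loaded before it is used. The following is a minimal sketch: `ensure_loaded` is a hypothetical helper name, not part of the library, and it assumes that calling #reload! on a reference object fetches the resource representation.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub).
# `sub` is assumed to respond to #reference? and #reload! as documented above.
def ensure_loaded(sub)
  # A reference object carries no resource data yet, so fetch it once.
  sub.reload! if sub.reference?
  sub
end
```

Calling `ensure_loaded sub` on an object that already holds a resource representation makes no API call in this sketch.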
#retain_acked
def retain_acked() -> Boolean
Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.
Makes an API call to retrieve the retain_acked value when called on a reference object. See #reference?.
- (Boolean) — Returns true if acknowledged messages are retained.
#retain_acked=
def retain_acked=(new_retain_acked)
Sets whether to retain acknowledged messages.
- new_retain_acked (Boolean) — The new retain acknowledged messages value.
#retention
def retention() -> Numeric
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be less than 600 seconds (10 minutes) or more than 604,800 seconds (7 days). Default is 604,800 seconds (7 days).
Makes an API call to retrieve the retention value when called on a reference object. See #reference?.
- (Numeric) — The message retention duration in seconds.
#retention=
def retention=(new_retention)
Sets the message retention duration in seconds.
- new_retention (Numeric) — The new retention value.
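The bounds given for #retention can be checked on the client before calling the setter. This is a minimal sketch under stated assumptions: `RETENTION_RANGE` and `set_retention` are hypothetical names introduced for illustration, and the sketch does not show how the service itself reports an out-of-range value.

```ruby
# Bounds quoted in the #retention description, in seconds
# (600 = 10 minutes, 604_800 = 7 days).
RETENTION_RANGE = (600..604_800)

# Hypothetical helper (not part of google-cloud-pubsub): assign the
# retention only when the value lies inside the documented range.
def set_retention(sub, seconds)
  unless RETENTION_RANGE.cover?(seconds)
    raise ArgumentError,
          "retention must be within #{RETENTION_RANGE} seconds, got #{seconds.inspect}"
  end
  sub.retention = seconds
end
```

With a subscription object `sub`, `set_retention sub, 3600` would request a one-hour retention window, while `set_retention sub, 60` raises before any API call is made.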
#retry_policy
def retry_policy() -> RetryPolicy, nil
A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If nil, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
Makes an API call to retrieve the retry_policy when called on a reference object. See #reference?.
- (RetryPolicy, nil) — The retry policy for the subscription, or nil.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-topic-sub"
sub.retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub.retry_policy.minimum_backoff #=> 5
sub.retry_policy.maximum_backoff #=> 300
#retry_policy=
def retry_policy=(new_retry_policy)
Sets a policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If nil, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
- new_retry_policy (RetryPolicy, nil) — A new retry policy for the subscription, or nil.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-topic-sub"
sub.retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub.retry_policy.minimum_backoff #=> 5
sub.retry_policy.maximum_backoff #=> 300
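Because #retry_policy may return nil, code that reads the backoff values has to handle the default-policy case. A minimal sketch follows: `retry_policy_summary` is a hypothetical helper, and the backoff values are assumed here to be expressed in seconds.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): describe the
# retry behaviour of a subscription without failing on a nil policy.
def retry_policy_summary(sub)
  policy = sub.retry_policy
  # nil means no explicit policy is set, so the default applies.
  return "default retry policy" if policy.nil?

  # Assumption: both backoff values are in seconds.
  "backoff between #{policy.minimum_backoff}s and #{policy.maximum_backoff}s"
end
```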
#seek
def seek(snapshot) -> Boolean
Resets the subscription's backlog to a given Snapshot or to a point in time, whichever is provided in the request.
- snapshot (Snapshot, String, Time) — The Snapshot instance, snapshot name, or time to which to perform the seek. If the argument is a snapshot, the snapshot's topic must be the same as that of the subscription. If it is a time, messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription. For example, if the time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.
- (Boolean) — Returns true if the seek was successful.
Using a snapshot:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"
snapshot = sub.create_snapshot
received_messages = sub.pull immediate: false
sub.acknowledge received_messages
sub.seek snapshot
Using a time:
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"
time = Time.now
received_messages = sub.pull immediate: false
sub.acknowledge received_messages
sub.seek time
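The effect of seeking to a time can be pictured with a small in-memory model. This is only an illustration of the rule stated for the snapshot parameter, not how the service is implemented: `RetainedMessage` and `simulate_seek_to_time` are hypothetical names, and treating a message published exactly at the seek time as unacknowledged is an assumption, since the description covers only "before" and "after".

```ruby
# Hypothetical stand-in for a message retained in a subscription;
# not a library class.
RetainedMessage = Struct.new(:id, :published_at, :acked)

# Model of a seek to `time`: retained messages published before `time`
# become acknowledged, all others become unacknowledged. Messages that
# are no longer retained are simply absent from `messages` and are
# therefore never restored.
def simulate_seek_to_time(messages, time)
  messages.map do |message|
    RetainedMessage.new(message.id, message.published_at, message.published_at < time)
  end
end
```

In this model, seeking to a time earlier than every retained message marks the whole retained backlog as unacknowledged, which matches the replay use case.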
#test_permissions
def test_permissions(*permissions) -> Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
- permissions (String, Array<String>) — The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.
The permissions that can be checked on a subscription are:
- pubsub.subscriptions.consume
- pubsub.subscriptions.get
- pubsub.subscriptions.delete
- pubsub.subscriptions.update
- pubsub.subscriptions.getIamPolicy
- pubsub.subscriptions.setIamPolicy
- (Array<String>) — The permissions that have access.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "pubsub.subscriptions.get", "pubsub.subscriptions.consume"
perms.include? "pubsub.subscriptions.get" #=> true
perms.include? "pubsub.subscriptions.consume" #=> false
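Since #test_permissions returns only the permissions that are granted, the missing ones can be derived by subtracting the result from the requested list. The sketch below is one possible way to do that: `missing_permissions` is a hypothetical helper, and the client-side wildcard check simply mirrors the restriction described for the permissions parameter.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): return the
# requested permissions that the caller does NOT hold.
# `sub` is assumed to respond to #test_permissions as documented above.
def missing_permissions(sub, *required)
  required = required.flatten

  # Wildcards are not allowed, so reject them before making the call.
  wildcard = required.find { |permission| permission.include?("*") }
  raise ArgumentError, "wildcard permissions are not allowed: #{wildcard}" if wildcard

  granted = sub.test_permissions(*required)
  required - granted
end
```

An empty array from `missing_permissions` means every requested permission was returned as granted.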
#topic
def topic() -> Topic
The Topic from which this subscription receives messages.
Makes an API call to retrieve the topic information when called on a reference object. See #reference?.
- (Topic)
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"
#topic_retention
def topic_retention() -> Numeric, nil
Indicates the minimum duration for which a message is retained after it is published to the subscription's topic. If this field is set, messages published to the subscription's topic in the last topic_message_retention_duration are always available to subscribers.
Output only. See Topic#retention.
Makes an API call to retrieve the retention value when called on a reference object. See #reference?.
- (Numeric, nil) — The topic message retention duration in seconds, or nil if not set.
#update_policy
def update_policy(new_policy) -> Policy
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
- new_policy (Policy) — a new or modified Cloud IAM Policy for this subscription
- (Policy) — the policy returned by the API update operation
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"
policy = sub.policy # API call
policy.add "roles/owner", "user:owner@example.com"
sub.update_policy policy # API call
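The block form of #policy mentioned above can replace the explicit read, modify, and update sequence. The following sketch wraps it in a hypothetical helper, `grant_role`, and assumes that #policy yields the current Policy to the block and submits the modified policy once the block completes, as the description states.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): add one member
# to one role using the block form of #policy.
# `sub` is assumed to yield its policy to the block and to send the
# update after the block returns.
def grant_role(sub, role, member)
  sub.policy do |policy|
    policy.add role, member
  end
end
```

With a subscription object `sub`, `grant_role sub, "roles/pubsub.subscriber", "user:reader@example.com"` would add that member without a separate #update_policy call; the member address here is only a placeholder.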
#wait_for_messages
def wait_for_messages(max: 100) -> Array<Google::Cloud::PubSub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
See also #listen for the preferred way to process messages as they become available.
- max (Integer) (defaults to: 100) — The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.
require "google/cloud/pubsub"
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-topic-sub"
received_messages = sub.wait_for_messages
received_messages.each do |received_message|
  received_message.acknowledge!
end
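The max limit can be enforced on the client side before pulling. This is a minimal sketch: `pull_and_ack` and `MAX_PULL_SIZE` are hypothetical names, the upper bound of 1000 comes from the parameter description, and the lower bound of 1 is an assumption made for the sketch.

```ruby
# Upper bound taken from the `max` parameter description.
MAX_PULL_SIZE = 1000

# Hypothetical helper (not part of google-cloud-pubsub): wait for one
# batch of messages, acknowledge each, and return how many were handled.
def pull_and_ack(sub, max: 100)
  # Keep the request inside 1..1000; the lower bound is an assumption.
  batch_size = max.clamp(1, MAX_PULL_SIZE)

  received = sub.wait_for_messages max: batch_size
  received.each(&:acknowledge!)
  received.size
end
```

The service may return fewer messages than requested, so the returned count can be smaller than `max`.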