处理消息失败问题

订阅者可能会因各种原因而无法处理消息。对于 例如,在检索处理数据所需的数据时,可能会出现暂时性问题。 消息。或者,消息采用订阅者意料之外的格式。

本页面介绍了如何使用 订阅重试政策,或者将未传送的消息转发到死信主题 (也称为死信队列)。

请注意,Dataflow 不支持这些功能。如需了解详情,请参阅 Dataflow 文档的不支持的 Pub/Sub 功能部分。

订阅重试政策

如果 Pub/Sub 尝试传送消息,但订阅者无法传送 确认后,Pub/Sub 会自动尝试重新发送消息。 这种重新提交尝试称为订阅重试政策。这不是 一项可开启或关闭的功能不过,您可以选择 政策。

首次创建和配置订阅时,您可以选择使用一项 以下重试政策之一:立即重新提交指数退避算法。 默认情况下,订阅会立即重新提交。

立即重新提交

默认情况下,Pub/Sub 会尝试立即重新发送该消息(并且 也可能发送到同一订阅者客户端)。然而,如果阻止 确认消息尚未更改,立即重新提交可能会导致问题。 在这种情况下,Pub/Sub 可能会重新发送多条消息 包含无法识别的内容

为了解决立即重新提交的问题,Pub/Sub 允许您配置 指数退避算法政策。

指数退避算法

指数退避算法可让你在重试之间的延迟时间逐渐变长 错误。第一次传送失败后,Pub/Sub 会等待 重试之前必须等待的最短退避时间。对于每一条连续失败的消息, 增加的时间,最多为最大延迟(0 和 600 秒)。

最大和最小延迟间隔不固定,应根据应用的本地因素进行配置。

请注意以下有关指数退避算法的注意事项:

  • 执行以下操作会触发指数退避算法:
    • 收到否定确认时。
    • 消息的确认截止期限到期。
  • 指数退避算法仅适用于每条消息, 订阅(全球)中的消息。
  • 在使用指数退避算法的情况下,Pub/Sub 仍会 其他消息,即使之前的消息收到了负面确认 (除非您使用的是有序邮件递送)。

使用重试政策延迟部分内容的传送和处理 消息,以应对发生暂时无法处理某些消息的情况 。系统会尽最大努力使用此功能,并且每条消息 重试政策。

我们不建议使用此功能故意延迟 消息传送。如果您发现大量负面反馈, 则有可能 部分消息在传送时没有退避或没有退避。 如果出现以下情况,Pub/Sub 还可能会减慢所有消息的传送速度: 大量消息

如果您需要安排送货时间,可以考虑使用 Cloud Tasks

配置指数退避算法

控制台

创建新订阅时,您可以执行以下步骤来配置指数退避算法重试政策:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。

    前往订阅

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入名称。

    如需了解如何命名订阅,请参阅主题或订阅命名准则

  4. 从下拉菜单中选择或创建一个主题。

    订阅将接收来自该主题的消息。

  5. 选择投放类型

  6. 重试政策下,选择使用指数退避算法后重试

  7. 输入退避时长下限退避时长上限(介于 0 到 600 秒之间)。

    最短退避时间的默认值为 10 秒,最长退避时间的默认值为 600 秒。

  8. 点击创建

gcloud

如需使用指数退避算法重试政策创建新订阅,请使用下面显示的标志运行 gcloud pubsub create 命令:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --min-retry-delay=MIN_RETRY_DELAY \
  --max-retry-delay=MAX_RETRY_DELAY

死信主题

如果 Pub/Sub 服务尝试传送消息,但 则 Pub/Sub 可以转发 无法传送的消息。

死信主题如何与 Pub/Sub 搭配使用

死信主题是一种订阅属性, 不是主题属性。这意味着您在创建死信主题时 订阅,而不是在创建主题时启用。

如果您创建死信主题,则可以设置以下内容 订阅属性:

  • 传送尝试次数上限:一个数值,表示 Pub/Sub 的 代表特定消息。如果订阅方客户端无法确认 在指定的传送尝试次数内发送,则视为 转发至死信主题。

    • 默认值 = 5
    • 最大值 = 100
    • 最小值 = 5
  • Project with the Dead-letter topic:如果死信主题位于 项目与订阅不同,则必须使用 死信主题将死信主题设为与以下主题不同的主题: 关联了该订阅。

如何计算发送尝试次数上限

当死信主题出现以下情况时,Pub/Sub 只会计算传送尝试次数 配置正确,且包含正确的 IAM 权限

传送尝试次数上限是近似值,因为 Pub/Sub 会尽可能转发无法传送的消息。

系统跟踪的邮件递送尝试次数也可能会重置为零, 特别是对于具有非活跃订阅者的拉取订阅。 因此,消息可能会传送到订阅者客户端 次。

配置死信主题

如需配置死信主题,源主题必须先拥有订阅。 您可以在创建订阅时指定死信主题,也可以 更新现有订阅以包含死信主题。

以下是在订阅上启用死信的工作流程。

  1. 创建死信主题。该主题与源主题无关。

  2. 在源主题的订阅上设置死信主题。

  3. 为避免丢失死信主题中的邮件,请至少附加另一个 订阅死信主题。辅助订阅会接收消息 死信主题

  4. 为 Pub/Sub 授予 Publisher 和 Subscriber 角色 服务账号。如需了解详情,请参阅 授予转发权限

为新订阅设置死信主题

您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

控制台

如需创建订阅并设置死信主题,请完成以下步骤:

  1. 在 Google Cloud 控制台中,进入订阅页面。

    前往订阅页面

  2. 点击创建订阅

  3. 输入订阅 ID

  4. 从下拉菜单中选择或创建一个主题。

    订阅将接收来自该主题的消息。

  5. 死信部分,选择启用死信

  6. 从下拉菜单中选择或创建死信主题。

  7. 如果选定的死信主题没有订阅,系统会提示 创建一个。

  8. 传送尝试次数上限字段中,指定一个 5 到 100 之间的整数。

  9. 点击创建

  10. 详细信息面板会显示可用操作项的列表。如果存在 个项目显示错误图标 ,点击 解决相应问题的操作项。

gcloud

如需创建订阅并设置死信主题,请使用 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create subscription-id \
  --topic=topic-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_dead_letter_policy()->set_dead_letter_topic(
      pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_dead_letter_policy()->set_max_delivery_attempts(
      dead_letter_delivery_attempts);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using System;

public class CreateSubscriptionWithDeadLetterPolicySample
{
    public Subscription CreateSubscriptionWithDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is the subscription you want to create with a dead letter policy.
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing topic that you want to attach the subscription with dead letter policy to.
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing topic that the subscription with dead letter policy forwards dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();
        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                // The maximum number of times that the service attempts to deliver a
                // message before forwarding it to the dead letter topic. Must be [5-100].
                MaxDeliveryAttempts = 10
            },
            AckDeadlineSeconds = 30
        };

        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        Console.WriteLine("Created subscription: " + subscription.SubscriptionName.SubscriptionId);
        Console.WriteLine($"It will forward dead letter messages to: {subscription.DeadLetterPolicy.DeadLetterTopic}");
        Console.WriteLine($"After {subscription.DeadLetterPolicy.MaxDeliveryAttempts} delivery attempts.");
        // Remember to attach a subscription to the dead letter topic because
        // messages published to a topic with no subscriptions are lost.
        return subscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createSubWithDeadLetter creates a subscription with a dead letter policy.
func createSubWithDeadLetter(w io.Writer, projectID, subID string, topicID string, fullyQualifiedDeadLetterTopic string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topicID := "my-topic"
	// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topic := client.Topic(topicID)

	subConfig := pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 20 * time.Second,
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{
			DeadLetterTopic:     fullyQualifiedDeadLetterTopic,
			MaxDeliveryAttempts: 10,
		},
	}

	sub, err := client.CreateSubscription(ctx, subID, subConfig)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription (%s) with dead letter topic (%s)\n", sub.String(), fullyQualifiedDeadLetterTopic)
	fmt.Fprintln(w, "To process dead letter messages, remember to add a subscription to your dead letter topic.")
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is the subscription you want to create with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that you want to attach the subscription with dead letter policy
    // to.
    String topicId = "your-topic-id";
    // This is an existing topic that the subscription with dead letter policy forwards dead letter
    // messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void createSubscriptionWithDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      ProjectTopicName deadLetterTopicName = ProjectTopicName.of(projectId, deadLetterTopicId);

      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              // The maximum number of times that the service attempts to deliver a
              // message before forwarding it to the dead letter topic. Must be [5-100].
              .setMaxDeliveryAttempts(10)
              .build();

      Subscription request =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      Subscription subscription = subscriptionAdminClient.createSubscription(request);

      System.out.println("Created subscription: " + subscription.getName());
      System.out.println(
          "It will forward dead letter messages to: "
              + subscription.getDeadLetterPolicy().getDeadLetterTopic());
      System.out.println(
          "After "
              + subscription.getDeadLetterPolicy().getMaxDeliveryAttempts()
              + " delivery attempts.");
      // Remember to attach a subscription to the dead letter topic because
      // messages published to a topic with no subscriptions are lost.
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId,
  subscriptionNameOrId,
  deadLetterTopicNameOrId
) {
  // Creates a new subscription
  const options = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.'
  );
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  deadLetterTopicNameOrId: string
) {
  // Creates a new subscription
  const options: CreateSubscriptionOptions = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.'
  );
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with dead letter policy enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_create_subscription($projectId, $topicName, $subscriptionName, $deadLetterTopicName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscribe($subscriptionName, [
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s created with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# TODO(developer)
# project_id = "your-project-id"
# endpoint = "https://my-test-project.appspot.com/push"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

with subscriber:
    request = {
        "name": subscription_path,
        "topic": topic_path,
        "dead_letter_policy": dead_letter_policy,
    }
    subscription = subscriber.create_subscription(request)

print(f"Subscription created: {subscription.name}")
print(
    f"It will forward dead letter messages to: {subscription.dead_letter_policy.dead_letter_topic}."
)
print(
    f"After {subscription.dead_letter_policy.max_delivery_attempts} delivery attempts."
)

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

Ruby

在尝试此示例之前,请按照Ruby Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

# topic_id             = "your-topic-id"
# subscription_id      = "your-subscription-id"
# dead_letter_topic_id = "your-dead-letter-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic             = pubsub.topic topic_id
dead_letter_topic = pubsub.topic dead_letter_topic_id
subscription      = topic.subscribe subscription_id,
                                    dead_letter_topic:                 dead_letter_topic,
                                    dead_letter_max_delivery_attempts: 10

puts "Created subscription #{subscription_id} with dead letter topic #{dead_letter_topic_id}."
puts "To process dead letter messages, remember to add a subscription to your dead letter topic."

为现有订阅设置死信主题

您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

控制台

如需更新订阅并设置死信主题,请完成以下步骤。

  1. 在 Google Cloud 控制台中,进入订阅页面。

    前往订阅页面

  2. 在要更新的订阅旁边,点击更多操作

  3. 在上下文菜单中,选择编辑

    突出显示“修改”选项的上下文菜单。

  4. 死信部分,选择启用死信

  5. 从下拉菜单中选择或创建一个主题。

  6. 如果所选主题没有订阅,系统会提示您创建一个订阅。

  7. 传送尝试次数上限字段中,指定 5 到 100 之间的整数。

  8. 点击更新

  9. 详细信息面板会显示可用操作项的列表。如果存在 个项目显示错误图标 ,点击 解决相应问题的待办项。

gcloud

如需更新订阅并设置死信主题,请使用 gcloud pubsub subscriptions update 命令:

gcloud pubsub subscriptions update subscription-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_dead_letter_topic(
          pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_max_delivery_attempts(dead_letter_delivery_attempts);
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class UpdateDeadLetterPolicySample
{
    public Subscription UpdateDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing dead letter topic that the subscription with dead letter policy forwards
        // dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();


        // Construct the subscription with the dead letter policy you expect to have after the update.
        // Here, values in the required fields (name, topic) help identify the subscription.
        var subscription = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                MaxDeliveryAttempts = 20,
            }
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            // Construct a field mask to indicate which field to update in the subscription.
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// updateDeadLetter updates an existing subscription with a dead letter policy.
func updateDeadLetter(w io.Writer, projectID, subID string, fullyQualifiedDeadLetterTopic string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateConfig := pubsub.SubscriptionConfigToUpdate{
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{
			DeadLetterTopic:     fullyQualifiedDeadLetterTopic,
			MaxDeliveryAttempts: 20,
		},
	}

	subConfig, err := client.Subscription(subID).Update(ctx, updateConfig)
	if err != nil {
		return fmt.Errorf("Update: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;
import java.io.IOException;

public class UpdateDeadLetterPolicyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";
    // This is an existing dead letter topic that the subscription with dead letter policy forwards
    // dead letter messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void updateDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

      System.out.println(
          "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields());

      TopicName topicName = TopicName.of(projectId, topicId);
      TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);

      // Construct the dead letter policy you expect to have after the update.
      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              .setMaxDeliveryAttempts(20)
              .build();

      // Construct the subscription with the dead letter policy you expect to have
      // after the update. Here, values in the required fields (name, topic) help
      // identify the subscription.
      Subscription subscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(subscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      System.out.println("After: " + response.getAllFields());
      System.out.println(
          "Max delivery attempts is now "
              + response.getDeadLetterPolicy().getMaxDeliveryAttempts());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(topicNameOrId).name,
      maxDeliveryAttempts: 15,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log('Max delivery attempts updated successfully.');
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Set the dead letter policy on an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_update_subscription(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $deadLetterTopicName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscription($subscriptionName);
    $subscription->update([
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s updated with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

# Construct the subscription with the dead letter policy you expect to have
# after the update. Here, values in the required fields (name, topic) help
# identify the subscription.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=topic_path,
    dead_letter_policy=dead_letter_policy,
)

with subscriber:
    subscription_after_update: gapic_types.Subscription = (
        subscriber.update_subscription(
            request={"subscription": subscription, "update_mask": update_mask}
        )
    )

print(f"After the update: {subscription_after_update}.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

Ruby

在尝试此示例之前,请按照Ruby Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

# subscription_id       = "your-subscription-id"
# role                  = "roles/pubsub.publisher"
# service_account_email = "serviceAccount:account_name@project_name.iam.gserviceaccount.com"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.dead_letter_max_delivery_attempts = 20
puts "Max delivery attempts is now #{subscription.dead_letter_max_delivery_attempts}."

授予 IAM 角色以使用死信主题

如需将无法传送的消息转发到死信主题,Pub/Sub 必须具有执行以下操作的权限:

  • 将消息发布到主题。
  • 确认消息,这会将其从订阅中移除。

Pub/Sub 会为每个项目创建并维护一个服务账号:service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com。您可以通过分配 发布商和 订阅者角色

控制台

要授予 Pub/Sub 将消息发布到 死信主题,请完成以下步骤:

  1. 在 Google Cloud 控制台中,进入订阅页面。

    前往订阅页面

  2. 点击死信主题的订阅的名称。

  3. 点击死信标签页。

  4. 如需分配发布者角色,请点击授予发布者角色。如果 已成功分配发布商角色,您会看到蓝色对勾标记

  5. 如需分配订阅者角色,请点击授予订阅者角色。如果 已成功分配发布商角色,您会看到蓝色对勾标记

gcloud

如需授予 Pub/Sub 将消息发布到死信主题的权限,请运行以下命令:

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding dead-letter-topic-name \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.publisher"

如需授予 Pub/Sub 确认转发的无法传送的消息的权限,请运行以下命令:

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub subscriptions add-iam-policy-binding subscription-id \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.subscriber"

跟踪传送尝试

为订阅启用死信主题后,每一条消息 具有一个字段,用于指定 尝试发送时间:

  • 从拉取订阅接收的消息包括 delivery_attempt 字段。

  • 从推送订阅接收的消息包括 deliveryAttempt 字段。

以下示例展示了如何获取传送尝试次数:

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::cout << "Delivery attempt: " << h.delivery_attempt() << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

在尝试此示例之前,请按照C# Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub C# API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


using Google.Cloud.PubSub.V1;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncWithDeliveryAttemptsSample
{
    public async Task<int> PullMessagesAsyncWithDeliveryAttempts(string projectId, string subscriptionId, bool acknowledge)
    {
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        int deliveryAttempt = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            System.Console.WriteLine($"Delivery Attempt: {message.GetDeliveryAttempt()}");
            if (message.GetDeliveryAttempt() != null)
            {
                deliveryAttempt = message.GetDeliveryAttempt().Value;
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return deliveryAttempt;
    }
}

Go

在尝试此示例之前,请按照Go Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	sub := client.Subscription(subID)
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// When dead lettering is enabled, the delivery attempt field is a pointer to the
		// the number of times the service has attempted to delivery a message.
		// Otherwise, the field is nil.
		if msg.DeliveryAttempt != nil {
			fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("got error in Receive: %w", err)
	}
	return nil
}

Java

在尝试此示例之前,请按照Java Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReceiveMessagesWithDeliveryAttemptsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";

    ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample(
        projectId, subscriptionId);
  }

  public static void receiveMessagesWithDeliveryAttemptsExample(
      String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            // Handle incoming message, then ack the received message.
            System.out.println("Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
            System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message));
            consumer.ack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

在尝试此示例之前,请按照Node.js Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithDeliveryAttempts(
  projectId,
  subscriptionNameOrId
) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${message.message.data}`);
    console.log(`Delivery Attempt: ${message.deliveryAttempt}`);
    if (message.ackId) {
      ackIds.push(message.ackId);
    }
  }

  // Acknowledge all of the messages. You could also acknowledge
  // these individually, but this is more efficient.
  const ackRequest = {
    subscription: formattedSubscription,
    ackIds: ackIds,
  };
  await subClient.acknowledge(ackRequest);

  console.log('Done.');
}

PHP

在尝试此示例之前,请按照PHP Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub PHP API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Get the delivery attempt from a pulled message.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $message The contents of a pubsub message data field.
 */
function dead_letter_delivery_attempt($projectId, $topicName, $subscriptionName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    // publish test message
    $topic->publish(new Message([
        'data' => $message
    ]));

    $subscription = $topic->subscription($subscriptionName);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        printf('Received message %s' . PHP_EOL, $message->data());
        printf('Delivery attempt %d' . PHP_EOL, $message->deliveryAttempt());
    }
    print('Done' . PHP_EOL);
}

Python

在尝试此示例之前,请按照Python Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    print(f"With delivery attempts: {message.delivery_attempt}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

在尝试此示例之前,请按照Ruby Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.pull(immediate: false).each do |message|
  puts "Received message: #{message.data}"
  puts "Delivery Attempt: #{message.delivery_attempt}"
  message.acknowledge!
end

当 Pub/Sub 将无法传送的消息转发到死信主题时, 它会将以下属性添加到 消息:

  • CloudPubSubDeadLetterSourceDeliveryCount:传送尝试次数 来源订阅
  • CloudPubSubDeadLetterSourceSubscription:来源的名称 订阅。
  • CloudPubSubDeadLetterSourceSubscriptionProject:需要修改配置的项目的名称 包含源订阅。
  • CloudPubSubDeadLetterSourceTopicPublishTime:显示消息时的时间戳 最初发布。
  • CloudPubSubDeadLetterSourceDeliveryErrorMessage:消息的原因 无法传送到原始目的地。该属性只存在 导出订阅

监控转发的邮件

转发无法送达的消息后,Pub/Sub 服务会从订阅中移除消息。您可以使用 Cloud Monitoring。

如果您将订阅附加到死信主题,则这些消息会使用附加订阅的到期政策,而非具有死信主题属性的订阅的有效期。

subscription/dead_letter_message_count 指标用于记录 Pub/Sub 从订阅转发的无法传送的消息数量。

如需了解详情,请参阅监控转发的无法传送的消息

移除死信主题

如需停止转发无法传送的消息,请从订阅中移除死信主题。

您可以使用 Google Cloud 控制台、Google Cloud CLI 或 Pub/Sub API。

控制台

如需从订阅中移除死信主题,请完成以下步骤:

  1. 在 Google Cloud 控制台中,进入订阅页面。

    前往订阅页面

  2. 在订阅列表中,点击订阅旁边的 进行更新。

  3. 从上下文菜单中选择修改

    突出显示“修改”选项的上下文菜单。

  4. 死信部分,清除启用死信

  5. 点击更新

gcloud

如需从订阅中移除死信主题,请使用 gcloud pubsub subscriptions update 命令和 --clear-dead-letter-policy 标志:

gcloud pubsub subscriptions update subscription-id \
  --clear-dead-letter-policy

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()->clear_dead_letter_policy();
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class RemoveDeadLetterPolicySample
{
    public Subscription RemoveDeadLetterPolicy(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscription = new Subscription()
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = null
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// removeDeadLetterTopic removes the dead letter policy from a subscription.
func removeDeadLetterTopic(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	subConfig, err := client.Subscription(subID).Update(ctx, pubsub.SubscriptionConfigToUpdate{
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{},
	})
	if err != nil {
		return fmt.Errorf("Update: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;

public class RemoveDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";

    RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId);
  }

  public static void removeDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId) throws Exception {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the subscription you expect to have after the request. Here,
      // values in the required fields (name, topic) help identify the subscription.
      // No dead letter policy is supplied.
      Subscription expectedSubscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(expectedSubscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      // You should see an empty dead letter topic field inside the dead letter policy.
      System.out.println("After: " + response.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function removeDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: null,
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log(
    `Removed dead letter topic from ${subscriptionNameOrId} subscription.`
  );
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Remove dead letter policy from an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function dead_letter_remove($projectId, $topicName, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    $subscription = $topic->subscription($subscriptionName);

    // Provide deadLetterPolicy in the update mask, but omit from update fields to unset.
    $subscription->update([], [
        'updateMask' => [
            'deadLetterPolicy'
        ]
    ]);

    printf(
        'Removed dead letter topic from subscription %s' . PHP_EOL,
        $subscription->name()
    );
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path, topic=topic_path
)

with subscriber:
    subscription_after_update: gapic_types.Subscription = (
        subscriber.update_subscription(
            request={"subscription": subscription, "update_mask": update_mask}
        )
    )

print(f"After removing the policy: {subscription_after_update}.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

Ruby

在尝试此示例之前,请按照Ruby Pub/Sub 快速入门: 客户端库。 有关详情,请参阅 Pub/Sub Ruby API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.remove_dead_letter_policy
puts "Removed dead letter topic from #{subscription_id} subscription."

价格

Pub/Sub 服务转发无法传送的消息时,您需要支付以下费用:

  • 向与包含死信主题的项目相关联的结算账号计入的发布费用。
  • 向结算账号收取的出站邮件的订阅费用 与包含死信主题的订阅的项目相关联 属性。

如果您设置了订阅的死信主题属性,但 消息存储位置政策 死信主题主题不允许包含相应订阅的区域; 还需要支付出站消息的发布费用。

出站消息的发布费用将计入包含 死信主题要了解详情,请参阅价格

后续步骤