创建拉取订阅

本文档介绍了如何创建拉取订阅。您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建拉取订阅。

准备工作

所需的角色和权限

如需创建订阅,您必须在项目级层配置访问权限控制。如果您的订阅和主题位于不同的项目中,您还需要拥有资源级权限,如本部分后面所述。

如需获得创建拉取订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建拉取订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建拉取订阅需要以下权限:

  • 从订阅中拉取: pubsub.subscriptions.consume
  • 创建订阅: pubsub.subscriptions.create
  • 删除订阅: pubsub.subscriptions.delete
  • 获取订阅: pubsub.subscriptions.get
  • 列出订阅: pubsub.subscriptions.list
  • 更新订阅: pubsub.subscriptions.update
  • 将订阅附加到主题: pubsub.topics.attachSubscription
  • 获取订阅的 IAM 政策: pubsub.subscriptions.getIamPolicy
  • 为订阅配置 IAM 政策 pubsub.subscriptions.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如果您需要在一个项目中创建与另一个项目中的主题关联的拉取订阅,请让主题管理员为您授予该主题的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。

拉取订阅属性

配置拉取订阅时,您可以指定以下属性。

通用属性

了解您可以在所有订阅中设置的常见订阅属性

仅传送一次

仅传送一次。如果设置了此属性,Pub/Sub 会实现正好一次传送保证。如果未指定,则订阅支持每条消息的至少一次传送。

创建拉取订阅

以下示例演示了如何使用提供的默认设置创建使用拉取传送的订阅。

控制台

如需创建拉取订阅,请完成以下步骤。

  1. 在 Google Cloud 控制台中,前往订阅页面。

    前往“订阅”页面

  2. 点击创建订阅
  3. 订阅 ID 字段中,输入名称。

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  4. 从下拉菜单中选择或创建一个主题。订阅将接收来自该主题的消息。
  5. 传送类型保留为拉取
  6. 保留所有其他默认值。
  7. 点击创建

您还可以通过主题部分创建订阅。 此快捷方式可帮助您将主题与订阅关联。

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要创建订阅的主题旁边的
  3. 从上下文菜单中,选择创建订阅
  4. 输入订阅 ID

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  5. 传送类型保留为拉取
  6. 保留所有其他默认值。
  7. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如需创建拉取订阅,请运行 gcloud pubsub subscriptions create 命令
    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_ID

    替换以下内容:

    • SUBSCRIPTION_ID:新拉取订阅的名称或 ID。
    • TOPIC_ID:主题的名称或 ID。

REST

如需创建拉取订阅,请使用 projects.subscriptions.create 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令: gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}

其中:

  • PROJECT_ID 是项目 ID。
  • SUBSCRIPTION_ID 是您的订阅 ID。
  • TOPIC_ID 是主题 ID。
  • 回答:

    {
      "name": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID",
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "pushConfig": {},
      "ackDeadlineSeconds": 10,
      "messageRetentionDuration": "604800s",
      "expirationPolicy": {
        "ttl": "2678400s"
      }
    }

    C++

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    namespace pubsub = ::google::cloud::pubsub;
    [](pubsub_admin::SubscriptionAdminClient client,
       std::string const& project_id, std::string const& topic_id,
       std::string const& subscription_id) {
      google::pubsub::v1::Subscription request;
      request.set_name(
          pubsub::Subscription(project_id, subscription_id).FullName());
      request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
      auto sub = client.CreateSubscription(request);
      if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The subscription already exists\n";
        return;
      }
      if (!sub) throw std::move(sub).status();
    
      std::cout << "The subscription was successfully created: "
                << sub->DebugString() << "\n";
    }

    C#

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    
    public class CreateSubscriptionSample
    {
        public Subscription CreateSubscription(string projectId, string topicId, string subscriptionId)
        {
            SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
    
            SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
            Subscription subscription = null;
    
            try
            {
                subscription = subscriber.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                // Already exists.  That's fine.
            }
            return subscription;
        }
    }

    Go

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    import (
    	"context"
    	"fmt"
    	"io"
    	"time"
    
    	"cloud.google.com/go/pubsub"
    )
    
    func create(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
    		Topic:       topic,
    		AckDeadline: 20 * time.Second,
    	})
    	if err != nil {
    		return fmt.Errorf("CreateSubscription: %w", err)
    	}
    	fmt.Fprintf(w, "Created subscription: %v\n", sub)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    
    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.PushConfig;
    import com.google.pubsub.v1.Subscription;
    import com.google.pubsub.v1.SubscriptionName;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreatePullSubscriptionExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String subscriptionId = "your-subscription-id";
        String topicId = "your-topic-id";
    
        createPullSubscriptionExample(projectId, subscriptionId, topicId);
      }
    
      public static void createPullSubscriptionExample(
          String projectId, String subscriptionId, String topicId) throws IOException {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
          // Create a pull subscription with default acknowledgement deadline of 10 seconds.
          // Messages not successfully acknowledged within 10 seconds will get resent by the server.
          Subscription subscription =
              subscriptionAdminClient.createSubscription(
                  subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
          System.out.println("Created pull subscription: " + subscription.getName());
        }
      }
    }

    Node.js

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createSubscription(topicNameOrId, subscriptionNameOrId) {
      // Creates a new subscription
      await pubSubClient
        .topic(topicNameOrId)
        .createSubscription(subscriptionNameOrId);
      console.log(`Subscription ${subscriptionNameOrId} created.`);
    }

    Node.js

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createSubscription(
      topicNameOrId: string,
      subscriptionNameOrId: string
    ) {
      // Creates a new subscription
      await pubSubClient
        .topic(topicNameOrId)
        .createSubscription(subscriptionNameOrId);
      console.log(`Subscription ${subscriptionNameOrId} created.`);
    }

    PHP

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub PHP API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub subscription.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     * @param string $subscriptionName  The Pub/Sub subscription name.
     */
    function create_subscription($projectId, $topicName, $subscriptionName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->topic($topicName);
        $subscription = $topic->subscription($subscriptionName);
        $subscription->create();
    
        printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    }

    Python

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # subscription_id = "your-subscription-id"
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    # Wrap the subscriber in a 'with' block to automatically call close() to
    # close the underlying gRPC channel when done.
    with subscriber:
        subscription = subscriber.create_subscription(
            request={"name": subscription_path, "topic": topic_path}
        )
    
    print(f"Subscription created: {subscription}")

    Ruby

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

    # topic_id        = "your-topic-id"
    # subscription_id = "your-subscription-id"
    
    pubsub = Google::Cloud::Pubsub.new
    
    topic        = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id
    
    puts "Pull subscription #{subscription_id} created."

    监控拉取订阅

    Cloud Monitoring 提供了一些指标来监控订阅

    如需查看与 Pub/Sub 相关的所有可用指标及其说明的列表,请参阅 Pub/Sub 监控文档

    您还可以在 Pub/Sub 中监控订阅。

    后续步骤