创建导入主题

通过导入主题,您可以将外部来源中的数据注入 Pub/Sub。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目标位置。

Pub/Sub 支持使用 Amazon Kinesis Data Streams 作为外部来源,将数据注入到导入主题中。

导入主题概览

导入主题已作为媒体资源为该主题启用了提取功能。这样,导入主题就可以注入流式数据。您可以使用控制台、Google Cloud CLI、REST 调用或客户端库为主题启用提取功能。作为管理导入主题的一部分,Google Cloud 会提供对提取流水线的监控和伸缩。

如果没有导入主题,则从数据源将数据流式传输到 Pub/Sub 需要额外的服务。这项附加服务从原始来源拉取数据并将其发布到 Pub/Sub。附加服务可以是流式引擎(例如 Apache Spark)或自定义编写的服务。您还必须配置、部署、运行、扩缩和监控此服务。

下面列出了有关导入主题的重要信息:

  • 与标准主题类似,您仍然可以手动发布到导入主题。

  • 一个导入主题只能附加一个提取来源。

我们建议您针对流式数据导入主题。如果您正在考虑将数据批量注入 BigQuery 而不是流式数据注入,则可以尝试使用 BigQuery Data Transfer Service (BQ DTS)。如果您要将数据注入 Cloud Storage,Storage Transfer Service (STS) 是一个不错的选择。

准备工作

管理导入主题所需的角色和权限

如需获取创建和管理导入主题所需的权限,请让管理员授予您针对主题或项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色包含创建和管理导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理导入主题,您需要拥有以下权限:

  • 创建导入主题: pubsub.topics.create
  • 删除导入主题: pubsub.topics.delete
  • 获取导入主题: pubsub.topics.get
  • 列出导入主题: pubsub.topics.list
  • 发布到导入主题: pubsub.topics.publish
  • 更新导入主题: pubsub.topics.update
  • 获取导入主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为导入主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级和单个资源级配置访问权限控制。

设置联合身份以访问 Kinesis Data Streams

借助 Workload Identity Federation,Google Cloud 服务可以访问在 Google Cloud 外部运行的工作负载。借助身份联合,您无需维护凭据或将凭据传递给 Google Cloud,即可访问其他云中的资源。相反,您可以使用工作负载本身的身份向 Google Cloud 进行身份验证并访问资源。

在 Google Cloud 中创建服务帐号

这是一个可选步骤。 如果您已有服务帐号,则可以在此过程中使用该帐号,而无需创建新的服务帐号。如果您使用的是现有服务帐号,请转到记录服务帐号唯一 ID 以完成下一步。

对于导入主题,Pub/Sub 会使用服务帐号作为身份从 AWS 访问资源。

如需详细了解如何创建服务帐号,包括前提条件、所需的角色和权限以及命名准则,请参阅创建服务帐号。创建服务帐号后,您可能需要等待 60 秒或更长时间才能使用该服务帐号。出现此行为的原因是读取操作最终保持一致;新的服务帐号可能需要一段时间才能变得可见。

记录服务帐号唯一 ID

您需要有服务帐号唯一 ID 才能在 AWS 控制台中设置角色。

  1. 在 Google Cloud 控制台中,前往服务账号详情页面。

    转到服务帐号

  2. 点击您刚刚创建的服务帐号或您打算使用的服务帐号。

  3. 服务帐号详情页面,记录唯一 ID 编号。

    使用自定义信任政策在 AWS 中创建角色部分,您将需要该 ID。

将服务帐号 Token Creator 角色添加到 Pub/Sub 服务帐号

Service account Token Creator 角色 (roles/iam.serviceAccountTokenCreator) 可让主帐号为服务帐号创建短期有效凭据。这些令牌或凭据用于模拟服务帐号。

如需详细了解服务账号模拟,请参阅服务账号模拟

您还可以在此过程中添加 Pub/Sub Publisher 角色 (roles/pubsub.publisher)。如需详细了解该角色以及添加它的原因,请参阅将 Pub/Sub Publisher 角色添加到 Pub/Sub 服务帐号

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 启用包括 Google 提供的角色授权选项。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服务帐号。

  4. 对于此服务帐号,请点击修改主账号按钮。

  5. 如有需要,请点击添加其他角色

  6. 搜索并选择 Service account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。

  7. 点击保存

在 AWS 中创建政策

您需要在 AWS 中设置政策,以允许 Pub/Sub 进行 AWS 身份验证,使 Pub/Sub 能够从 AWS Kinesis 数据流中注入数据。在创建 AWS 政策之前,请创建一个 Kinesis 数据流并在该数据流上注册使用方。我们建议您这样做,以便您可以限制对特定数据流的权限。

  • 如需详细了解如何创建 AWS Kinesis 数据流,请参阅 Kinesis 数据流

  • 如需详细了解用于注册使用方的 AWS Kinesis 数据流 API,请参阅 RegisterStreamConsumer

  • 如需详细了解如何在 AWS 中创建政策以及查看相关信息,请参阅创建 IAM 政策

如需在 AWS 中创建政策,请执行以下步骤:

  1. 登录 AWS Management Console 并打开 IAM 控制台

  2. IAM 的控制台导航窗格中,点击访问权限管理 > 政策

  3. 点击创建政策

  4. 选择服务部分,选择 Kinesis

  5. 对于允许的操作,请选择以下选项:

    • List > ListShards

      此操作会授予列出数据流中的分片的权限,并提供每个分片的相关信息。

    • 读取 > SubscribeToShard

      此操作可授予通过增强扇出功能监听特定分片的权限。

    • Read > DescribeStreamConsumer

      此操作会授予获取已注册数据流使用方的说明的权限。

    这些权限适用于从数据流读取数据。Pub/Sub 仅支持使用流式 SubscribeToShard API 从采用增强型扇出的 Kinesis 流读取数据。

  6. 对于资源,如果要将政策限制为仅适用于特定数据流或使用方(推荐),请指定使用方 ARN数据流 ARN

  7. 点击添加更多权限

  8. 选择服务部分,输入并选择 STS

  9. 对于允许的操作,依次选择写入 > AssumeRoleWithWebIdentity

    此操作会授予获取一组临时安全凭据的权限,以便 Pub/Sub 使用身份联合功能对 Kinesis 数据流进行身份验证。

  10. 点击下一步

  11. 输入政策名称和说明。

  12. 点击创建政策

使用自定义信任政策在 AWS 中创建角色

您必须在 AWS 中创建角色,以便 Pub/Sub 能够向 AWS 进行身份验证,以便从 Kinesis Data Streams 中注入数据。

如需使用自定义信任政策创建角色,请执行以下步骤:

  1. 登录 AWS 管理控制台并打开 IAM 控制台

  2. IAM 的控制台导航窗格中,点击角色

  3. 点击创建角色

  4. 选择可信实体部分,选择自定义信任政策

  5. 自定义信任政策部分,输入或粘贴以下内容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替换为您在记录服务帐号唯一 ID 中记录的服务帐号的唯一 ID。

  6. 点击下一步

  7. 添加权限部分,搜索并选择您刚刚创建的自定义政策。

  8. 点击下一步

  9. 输入角色名称和说明。

  10. 点击创建角色

将 Pub/Sub Publisher 角色添加到 Pub/Sub 服务帐号

您必须为 Pub/Sub 服务帐号分配发布者角色,以便 Pub/Sub 能够从 AWS Kinesis Data Streams 发布到导入主题。

允许从所有主题发布

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 启用包括 Google 提供的角色授权选项。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服务帐号。

  4. 对于此服务帐号,请点击修改主账号按钮。

  5. 如有需要,请点击添加其他角色

  6. 搜索并选择 Pub/Sub publisher 角色 (roles/pubsub.publisher)。

  7. 点击保存

允许从单个主题发布

如果您只想向特定导入主题授予发布权限,请按以下步骤操作:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics add-iam-policy-binding 命令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"
    

    替换以下内容:

    • TOPIC_ID 是导入主题的主题 ID。

    • PROJECT_NUMBER 是项目编号。如需查看项目编号,请参阅识别项目

将服务帐号用户角色添加到服务帐号

Service Account User 角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 权限,可允许主帐号将一个服务帐号附加到导入主题的提取设置,并将该服务帐号用于联合身份。

执行以下步骤:

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 对于要发出创建或更新主题调用的主账号,点击修改主账号按钮。

  3. 如有需要,请点击添加其他角色

  4. 搜索并选择 Service Account User 角色 (roles/iam.serviceAccountUser)。

  5. 点击保存

创建导入主题

如需详细了解与主题关联的属性,请参阅主题的属性

确保您已完成以下步骤:

如需创建导入主题,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入导入主题的 ID。

    如需详细了解如何为主题命名,请参阅命名准则

  4. 选择添加默认订阅

  5. 选择启用提取

  6. 对于注入来源,请选择 Amazon Kinesis Data Streams

  7. 输入以下详细信息:

    • Kinesis 数据流 ARN:您计划注入到 Pub/Sub 中的 Kinesis 数据流的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 使用方 ARN:已注册到 AWS Kinesis Data Stream 的使用方资源的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下所示:arn:aws:iam:${Account}:role/${RoleName}

    • 服务帐号:您在在 Google Cloud 中创建服务帐号中创建的服务帐号。

  8. 点击创建主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID 是主题 ID。

    • KINESIS_STREAM_ARN 是您计划注入到 Pub/Sub 的 Kinesis 数据流的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是注册到 AWS Kinesis Data Streams 的使用方资源的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。该角色的 ARN 格式如下所示:arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在在 Google Cloud 中创建服务帐号中创建的服务帐号。

Go

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在尝试此示例之前,请按照“Pub/Sub 快速入门:使用客户端库”中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN)IAM 标识符

如果您遇到问题,请参阅导入主题问题排查

修改导入主题

您可以修改导入主题的注入数据源设置。执行以下步骤:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击导入主题。

  3. 在主题详情页面中,点击修改

  4. 更新您要更改的字段。

  5. 点击更新

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics update 命令,其中包含以下示例中提及的所有标志:

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID 是主题 ID。此字段无法更新。

    • KINESIS_STREAM_ARN 是您计划注入到 Pub/Sub 的 Kinesis 数据流的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是注册到 AWS Kinesis Data Streams 的使用方资源的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。该角色的 ARN 格式如下:arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在在 Google Cloud 中创建服务帐号中创建的服务帐号。

导入主题的配额和限制

导入主题的发布者吞吐量受主题的发布配额约束。如需了解详情,请参阅 Pub/Sub 配额和限制

后续步骤