创建 Confluent Cloud 导入主题

借助 Confluent Cloud 导入主题,您可以将数据作为外部来源持续提取到 Confluent Cloud 并导入到 Pub/Sub。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。

本文档介绍了如何创建和管理 Confluent Cloud 导入主题。如需创建标准主题,请参阅创建标准主题

如需详细了解导入主题,请参阅导入主题简介

准备工作

所需的角色和权限

如需获得创建和管理 Confluent Cloud 导入主题所需的权限,请让您的管理员为您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理 Confluent Cloud 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理 Confluent Cloud 导入主题,需要具备以下权限:

  • 创建导入主题: pubsub.topics.create
  • 删除导入主题: pubsub.topics.delete
  • 获取导入主题: pubsub.topics.get
  • 列出导入主题: pubsub.topics.list
  • 发布到导入主题: pubsub.topics.publish
  • 更新导入主题: pubsub.topics.update
  • 获取导入主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为导入主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级别和个别资源级别配置访问权限控制。

设置联合身份以访问 Confluent Cloud

借助工作负载身份联合, Google Cloud 服务可以访问 Google Cloud外部运行的工作负载。借助身份联合,您无需维护或传递凭据,即可 Google Cloud 访问其他云中的资源。不过,您可以使用工作负载本身的身份进行身份验证 Google Cloud 并访问资源。

在 Google Cloud中创建服务账号

这是一个可选步骤。 如果您已有服务账号,则可以在本过程中使用该账号,而无需创建新的服务账号。如果您使用的是现有服务账号,请参阅记录服务账号唯一 ID 以了解后续步骤。

对于 Confluent Cloud 导入主题,Pub/Sub 会使用服务账号作为身份来访问 Confluent Cloud 中的资源。

如需详细了解如何创建服务账号(包括前提条件、所需角色和权限以及命名准则),请参阅创建服务账号。创建服务账号后,您可能需要等待 60 秒或更长时间才能使用该服务账号。出现这种行为的原因是读取操作是最终一致的;新服务账号可能需要一段时间才能显示。

记录服务账号唯一 ID

您需要服务账号唯一 ID 才能在 Confluent Cloud 控制台中设置身份提供方和身份池。

  1. 在 Google Cloud 控制台中,前往服务账号详情页面。

    前往服务账号

  2. 点击您刚刚创建的服务账号或您打算使用的服务账号。

  3. 服务账号详情页面中,记录唯一 ID 编号。

    您需要在工作流中使用该 ID 来在 Confluent Cloud 控制台中设置身份提供方和身份池

向 Pub/Sub 服务账号添加 Service Account Token Creator 角色

Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator) 可让主账号为服务账号创建短期有效凭据。这些令牌或凭据用于模拟服务账号。

如需详细了解服务账号模拟,请参阅服务账号模拟

您还可以在此过程中添加 Pub/Sub Publisher 角色 (roles/pubsub.publisher)。如需详细了解该角色以及添加该角色的原因,请参阅向 Pub/Sub 服务账号添加 Pub/Sub 发布商角色

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 点击包括 Google提供的角色授权复选框。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服务账号。

  4. 对于此服务账号,点击修改主账号按钮。

  5. 根据需要,点击添加其他角色

  6. 搜索并点击 Service account token creator 角色 (roles/iam.serviceAccountTokenCreator)。

  7. 点击保存

在 Confluent Cloud 中创建身份提供方

如需向 Confluent Cloud 进行身份验证,Google Cloud 服务账号需要一个身份池。您必须先在 Confluent Cloud 中创建身份提供方。

如需详细了解如何在 Confluent Cloud 中创建身份提供程序,请访问添加 OAuth/OIDC 身份提供程序页面。

  1. 登录 Confluent Cloud 控制台

  2. 在菜单中,点击账号和访问权限

  3. 点击 Workload Identity

  4. 点击添加提供商

  5. 点击 OAuth/OIDC,然后点击 Next(下一步)。

  6. 点击其他 OIDC 提供程序,然后点击下一步

  7. 提供身份提供程序的名称和用途说明。

  8. 点击显示高级配置

  9. 签发者 URI 字段中,输入 https://accounts.google.com

  10. JWKS URI 字段中,输入 https://www.googleapis.com/oauth2/v3/certs

  11. 点击验证并保存

在 Confluent Cloud 中创建身份池并授予适当的角色

您必须在身份资料下创建身份池,并授予必要的角色,以允许 Pub/Sub 服务账号进行身份验证并从 Confluent Cloud Kafka 主题读取数据。

请确保您的集群已在 Confluent Cloud 中创建,然后再继续创建身份池。

如需详细了解如何创建身份池,请访问将身份池与 OAuth/OIDC 身份提供程序搭配使用页面。

  1. 登录 Confluent Cloud 控制台

  2. 在菜单中,点击账号和访问权限

  3. 点击 Workload Identity

  4. 点击您在在 Confluent Cloud 中创建身份提供方中创建的身份提供方。

  5. 点击添加池

  6. 为您的身份池提供名称和说明。

  7. 身份声明设置为 claims

  8. 设置过滤条件下,点击高级标签页。输入以下代码:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替换为记录服务账号唯一 ID 中所述的服务账号唯一 ID。

  9. 点击下一步

  10. 点击添加新权限。然后,点击下一步

  11. 在相关集群中,点击添加角色分配

  12. 点击操作员角色,然后点击添加

    此角色可授予 Pub/Sub 权限。服务账号对包含要提取到 Pub/Sub 的 Confluent Kafka 主题的集群的访问权限。

  13. 在集群下,点击主题。然后,点击添加角色分配

  14. 选择 DeveloperRead 角色。

  15. 点击相应选项,然后指定主题或前缀。例如,特定主题前缀规则所有主题

  16. 点击添加

  17. 点击下一步

  18. 点击验证并保存

将 Pub/Sub 发布商角色添加到 Pub/Sub 正文

如需启用发布功能,您必须向 Pub/Sub 服务账号分配发布商角色,以便 Pub/Sub 能够向 Confluent Cloud 导入主题发布内容。

启用从所有主题发布内容

如果您尚未创建任何 Confluent Cloud 导入主题,请使用此方法。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 点击包括 Google提供的角色授权复选框。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服务账号。

  4. 对于此服务账号,点击修改主账号按钮。

  5. 根据需要,点击添加其他角色

  6. 搜索并点击 Pub/Sub 发布商角色 (roles/pubsub.publisher)。

  7. 点击保存

启用从单个主题发布

仅当 Confluent Cloud 导入主题已存在时,才应使用此方法。

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics add-iam-policy-binding 命令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    替换以下内容:

    • TOPIC_ID:Confluent Cloud 导入主题的主题 ID。

    • PROJECT_NUMBER:项目编号。如需查看项目编号,请参阅标识项目

向服务账号添加服务账号用户角色

Service Account User 角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 权限,可让主账号将服务账号附加到 Confluent Cloud 导入主题的提取设置,并使用该服务账号进行联合身份验证。

  1. 在 Google Cloud 控制台中,转到 IAM 页面。

    转到 IAM

  2. 对于发出创建或更新主题调用的主账号,请点击修改主账号按钮。

  3. 根据需要,点击添加其他角色

  4. 搜索并点击 Service Account User 角色 (roles/iam.serviceAccountUser)。

  5. 点击保存

使用 Confluent Cloud 导入主题

您可以创建新的导入主题,也可以修改现有主题。

注意事项

  • 分别创建主题和订阅(即使是快速连续创建)可能会导致数据丢失。在订阅之前,主题会存在一小段时间。如果在此期间向主题发送了任何数据,这些数据将丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。

  • 如果您需要使用相同名称重新创建现有导入主题的 Kafka 主题,则不能删除 Kafka 主题并重新创建。此操作可能会使 Pub/Sub 的偏移量管理失效,从而导致数据丢失。如需缓解此问题,请按以下步骤操作:

    • 删除 Pub/Sub 导入主题。
    • 删除 Kafka 主题。
    • 创建 Kafka 主题。
    • 创建 Pub/Sub 导入主题。
  • 系统始终从 Confluent Cloud Kafka 主题的最早偏移量读取数据。

创建 Confluent Cloud 导入主题

如需详细了解与主题关联的属性,请参阅主题的属性

确保您已完成以下步骤:

如需创建 Confluent Cloud 导入主题,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入要导入的主题的 ID。

    如需详细了解命名主题,请参阅命名准则

  4. 选择添加默认订阅

  5. 选择启用提取

  6. 对于提取来源,选择 Confluent Cloud

  7. 输入以下详细信息:

    1. 引导服务器:集群的引导服务器,其中包含要提取到 Pub/Sub 的 Kafka 主题。格式如下:hostname:port

    2. 集群 ID:包含要提取到 Pub/Sub 的 Kafka 主题的集群的 ID。

    3. Topic:要提取到 Pub/Sub 的 Kafka 主题的名称。

    4. 身份池 ID:用于通过 Confluent Cloud 进行身份验证的身份池的池 ID。

    5. 服务账号:您在在 Google Cloud 中创建服务账号中创建的服务账号。

  8. 点击创建主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID \
        --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
        --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
        --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
        --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
        --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
    • CONFLUENT_BOOTSTRAP_SERVER:集群的引导服务器,其中包含要提取到 Pub/Sub 的 Kafka 主题。格式如下:hostname:port
    • CONFLUENT_CLUSTER_ID:包含要提取到 Pub/Sub 的 Kafka 主题的集群的 ID。
    • CONFLUENT_TOPIC:要提取到 Pub/Sub 的 Kafka 主题的名称。
    • CONFLUENT_IDENTITY_POOL_ID:用于通过 Confluent Cloud 进行身份验证的身份池的池 ID。
    • PUBSUB_SERVICE_ACCOUNT:您在在 Google Cloud 中创建服务账号中创建的服务账号。

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceConfluentCloud{
				BootstrapServer:   bootstrapServer,
				ClusterID:         clusterID,
				Topic:             confluentTopic,
				IdentityPoolID:    poolID,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

C++

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

如果您遇到问题,请参阅“排查 Confluent Cloud 导入问题”主题

修改 Confluent Cloud Hubs 导入主题

如需修改 Confluent Cloud 导入主题的提取数据源设置,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击“Confluent Cloud 导入”主题。

  3. 在主题详情页面中,点击修改

  4. 更新您要更改的字段。

  5. 点击更新

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    为避免丢失导入主题的设置,请务必在每次更新主题时添加所有设置。如果您遗漏了某些内容,Pub/Sub 会将设置重置为原始默认值。

  2. 使用以下示例中提及的所有标志运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
    • CONFLUENT_BOOTSTRAP_SERVER:集群的引导服务器,其中包含要提取到 Pub/Sub 的 Kafka 主题。格式如下:hostname:port
    • CONFLUENT_CLUSTER_ID:包含要提取到 Pub/Sub 的 Kafka 主题的集群的 ID
    • CONFLUENT_TOPIC:要提取到 Pub/Sub 的 Kafka 主题的名称。
    • CONFLUENT_IDENTITY_POOL_ID:用于通过 Confluent Cloud 进行身份验证的身份池的池 ID。
    • CONFLUENT_IDENTITY_POOL_ID:您在在 Google Cloud 中创建服务账号中创建的服务账号。

配额和限制

导入主题的发布者吞吐量受主题的发布配额的约束。如需了解详情,请参阅 Pub/Sub 配额和限制

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。