借助 Confluent Cloud 导入主题,您可以持续从 Confluent Cloud 注入数据作为外部来源,并将其注入到 Pub/Sub 中。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目标位置。
本文档介绍了如何创建和管理 Confluent Cloud import 主题。如需创建标准主题,请参阅创建标准主题。
如需详细了解导入主题,请参阅关于导入主题。
准备工作
- 详细了解 Pub/Sub 发布流程。 
- 配置管理 Confluent Cloud 导入主题所需的角色和权限,包括以下内容: 
- 设置工作负载身份联合,以便Google Cloud 可以访问外部流式传输服务。 
所需的角色和权限
    
      如需获得创建和管理 Confluent Cloud 导入主题所需的权限,请让您的管理员为您授予主题或项目的 Pub/Sub Editor  (roles/pubsub.editor) IAM 角色。
  
  
  
  
  如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
  
  
此预定义角色包含创建和管理 Confluent Cloud 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
创建和管理 Confluent Cloud 导入主题需要以下权限:
- 
                创建导入主题:
                  pubsub.topics.create
- 
                删除导入主题:
                  pubsub.topics.delete
- 
                获取导入主题:
                  pubsub.topics.get
- 
                列出导入主题:
                  pubsub.topics.list
- 
                发布到导入主题:
                  pubsub.topics.publish and pubsub.serviceAgent
- 
                更新导入主题:
                  pubsub.topics.update
- 
                获取导入主题的 IAM 政策:
                  pubsub.topics.getIamPolicy
- 
                为导入主题配置 IAM 政策:
                  pubsub.topics.setIamPolicy
您可以在项目级层和个别资源级层配置访问权限控制。
设置联合身份以访问 Confluent Cloud
借助工作负载身份联合, Google Cloud 服务可以访问在 Google Cloud外部运行的工作负载。借助身份联合,您无需维护凭据或将凭据传递给 Google Cloud ,即可访问其他云中的资源。您可以改为使用工作负载本身的身份向 Google Cloud 进行身份验证并访问资源。
在 Google Cloud中创建服务账号
这是一个可选步骤。 如果您已有服务账号,则可以在此过程中使用该账号,而不必创建新的服务账号。 如果您使用的是现有服务账号,请前往记录服务账号唯一 ID 以进行下一步。
对于 Confluent Cloud 导入主题,Pub/Sub 使用服务账号作为身份来访问 Confluent Cloud 中的资源。
如需详细了解如何创建服务账号,包括前提条件、所需角色和权限以及命名指南,请参阅创建服务账号。创建服务账号后,您可能需要等待 60 秒或更长时间才能使用该服务账号。出现这种行为的原因是读取操作是最终一致的;新服务账号可能需要一段时间才能显示。
记录服务账号唯一 ID
您需要在 Confluent Cloud 控制台中设置身份提供方和身份池,为此您需要服务账号的唯一 ID。
- 在 Google Cloud 控制台中,前往服务账号详情页面。 
- 点击您刚刚创建的服务账号或计划使用的服务账号。 
- 在服务账号详情页面中,记录唯一 ID 编号。 - 您需要此 ID 作为工作流的一部分,以便在 Confluent Cloud 控制台中设置身份提供方和身份池。 
向 Pub/Sub 服务账号添加服务账号令牌创建者角色
借助 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator),主账号可以为服务账号创建短期有效凭据。这些令牌或凭据用于模拟服务账号。
如需详细了解服务账号模拟,请参阅服务账号模拟。
您还可以在此过程中添加 Pub/Sub 发布者角色 (roles/pubsub.publisher)。如需详细了解该角色以及您添加该角色的原因,请参阅向 Pub/Sub 服务账号添加 Pub/Sub 发布者角色。
- 在 Google Cloud 控制台中,前往 IAM 页面。 
- 点击包括 Google提供的角色授权复选框。 
- 查找格式为 - service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com的服务账号。
- 对于此服务账号,点击修改主账号按钮。 
- 如果需要,请点击添加其他角色。 
- 搜索并点击 Service Account Token Creator 角色 ( - roles/iam.serviceAccountTokenCreator)。
- 点击保存。 
在 Confluent Cloud 中创建身份提供方
如需向 Confluent Cloud 进行身份验证,Google Cloud 服务账号需要身份池。您必须先在 Confluent Cloud 中创建身份提供方。
如需详细了解如何在 Confluent Cloud 中创建身份提供商,请访问添加 OAuth/OIDC 身份提供商页面。
- 在菜单中,点击账号和访问权限。 
- 点击工作负载身份。 
- 点击添加提供商。 
- 点击 OAuth/OIDC,然后点击下一步。 
- 点击其他 OIDC 提供商,然后点击下一步。 
- 提供身份提供方的用途名称和说明。 
- 点击显示高级配置。 
- 在签发者 URI 字段中,输入 - https://accounts.google.com。
- 在 JWKS URI 字段中,输入 - https://www.googleapis.com/oauth2/v3/certs。
- 点击验证并保存。 
在 Confluent Cloud 中创建身份池并授予适当的角色
您必须在身份配置文件下创建身份池,并授予必要的角色,以允许 Pub/Sub 服务账号进行身份验证并从 Confluent Cloud Kafka 主题中读取数据。
在继续创建身份池之前,请确保您的集群已在 Confluent Cloud 中创建。
如需详细了解如何创建身份池,请访问将身份池与 OAuth/OIDC 身份提供方搭配使用页面。
- 在菜单中,点击账号和访问权限。 
- 点击工作负载身份。 
- 点击您在 Confluent Cloud 中创建身份提供方中创建的身份提供方。 
- 点击添加池。 
- 为您的身份池提供名称和说明。 
- 将身份声明设置为 - claims。
- 在设置过滤条件下,点击高级标签页。输入以下代码: - claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'- 将 - <SERVICE_ACCOUNT_UNIQUE_ID>替换为记录服务账号唯一 ID 中找到的服务账号的唯一 ID。
- 点击下一步。 
- 点击添加新权限。然后,点击下一步。 
- 在相关集群中,点击添加角色分配。 
- 点击 Operator 角色,然后点击添加。 - 此角色授予 Pub/Sub。服务账号对包含您要注入到 Pub/Sub 的 Confluent Kafka 主题的集群的访问权限。 
- 在集群下方,点击主题。然后,点击添加角色分配。 
- 选择 DeveloperRead 角色。 
- 点击相应选项,然后指定主题或前缀。例如,特定主题、前缀规则或所有主题。 
- 点击添加。 
- 点击下一步。 
- 点击验证并保存。 
向 Pub/Sub 主账号添加 Pub/Sub 发布者角色
如需启用发布功能,您必须为 Pub/Sub 服务账号分配发布者角色,以便 Pub/Sub 能够发布到 Confluent Cloud 导入主题。
向 Pub/Sub 服务账号添加 Pub/Sub 服务代理角色
为了让 Pub/Sub 使用导入主题项目的发布配额,Pub/Sub 服务代理需要对导入主题的项目拥有 serviceusage.services.use 权限。
如需提供此权限,建议您向 Pub/Sub 服务账号添加 Pub/Sub 服务代理角色。
如果 Pub/Sub 服务账号没有 Pub/Sub 服务代理角色,可以按如下方式授予:
- 在 Google Cloud 控制台中,前往 IAM 页面。 
- 点击包括 Google提供的角色授权复选框。 
- 查找格式为 - service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com的服务账号。
- 对于此服务账号,点击修改主账号按钮。 
- 如果需要,请点击添加其他角色。 
- 搜索并点击 Pub/Sub 服务代理角色 ( - roles/pubsub.serviceAgent)。
- 点击保存。 
允许发布所有主题的内容
如果您尚未创建任何 Confluent Cloud 导入主题,请使用此方法。
- 在 Google Cloud 控制台中,前往 IAM 页面。 
- 点击包括 Google提供的角色授权复选框。 
- 查找格式为 - service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com的服务账号。
- 对于此服务账号,点击修改主账号按钮。 
- 如果需要,请点击添加其他角色。 
- 搜索并点击 Pub/Sub 发布者角色 ( - roles/pubsub.publisher)。
- 点击保存。 
启用从单个主题发布内容
仅当 Confluent Cloud 导入主题已存在时,才使用此方法。
- In the Google Cloud console, activate Cloud Shell. - At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize. 
- 运行 - gcloud pubsub topics add-iam-policy-binding命令:- gcloud pubsub topics add-iam-policy-binding TOPIC_ID \ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \ --role="roles/pubsub.publisher" - 替换以下内容: - TOPIC_ID:Confluent Cloud 导入主题的 ID。
- PROJECT_NUMBER:项目编号。如需查看项目编号,请参阅标识项目。
 
- 在 Google Cloud 控制台中,前往 IAM 页面。 
- 对于发出创建或更新主题调用的主账号,请点击修改主账号按钮。 
- 如果需要,请点击添加其他角色。 
- 搜索并点击 Service account user 角色 ( - roles/iam.serviceAccountUser)。
- 点击保存。 
- 即使快速连续地创建主题和订阅,也可能会导致数据丢失。在订阅期结束后的短时间内,主题仍会存在。如果在此期间向主题发送任何数据,这些数据都会丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。 
- 如果您需要重新创建现有导入主题的 Kafka 主题,但名称相同,则无法仅删除 Kafka 主题并重新创建它。此操作可能会使 Pub/Sub 的偏移管理失效,从而导致数据丢失。如需缓解此问题,请按以下步骤操作: - 删除 Pub/Sub 导入主题。
- 删除 Kafka 主题。
- 创建 Kafka 主题。
- 创建 Pub/Sub 导入主题。
 
- 系统始终会从 Confluent Cloud Kafka 主题的最早偏移量读取数据。 
-  在 Google Cloud 控制台中,前往主题页面。 
- 点击创建主题。
- 在主题 ID 字段中,输入导入主题的 ID。 如需详细了解主题命名,请参阅命名准则。
- 选择添加默认订阅。
- 选择启用提取。
- 对于提取来源,请选择 Confluent Cloud。
-   输入以下详细信息:-   引导服务器:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的引导服务器。
    格式如下:hostname:port。
- 集群 ID:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的 ID。
- 主题:您要将数据从 Kafka 主题提取到 Pub/Sub 中的 Kafka 主题的名称。
- 身份池 ID:用于通过 Confluent Cloud 进行身份验证的身份池的池 ID。
- 服务账号:您在在 Google Cloud 中创建服务账号中创建的服务账号。
 
-   引导服务器:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的引导服务器。
    格式如下:
- 点击创建主题。
-  
  
   
   
  
 
 
 
  
    
    In the Google Cloud console, activate Cloud Shell. At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize. 
-   运行 gcloud pubsub topics create命令:gcloud pubsub topics create TOPIC_ID 
 --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
 --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
 --confluent-cloud-ingestion-topic CONFLUENT_TOPIC
 --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
 --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT替换以下内容: -    TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
-    CONFLUENT_BOOTSTRAP_SERVER:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的引导服务器。格式如下:hostname:port。
-    CONFLUENT_CLUSTER_ID:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的 ID。
-    CONFLUENT_TOPIC:您要提取到 Pub/Sub 中的 Kafka 主题的名称。
-    CONFLUENT_IDENTITY_POOL_ID:用于向 Confluent Cloud 进行身份验证的身份池的池 ID。
-    PUBSUB_SERVICE_ACCOUNT:您在在 Google Cloud 中创建服务账号中创建的服务账号。
 
-    
- 在 Google Cloud 控制台中,前往主题页面。 
- 点击 Confluent Cloud 导入主题。 
- 在主题详情页面中,点击修改。 
- 更新您要更改的字段。 
- 点击更新。 
- 
  
   
   
  
 
 
 
  
    
    In the Google Cloud console, activate Cloud Shell. At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize. 为避免丢失导入主题的设置,请务必在每次更新主题时添加所有设置。如果您遗漏了某些内容,Pub/Sub 会将相应设置重置为原始默认值。 
- 运行 - gcloud pubsub topics update命令,并使用以下示例中提及的所有标志:- gcloud pubsub topics update TOPIC_ID \ --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \ --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \ --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \ --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \ --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT - 替换以下内容: - TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
- CONFLUENT_BOOTSTRAP_SERVER:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的引导服务器。格式如下:- hostname:port。
- CONFLUENT_CLUSTER_ID:包含您要提取到 Pub/Sub 中的 Kafka 主题的集群的 ID
- CONFLUENT_TOPIC:您要提取到 Pub/Sub 中的 Kafka 主题的名称。
- CONFLUENT_IDENTITY_POOL_ID:用于通过 Confluent Cloud 进行身份验证的身份池的池 ID。
- CONFLUENT_IDENTITY_POOL_ID:您在在 Google Cloud 中创建服务账号中创建的服务账号。
 
- 为您的主题选择订阅类型。 
- 了解如何向主题发布消息。 
- 使用 gcloud CLI、REST API 或客户端库创建或修改主题。 
向服务账号添加服务账号用户角色
Service Account User 角色 (roles/iam.serviceAccountUser) 包含权限 iam.serviceAccounts.actAs,可让主账号将服务账号附加到 Confluent Cloud 导入主题的提取设置,并使用该服务账号进行联合身份验证。
使用 Confluent Cloud 导入主题
您可以创建新的导入主题,也可以修改现有主题。
注意事项
创建 Confluent Cloud 导入主题
如需详细了解与主题关联的属性,请参阅主题的属性。
确保您已完成以下步骤:
如需创建 Confluent Cloud 导入主题,请按以下步骤操作:
控制台
gcloud
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.ts
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
如果您遇到了问题,请参阅排查 Confluent Cloud 导入主题方面的问题。
修改 Confluent Cloud Hubs 导入主题
如需修改 Confluent Cloud 导入主题的提取数据源设置,请按以下步骤操作:
控制台
gcloud
配额和限制
导入主题的发布者吞吐量受主题的发布配额限制。如需了解详情,请参阅 Pub/Sub 配额和限制。