借助 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 导入主题,您可以将 Amazon MSK 作为外部来源,将数据持续注入到 Pub/Sub 中。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。
本文档介绍了如何创建和管理 Amazon MSK 导入主题。如需创建标准主题,请参阅创建标准主题。
如需详细了解导入主题,请参阅导入主题简介。
准备工作
详细了解 Pub/Sub 发布流程。
设置工作负载身份联合,以便Google Cloud 可以访问外部在线播放服务。
所需的角色和权限
如需获得创建和管理 Amazon MSK 导入主题所需的权限,请让管理员向您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含创建和管理 Amazon MSK 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需创建和管理 Amazon MSK 导入主题,需要具备以下权限:
-
创建导入主题:
pubsub.topics.create
-
删除导入主题:
pubsub.topics.delete
-
获取导入主题:
pubsub.topics.get
-
列出导入主题:
pubsub.topics.list
-
发布到导入主题:
pubsub.topics.publish
-
更新导入主题:
pubsub.topics.update
-
获取导入主题的 IAM 政策:
pubsub.topics.getIamPolicy
-
为导入主题配置 IAM 政策:
pubsub.topics.setIamPolicy
您可以在项目级别和个别资源级别配置访问权限控制。
设置联合身份以访问 Amazon MSK
借助工作负载身份联合, Google Cloud 服务可以访问 Google Cloud外部运行的工作负载。借助身份联合,您无需维护或传递凭据,即可 Google Cloud 访问其他云中的资源。不过,您可以使用工作负载本身的身份进行身份验证 Google Cloud 并访问资源。
在 Google Cloud中创建服务账号
这是一个可选步骤。 如果您已有服务账号,则可以在本过程中使用该账号,而无需创建新的服务账号。如果您使用的是现有服务账号,请参阅记录服务账号唯一 ID 以了解后续步骤。
对于 Amazon MSK 导入主题,Pub/Sub 会使用服务账号作为身份来访问 AWS 中的资源。
如需详细了解如何创建服务账号(包括前提条件、所需角色和权限以及命名准则),请参阅创建服务账号。创建服务账号后,您可能需要等待 60 秒或更长时间才能使用该服务账号。出现这种行为的原因是读取操作是最终一致的;新服务账号可能需要一段时间才能显示。
记录服务账号唯一 ID
您需要服务账号唯一 ID 才能在 AWS 控制台中设置角色。
在 Google Cloud 控制台中,前往服务账号详情页面。
点击您刚刚创建的服务账号或您打算使用的服务账号。
在服务账号详情页面中,记录唯一 ID 编号。
您需要在工作流中使用该 ID 来在 AWS 控制台中设置角色。
向 Pub/Sub 服务账号添加 Service Account Token Creator 角色
Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator
) 可让主账号为服务账号创建短期有效凭据。这些令牌或凭据用于模拟服务账号。
如需详细了解服务账号模拟,请参阅服务账号模拟。
您还可以在此过程中添加 Pub/Sub Publisher 角色 (roles/pubsub.publisher
)。如需详细了解该角色以及添加该角色的原因,请参阅向 Pub/Sub 服务账号添加 Pub/Sub 发布商角色。
在 Google Cloud 控制台中,转到 IAM 页面。
点击包括 Google提供的角色授权复选框。
查找格式为
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
的服务账号。对于此服务账号,点击修改主账号按钮。
根据需要,点击添加其他角色。
搜索并点击 Service account token creator 角色 (
roles/iam.serviceAccountTokenCreator
)。点击保存。
在 AWS 中创建政策
您需要在 AWS 中创建一项政策,以便 Pub/Sub 向 AWS 进行身份验证,从而从 Amazon MSK 中提取数据。
- 如需了解更多方法以及如何在 AWS 中创建政策,请参阅创建 IAM 政策。
如需在 AWS 中创建政策,请执行以下步骤:
登录 AWS 管理控制台,然后打开 IAM 控制台。
在 IAM 控制台的导航窗格中,依次点击访问权限管理 > 政策。
点击创建政策。
对于点击某项服务,请点击 MSK。
对于允许的操作,依次点击读取 > GetBootstrapBrokers。
此操作会授予获取 Pub/Sub 用于连接到 MSK 集群的引导代理的权限。
点击添加更多权限。
在选择服务部分,点击 Apache Kafka APIs for MSK。
对于允许的操作,请选择以下选项:
列表 > DescribeTopic
此操作会授予权限,以允许 Pub/Sub 提取主题获取有关 Amazon MSK Kafka 主题的详细信息。
读取 > ReadData
此操作会授予从 Amazon MSK Kafka 主题读取数据的权限。
写入 > 连接
此操作会授予连接到 Amazon MSK Kafka 集群并对其进行身份验证的权限。
对于资源,请指定集群 ARN(如果您想将政策限制为仅适用于特定集群,这是推荐做法)。
点击添加更多权限。
在选择服务部分,点击 STS。
对于允许的操作,依次点击 Write > AssumeRoleWithWebIdentity。
此操作会授予权限,以便获取一组临时安全凭据,以便 Pub/Sub 使用身份联合功能向 Amazon MSK 进行身份验证。
点击下一步。
输入政策名称和说明。
点击创建政策。
使用自定义信任政策在 AWS 中创建角色
您必须在 AWS 中创建一个角色,以便 Pub/Sub 能够对 AWS 进行身份验证,从而从 Amazon MSK 提取数据。
登录 AWS 管理控制台,然后打开 IAM 控制台。
在 IAM 控制台的导航窗格中,点击 Roles。
点击 Create role。
对于 Select trusted entity(选择可信实体),点击 Custom trust policy(自定义信任政策)。
在自定义信任政策部分中,输入或粘贴以下内容:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "accounts.google.com" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>" } } } ] }
将
<SERVICE_ACCOUNT_UNIQUE_ID>
替换为您在记录服务账号唯一 ID 中记录的服务账号的唯一 ID。点击下一步。
对于添加权限,请搜索并点击您刚刚创建的自定义政策。
点击下一步。
输入角色名称和说明。
点击 Create role。
将 Pub/Sub 发布商角色添加到 Pub/Sub 正文
如需启用发布功能,您必须向 Pub/Sub 服务账号分配发布商角色,以便 Pub/Sub 能够向 Amazon MSK 导入主题发布内容。
启用从所有主题发布内容
如果您尚未创建任何 Amazon MSK 导入主题,请使用此方法。
在 Google Cloud 控制台中,转到 IAM 页面。
点击包括 Google提供的角色授权复选框。
查找格式为
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
的服务账号。对于此服务账号,点击修改主账号按钮。
根据需要,点击添加其他角色。
搜索并点击 Pub/Sub 发布商角色 (
roles/pubsub.publisher
)。点击保存。
启用从单个主题发布
仅当 Amazon MSK 导入主题已存在时,才使用此方法。
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud pubsub topics add-iam-policy-binding
命令:gcloud pubsub topics add-iam-policy-binding TOPIC_ID \ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \ --role="roles/pubsub.publisher"
替换以下内容:
TOPIC_ID
:Amazon MSK 导入主题的主题 ID。PROJECT_NUMBER
:项目编号。如需查看项目编号,请参阅标识项目。
向服务账号添加服务账号用户角色
Service Account User 角色 (roles/iam.serviceAccountUser
) 包含 iam.serviceAccounts.actAs
权限,可让主账号将服务账号附加到 Amazon MSK 导入主题的提取设置,并使用该服务账号进行联合身份验证。
在 Google Cloud 控制台中,转到 IAM 页面。
对于发出创建或更新主题调用的主账号,请点击修改主账号按钮。
根据需要,点击添加其他角色。
搜索并点击 Service Account User 角色 (
roles/iam.serviceAccountUser
)。点击保存。
使用 Amazon MSK 导入主题
您可以创建新的导入主题,也可以修改现有主题。
注意事项
分别创建主题和订阅(即使是快速连续创建)可能会导致数据丢失。在订阅之前,主题会存在一小段时间。如果在此期间向主题发送了任何数据,这些数据将丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。
如果您需要使用相同名称重新创建现有导入主题的 Kafka 主题,则不能仅删除 Kafka 主题并重新创建。此操作可能会使 Pub/Sub 的偏移量管理失效,从而导致数据丢失。如需缓解此问题,请按以下步骤操作:
- 删除 Pub/Sub 导入主题。
- 删除 Kafka 主题。
- 创建 Kafka 主题。
- 创建 Pub/Sub 导入主题。
系统始终会从 earliest offset(最早的偏移量)读取 Amazon MSK Kafka 主题中的数据。
创建 Amazon MSK 导入主题
如需详细了解与主题关联的属性,请参阅主题的属性。
确保您已完成以下步骤:
如需创建 Amazon MSK 导入主题,请按以下步骤操作:
控制台
在 Google Cloud 控制台中,前往主题页面。
点击创建主题。
在主题 ID 字段中,输入 Amazon MSK 导入主题的 ID。如需详细了解如何命名主题,请参阅命名准则。
选择添加默认订阅。
选择启用提取。
在“提取来源”中,选择 Amazon MSK。
输入以下详细信息:
集群 ARN:要提取到 Pub/Sub 的 Amazon MSK 的 ARN。ARN 格式如下:
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
。主题:要提取到 Pub/Sub 的 Amazon MSK Kafka 主题的名称。
AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:
arn:aws:iam:${Account}:role/${RoleName}
。服务账号:您在在 Google Cloud中创建服务账号中创建的服务账号。
点击创建主题。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud pubsub topics create
命令:gcloud pubsub topics create TOPIC_ID \ --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \ --aws-msk-ingestion-topic MSK_TOPIC \ --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \ --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
替换以下内容:
TOPIC_ID
:您的 Pub/Sub 主题的名称或 ID。MSK_CLUSTER_ARN
:要提取到 Pub/Sub 的 Amazon MSK 集群的 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
。MSK_TOPIC
:要提取到 Pub/Sub 的 Amazon MSK Kafka 主题的名称。MSK_ROLE_ARN
:AWS 角色的 ARN。该角色的 ARN 格式如下:arn:aws:iam:${Account}:role/${RoleName}
。PUBSUB_SERVICE_ACCOUNT
:您在在 Google Cloud 中创建服务账号中创建的服务账号。
Go
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Java
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Node.js
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Python
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
C++
在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Node.js (TypeScript)
在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN) 和 IAM 标识符。
如果您遇到问题,请参阅“对 Amazon MSK 导入进行问题排查”主题。
修改 Amazon MSK 导入主题
如需修改 Amazon MSK 导入主题的提取数据源设置,请按以下步骤操作:
控制台
在 Google Cloud 控制台中,前往主题页面。
点击“Amazon MSK 导入”主题。
在主题详情页面中,点击修改。
更新您要更改的字段。
点击更新。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
使用以下示例中提到的所有标志运行
gcloud pubsub topics update
命令:gcloud pubsub topics update TOPIC_ID \ --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \ --aws-msk-ingestion-topic MSK_TOPIC \ --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \ --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
替换以下内容:
- TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
- MSK_CLUSTER_ARN:要提取到 Pub/Sub 的 Amazon MSK 集群的 ARN。ARN 格式如下:
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
。 - MSK_TOPIC:要提取到 Pub/Sub 的 Amazon MSK Kafka 主题的名称。
- MSK_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下:
arn:aws:iam:${Account}:role/${RoleName}
。 - PUBSUB_SERVICE_ACCOUNT:您在在 Google Cloud 中创建服务账号中创建的服务账号。
配额和限制
导入主题的发布者吞吐量受主题的发布配额的约束。如需了解详情,请参阅 Pub/Sub 配额和限制。
后续步骤
为您的主题选择订阅类型。
了解如何向主题发布消息。
使用 gcloud CLI、REST API 或客户端库创建或修改主题。