本文档详细介绍了如何将数据从 Pub/Sub Lite 主题迁移到 适用于 BigQuery 的 Apache Kafka 主题。
准备工作
- 启用 Google Kubernetes Engine API。
- 确定要迁移的 Pub/Sub 精简版主题。确定用于以下用途的名称: 适用于 BigQuery 的 Apache Kafka 中的目标主题。此外,还要确定要迁移到哪个适用于 BigQuery 的 Apache Kafka 集群。
迁移工作流
如需迁移数据,请完成以下任务。关于“广告资源”委托类型 此页面后面部分会显示任务示例
- 创建 Google Kubernetes Engine 服务账号。
- 创建 Google Kubernetes Engine 集群。
- 使用配置自定义 Docker 映像 主题的详细信息。
将 Docker 映像部署到 Google Kubernetes Engine 集群。
映像内部包含 Kafka Connect 和适用于 Kafka 的 Pub/Sub Lite 插件 Connect 用于订阅新的 Pub/Sub Lite 订阅, 发布到 Apache Kafka for BigQuery
创建 Google Kubernetes Engine 服务账号
本部分介绍如何创建 运行 Google Kubernetes Engine 集群所需的权限。
在 Google Cloud 控制台中,创建一个新的 IAM 服务账号,并使其具有运行 Google Kubernetes Engine 所需的最低权限。
向服务账号授予以下其他 IAM 角色。这些 可促进迁移过程。
- 代管式 Kafka 客户端角色 (
roles/managedkafka.client
) - Pub/Sub 精简版订阅者角色 (
roles/pubsublite.subscriber
) - Pub/Sub Lite Viewer 角色 (
roles/pubsublite.Viewer
) - Artifact Registry Reader 角色 (
roles/artifactregistry.reader
)
- 代管式 Kafka 客户端角色 (
创建 GKE 集群
本部分介绍如何创建使用该服务的 GKE 集群 您在上一步中创建的账号。
转到 Google Cloud 控制台中的 Google Kubernetes Engine 页面。
点击 add_box 创建。
系统随即会显示创建 Autopilot 集群页面。
在高级设置标签页下,将服务账号更改为 IAM 服务账号。
(可选)配置其他设置 。
如需创建集群,请点击创建。
创建 Kafka Connect Docker 映像
本部分介绍如何创建和自定义 Kafka Connect Docker 映像 。
- 克隆 Pub/Sub Lite 迁移 GitHub 代码库。
针对您之前创建的 IAM 服务账号,生成 JSON 账号密钥。
使用 base64 工具对 JSON 密钥进行编码。例如,
Linux
base64 -w 0 < my_service_account.json > password.txt
Mac
base64 < account_key_json > password.txt
在Secret 文件中 GitHub 代码库,使用相应的 将代码库与您的 Google Cloud 项目相关联, Pub/Sub Lite 和 Kafka。
.gcp/gmk_sasl_service_account → sensitive <service-account-name>@<gcp-project>.iam.gserviceaccount.com .gcp/gmk_sasl_service_account_key → sensitive <base64 encoded sasl service account key> .gcp/kafka_ssl_truststore_location → sensitive <full path of the ssl truststore jks file location> .gcp/kafka_ssl_truststore_password → sensitive <password for the ssl truststore jks> .gcp/gmk_bootstrap_servers → environment specific bootstrap.<google-managed-kafka-cluster-name>.<google-managed-kafka-cluster-region name>.managedkafka.<google-managed-cluster-host-project-name>.cloud.goog:9092 .gcp/kafka_connect_group_id → environment specific <Kafka Connect group id (unique per worker group) for the Kafka connect workers in distributed mode> .gcp/kafka_config_storage_topic → environment specific <Kafka topic name used by Kafka Connect for tracking the config> .gcp/kafka_offset_storage_topic → environment specific <Kafka topic name used by Kafka Connect for tracking the offsets> .gcp/kafka_status_storage_topic → environment specific <Kafka topic name used by Kafka Connect for tracking the status> .gcp/kafka_sink_topic → environment specific <target sink Kafka topic name used by Kafka Connect for migrating the data from the Pub/Sub Lite topic> .gcp/pubsub_lite_gcp_project → environment specific <Google Cloud project that hosts the Pub/Sub Lite source subscription to be used for migrating the Pub/Sub Lite topic to sink the Kafka topic> .gcp/pubsub_lite_gcp_location → environment specific <Google Cloud location for the Pub/Sub Lite source subscription tor migrate the Pub/Sub Lite topic to sink Kafka topic> .gcp/pubsub_lite_subscription → environment specific <Pub/Sub Lite source subscription name to be used for migrating the pubsub lite topic to Kafka topic>
通过运行
docker/build-image.sh
文件构建 Docker 映像。./push-image.sh
使用您的 Google Cloud 项目名称更新
docker/push-image.sh
映像。通过运行
docker/push-image.sh
将映像推送到 Artifact Registry 文件。./push-image.sh
部署 Kafka Connect 工作负载
本部分介绍如何将 Kafka Connect Docker 映像部署到 Google Kubernetes Engine 集群。
- 安装和配置 kubectl 命令 身份验证插件。
- 生成 kubeconfig 适用于 Google Kubernetes Engine 集群
创建 Google Kubernetes Engine 服务账号并向其授予正确的权限 来模拟您的 IAM 账号。
$KSA_NAME = KUBERNETES_SERVICE_ACCOUNT_NAME $PROJECT_ID = GOOGLE_CLOUD_PROJECT_ID $IAM_SA_NAME = IAM_SERVICE_ACCOUNT_NAME kubectl create serviceaccount $KSA_NAME \ --namespace=default gcloud iam service-accounts add-iam-policy-binding \ $IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/$KSA_NAME]" kubectl annotate serviceaccount $KSA_NAME \ --namespace default \ iam.gke.io/gcp-service-account=$IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
在文本编辑器中打开 K8s.yaml 文件并更新以下值。
- 将
<workflow_name>
替换为您的 Kafka Connect 工作流的名称。 - 将
<gke_service_account>
替换为 Google Kubernetes Engine 服务账号 名称。
- 将
运行 K8s.yaml 文件。
kubectl create -f k8s.yaml
这会创建在您的 Google Kubernetes Engine 中运行的 Kafka Connect 工作负载 并启动 Pub/Sub Lite 连接器,将数据从您的 从 Pub/Sub Lite 主题到 Apache Kafka for BigQuery。
监控作业
作业运行后,您可以通过连接到 Kafka Connect 来进行检查 REST 端点。
- 在 Google Cloud 控制台中,转到部署详情 >“工作负载”页面。
点击您的 Kubernetes 部署。
部署详情页面会打开。
在公开服务下,点击公开,然后添加端口
8083
。启用端口转发。
您在设置端口转发时获得的默认链接会返回 输出类似于以下内容:
{"version":"3.4.0","commit":"2e1947d240607d53","kafka_cluster_id":"6H6qWA0dQnuK31hBPqYUDg"}
如果您将
/connectors
附加到链接中,系统会列出正在运行的连接器, 例如:["PubSubLiteSourceConnector"]
例如,点击此链接
url:8083/connectors/PubSubLiteSourceConnector/status
会生成任务列表, 及其状态。{"name":"PubSubLiteSourceConnector","connector":{"state":"RUNNING","worker_id":"10.53.0.157:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":1,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":2,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":3,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":4,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":5,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":6,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":7,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":8,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":9,"state":"RUNNING","worker_id":"10.53.0.157:8083"}],"type":"source"}
分阶段迁移
将 Pub/Sub Lite 主题迁移到 Kafka 后,您可以 迁移订阅者和发布商为此,请按以下说明操作 步骤。
迁移订阅者。更新订阅者以消费 Kafka 主题而不是 Pub/Sub Lite 主题。
此操作应在受控的开发环境中逐步进行。
理想情况下,您应该维护两组订阅者以验证完全相同的消息 都是从 Kafka 和 Pub/Sub Lite 接收的。 确认行为正确无误后 您可以停用自己的 Pub/Sub Lite 位订阅者。
迁移发布商。请更新您的发布商,以便直接发布到 Kafka 主题而不是 Pub/Sub Lite 主题。
与迁移订阅者类似,此操作应在 可控的开发环境如果您不希望出现重复数据, 可以维护两组发布商以验证其行为。完成验证后 停用 Pub/Sub Lite 发布者。
在迁移完所有订阅者和发布商后,停用 来移除工作负载和集群
删除原始 Pub/Sub 精简版主题。