从 Pub/Sub Lite 迁移到 Google Cloud Managed Service for Apache Kafka

本文档详细介绍了如何将数据从 Pub/Sub Lite 主题迁移到 Managed Service for Apache Kafka 主题。

准备工作

  1. 启用 Google Kubernetes Engine API。
  2. 确定要迁移的 Pub/Sub Lite 主题。在 Google Cloud Managed Service for Apache Kafka 中为目标主题确定名称。此外,还要确定要迁移到的 Managed Service for Apache Kafka Cluster

迁移工作流

如需迁移数据,请完成以下任务。本页面后面会详细介绍这些任务。

  1. 创建 Google Kubernetes Engine 服务账号
  2. 创建一个 Google Kubernetes Engine 集群
  3. 使用主题的配置详细信息自定义 Docker 映像
  4. Docker 映像部署到 Google Kubernetes Engine 集群。

    在该映像中,Kafka Connect 和适用于 Kafka Connect 的 Pub/Sub Lite 插件用于订阅新的 Pub/Sub Lite 订阅,并发布到 Managed Service for Apache Kafka。

创建 Google Kubernetes Engine 服务账号

本部分介绍了如何创建具有运行 Google Kubernetes Engine 集群所需权限的 IAM 服务账号。

  1. Google Cloud 控制台中,创建新的 IAM 服务账号,并为其授予操作 Google Kubernetes Engine 所需的最低权限。

  2. 向服务账号授予以下额外的 IAM 角色。这些角色有助于顺利完成迁移过程。

    • Managed Kafka Client 角色 (roles/managedkafka.client)
    • Pub/Sub Lite Subscriber 角色 (roles/pubsublite.subscriber)
    • Pub/Sub Lite Viewer 角色 (roles/pubsublite.Viewer)
    • Artifact Registry Reader 角色 (roles/artifactregistry.reader)

创建 GKE 集群

本部分介绍如何创建使用您在上一步中创建的服务账号的 GKE 集群。

  1. 转到 Google Cloud 控制台中的 Google Kubernetes Engine 页面。

    转到 Google Kubernetes Engine

  2. 点击 创建

    系统随即会显示创建 Autopilot 集群页面。

  3. 高级设置标签页下,将服务账号更改为您在上一步中创建的 IAM 服务账号

  4. (可选)根据需要配置其他设置

  5. 如需创建集群,请点击创建

创建 Kafka Connect Docker 映像

本部分介绍了如何为您的主题创建和自定义 Kafka Connect Docker 映像。

  1. 克隆 Pub/Sub Lite 迁移 GitHub 代码库。
  2. 为您之前创建的 IAM 服务账号生成 JSON 账号密钥

    使用 base64 工具对 JSON 密钥进行编码。例如,

    Linux

    base64 -w 0 < my_service_account.json > password.txt
    

    Mac

    base64 < account_key_json > password.txt
    
  3. 在 GitHub 代码库中的密钥文件中,使用适当的信息更新以下文件,以将代码库关联到您的 Google Cloud 项目、Pub/Sub Lite 和 Kafka。

    .gcp/gmk_sasl_service_account → sensitive
    <service-account-name>@<gcp-project>.iam.gserviceaccount.com
    
    .gcp/gmk_sasl_service_account_key → sensitive
    <base64 encoded sasl service account key>
    
    .gcp/kafka_ssl_truststore_location → sensitive
    <full path of the ssl truststore jks file location>
    
    .gcp/kafka_ssl_truststore_password → sensitive
    <password for the ssl truststore jks>
    
    .gcp/gmk_bootstrap_servers → environment specific
    bootstrap.<google-managed-kafka-cluster-name>.<google-managed-kafka-cluster-region name>.managedkafka.<google-managed-cluster-host-project-name>.cloud.goog:9092
    
    .gcp/kafka_connect_group_id → environment specific
    <Kafka Connect group id (unique per worker group) for the Kafka connect workers in distributed mode>
    
    .gcp/kafka_config_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the config>
    
    .gcp/kafka_offset_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the offsets>
    
    .gcp/kafka_status_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the status>
    
    .gcp/kafka_sink_topic → environment specific
    <target sink Kafka topic name used by Kafka Connect for migrating the data from the Pub/Sub Lite topic>
    
    .gcp/pubsub_lite_gcp_project → environment specific
    <Google Cloud project that hosts the Pub/Sub Lite source subscription to be used for migrating the Pub/Sub Lite topic to sink the Kafka topic>
    
    .gcp/pubsub_lite_gcp_location → environment specific
    <Google Cloud location for the Pub/Sub Lite source subscription tor migrate the Pub/Sub Lite topic to sink Kafka topic>
    
    .gcp/pubsub_lite_subscription → environment specific
    <Pub/Sub Lite source subscription name to be used for migrating the pubsub lite topic to Kafka topic>
    
  4. 通过运行 docker/build-image.sh 文件构建 Docker 映像。

    ./push-image.sh
    
  5. docker/push-image.sh 映像更新为您的 Google Cloud 项目名称。

  6. 运行 docker/push-image.sh 文件,将映像推送到 Artifact Registry。

    ./push-image.sh
    

部署 Kafka Connect 工作负载

本部分介绍了如何将 Kafka Connect Docker 映像部署到 Google Kubernetes Engine 集群。

  1. 使用身份验证插件安装并配置 kubectl
  2. 为您的 Google Kubernetes Engine 集群生成 kubeconfig
  3. 创建一个 Google Kubernetes Engine 服务账号并向其授予正确的权限,以便其模拟您的 IAM 账号。

    $KSA_NAME = KUBERNETES_SERVICE_ACCOUNT_NAME
    $PROJECT_ID = GOOGLE_CLOUD_PROJECT_ID
    $IAM_SA_NAME = IAM_SERVICE_ACCOUNT_NAME
    
    kubectl create serviceaccount $KSA_NAME \
        --namespace=default
    
    gcloud iam service-accounts add-iam-policy-binding \
    $IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/$KSA_NAME]"
    
    kubectl annotate serviceaccount $KSA_NAME \
        --namespace default \
    iam.gke.io/gcp-service-account=$IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
    
  4. 在文本编辑器中打开 K8s.yaml 文件,然后更新以下值。

    1. <workflow_name> 替换为您的 Kafka Connect 工作流的名称。
    2. <gke_service_account> 替换为 Google Kubernetes Engine 服务账号名称。
  5. 运行 K8s.yaml 文件。

    kubectl create -f k8s.yaml
    

    这会创建在 Google Kubernetes Engine 集群中运行的 Kafka Connect 工作负载,并启动 Pub/Sub Lite 连接器,以便将数据从 Pub/Sub Lite 主题移至 Google Cloud Managed Service for Apache Kafka。

监控作业

作业运行后,您可以通过连接到 Kafka Connect REST 端点来检查作业。

  1. 在 Google Cloud 控制台中,依次前往 Deployment detail > Workload 页面。
  2. 点击您的 Kubernetes 部署。

    系统会打开“部署详情”页面。

  3. 公开服务下,点击公开,然后添加端口 8083

  4. 启用端口转发。

    您通过设置端口转发获得的默认链接会返回类似于以下内容的输出:

    {"version":"3.4.0","commit":"2e1947d240607d53","kafka_cluster_id":"6H6qWA0dQnuK31hBPqYUDg"}
    

    如果您将 /connectors 附加到链接,系统会列出正在运行的连接器,例如:

    ["PubSubLiteSourceConnector"]
    

    例如,检查此链接 url:8083/connectors/PubSubLiteSourceConnector/status 会生成任务及其状态的列表。

    {"name":"PubSubLiteSourceConnector","connector":{"state":"RUNNING","worker_id":"10.53.0.157:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":1,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":2,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":3,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":4,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":5,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":6,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":7,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":8,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":9,"state":"RUNNING","worker_id":"10.53.0.157:8083"}],"type":"source"}
    

分阶段迁移

将 Pub/Sub Lite 主题迁移到 Kafka 后,您可以迁移订阅者和发布者。为此,请按以下步骤操作。

  1. 迁移订阅者。更新订阅方,以使用 Kafka 主题而非 Pub/Sub Lite 主题。

    这应在受控开发环境中逐步完成。

    理想情况下,您应维护两组订阅者,以验证是否从 Kafka 和 Pub/Sub Lite 收到了相同的消息。验证正确行为后,您可以停用 Pub/Sub Lite 订阅方。

  2. 迁移发布商。更新发布者,以便直接发布到 Kafka 主题,而不是 Pub/Sub 精简版主题。

    与迁移订阅者类似,这项工作应在受控开发环境中逐步完成。如果重复数据不是问题,您可以维护两组发布商来验证行为。验证行为后,停用您的 Pub/Sub Lite 发布端。

  3. 迁移完所有订阅方和发布方后,请通过删除工作负载和集群来停用迁移工具。

  4. 删除原始 Pub/Sub 精简版主题。

后续步骤