从 Pub/Sub Lite 迁移到 Apache Kafka for BigQuery

本文档详细介绍了如何将数据从 Pub/Sub Lite 主题迁移到 Apache Kafka for BigQuery 主题。

准备工作

  1. 启用 Google Kubernetes Engine API。
  2. 确定要迁移的 Pub/Sub 精简版主题。确定 Apache Kafka for BigQuery 中目标主题的名称。此外,还要确定要迁移到哪个适用于 BigQuery 的 Apache Kafka 集群

迁移工作流

如需迁移数据,请完成以下任务。本页面的后面部分会详细介绍这些任务。

  1. 创建 Google Kubernetes Engine 服务帐号
  2. 创建 Google Kubernetes Engine 集群
  3. 使用主题的配置详细信息自定义 Docker 映像
  4. 将 Docker 映像部署到 Google Kubernetes Engine 集群。

    在映像内,Kafka Connect 和适用于 Kafka Connect 的 Pub/Sub Lite 插件用于订阅新的 Pub/Sub Lite 订阅,并发布到 Apache Kafka for BigQuery。

创建 Google Kubernetes Engine 服务帐号

本部分介绍如何创建具有运行 Google Kubernetes Engine 集群所需权限的 IAM 服务帐号。

  1. Google Cloud 控制台中,创建一个新的 IAM 服务账号,并使其具有运行 Google Kubernetes Engine 所需的最低权限。

  2. 向服务帐号授予以下其他 IAM 角色。这些角色可协助完成迁移过程。

    • 代管式 Kafka 客户端角色 (roles/managedkafka.client)
    • Pub/Sub 精简版订阅者角色 (roles/pubsublite.subscriber)
    • Pub/Sub Lite Viewer 角色 (roles/pubsublite.Viewer)
    • Artifact Registry Reader 角色 (roles/artifactregistry.reader)

创建 GKE 集群

本部分介绍如何创建 GKE 集群,该集群使用您在上一步中创建的服务帐号。

  1. 转到 Google Cloud 控制台中的 Google Kubernetes Engine 页面。

    转到 Google Kubernetes Engine

  2. 点击 创建

    系统随即会显示创建 Autopilot 集群页面。

  3. 高级设置标签页下,将服务帐号更改为您在上一步中创建的 IAM 服务帐号

  4. (可选)根据需要配置其他设置

  5. 如需创建集群,请点击创建

创建 Kafka Connect Docker 映像

本部分介绍如何为您的主题创建和自定义 Kafka Connect Docker 映像。

  1. 克隆 Pub/Sub Lite 迁移 GitHub 代码库。
  2. 针对您之前创建的 IAM 服务帐号,生成 JSON 账号密钥

    使用 base64 工具对 JSON 密钥进行编码。例如,

    Linux

    base64 -w 0 < my_service_account.json > password.txt
    

    Mac

    base64 < account_key_json > password.txt
    
  3. 在 GitHub 代码库的 Secret 文件中,使用适当的信息更新以下文件,以将代码库关联到您的 Google Cloud 项目、Pub/Sub Lite 和 Kafka。

    .gcp/gmk_sasl_service_account → sensitive
    <service-account-name>@<gcp-project>.iam.gserviceaccount.com
    
    .gcp/gmk_sasl_service_account_key → sensitive
    <base64 encoded sasl service account key>
    
    .gcp/kafka_ssl_truststore_location → sensitive
    <full path of the ssl truststore jks file location>
    
    .gcp/kafka_ssl_truststore_password → sensitive
    <password for the ssl truststore jks>
    
    .gcp/gmk_bootstrap_servers → environment specific
    bootstrap.<google-managed-kafka-cluster-name>.<google-managed-kafka-cluster-region name>.managedkafka.<google-managed-cluster-host-project-name>.cloud.goog:9092
    
    .gcp/kafka_connect_group_id → environment specific
    <Kafka Connect group id (unique per worker group) for the Kafka connect workers in distributed mode>
    
    .gcp/kafka_config_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the config>
    
    .gcp/kafka_offset_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the offsets>
    
    .gcp/kafka_status_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the status>
    
    .gcp/kafka_sink_topic → environment specific
    <target sink Kafka topic name used by Kafka Connect for migrating the data from the Pub/Sub Lite topic>
    
    .gcp/pubsub_lite_gcp_project → environment specific
    <Google Cloud project that hosts the Pub/Sub Lite source subscription to be used for migrating the Pub/Sub Lite topic to sink the Kafka topic>
    
    .gcp/pubsub_lite_gcp_location → environment specific
    <Google Cloud location for the Pub/Sub Lite source subscription tor migrate the Pub/Sub Lite topic to sink Kafka topic>
    
    .gcp/pubsub_lite_subscription → environment specific
    <Pub/Sub Lite source subscription name to be used for migrating the pubsub lite topic to Kafka topic>
    
  4. 通过运行 docker/build-image.sh 文件构建 Docker 映像。

    ./push-image.sh
    
  5. 使用您的 Google Cloud 项目名称更新 docker/push-image.sh 映像。

  6. 运行 docker/push-image.sh 文件,将映像推送到 Artifact Registry。

    ./push-image.sh
    

部署 Kafka Connect 工作负载

本部分介绍如何将 Kafka Connect Docker 映像部署到 Google Kubernetes Engine 集群。

  1. 使用身份验证插件安装和配置 kubectl
  2. 为您的 Google Kubernetes Engine 集群生成 kubeconfig
  3. 创建 Google Kubernetes Engine 服务帐号并向其授予正确的权限,以模拟您的 IAM 帐号。

    $KSA_NAME = KUBERNETES_SERVICE_ACCOUNT_NAME
    $PROJECT_ID = GOOGLE_CLOUD_PROJECT_ID
    $IAM_SA_NAME = IAM_SERVICE_ACCOUNT_NAME
    
    kubectl create serviceaccount $KSA_NAME \
        --namespace=default
    
    gcloud iam service-accounts add-iam-policy-binding \
    $IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/$KSA_NAME]"
    
    kubectl annotate serviceaccount $KSA_NAME \
        --namespace default \
    iam.gke.io/gcp-service-account=$IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
    
  4. 在文本编辑器中打开 K8s.yaml 文件并更新以下值。

    1. <workflow_name> 替换为您的 Kafka Connect 工作流的名称。
    2. <gke_service_account> 替换为 Google Kubernetes Engine 服务帐号名称。
  5. 运行 K8s.yaml 文件。

    kubectl create -f k8s.yaml
    

    这将创建在 Google Kubernetes Engine 集群中运行的 Kafka Connect 工作负载,并启动 Pub/Sub Lite 连接器以将数据从 Pub/Sub Lite 主题移动到 Apache Kafka for BigQuery。

监控作业

作业运行后,您可以通过连接到 Kafka Connect REST 端点对其进行检查。

  1. 在 Google Cloud 控制台中,前往“部署详情”>“工作负载”页面
  2. 点击您的 Kubernetes 部署。

    部署详情页面会打开。

  3. 公开服务下,点击公开,然后添加端口 8083

  4. 启用端口转发。

    您通过设置端口转发获得的默认链接会返回类似于以下内容的输出:

    {"version":"3.4.0","commit":"2e1947d240607d53","kafka_cluster_id":"6H6qWA0dQnuK31hBPqYUDg"}
    

    如果您将 /connectors 附加到链接,系统会列出正在运行的连接器,例如:

    ["PubSubLiteSourceConnector"]
    

    例如,勾选此链接 url:8083/connectors/PubSubLiteSourceConnector/status 会生成任务列表及其状态。

    {"name":"PubSubLiteSourceConnector","connector":{"state":"RUNNING","worker_id":"10.53.0.157:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":1,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":2,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":3,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":4,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":5,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":6,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":7,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":8,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":9,"state":"RUNNING","worker_id":"10.53.0.157:8083"}],"type":"source"}
    

分阶段迁移

将 Pub/Sub Lite 主题迁移到 Kafka 后,您可以迁移订阅者和发布者。为此,请按以下步骤操作。

  1. 迁移订阅者。请更新您的订阅者以使用 Kafka 主题,而不是 Pub/Sub Lite 主题。

    此操作应在受控的开发环境中逐步进行。

    理想情况下,您应该维护两组订阅者,以验证是否从 Kafka 和 Pub/Sub Lite 收到了相同的消息。验证正确行为后,您可以停用 Pub/Sub 精简版订阅者。

  2. 迁移发布商。请更新您的发布者,以直接发布到 Kafka 主题,而不是 Pub/Sub Lite 主题。

    与迁移订阅者类似,此操作应在受控开发环境中逐步完成。如果您不太在意重复数据,可以维护两组发布商以验证行为。验证行为后,请停用 Pub/Sub 精简版发布商。

  3. 迁移所有订阅者和发布者后,通过删除工作负载和集群来停用迁移工具。

  4. 删除原始 Pub/Sub 精简版主题。

后续步骤