从 Pub/Sub Lite 迁移到 Apache Kafka for BigQuery

本文档详细介绍了如何将数据从 Pub/Sub Lite 主题迁移到 适用于 BigQuery 的 Apache Kafka 主题。

准备工作

  1. 启用 Google Kubernetes Engine API。
  2. 确定要迁移的 Pub/Sub 精简版主题。确定用于以下用途的名称: 适用于 BigQuery 的 Apache Kafka 中的目标主题。此外,还要确定要迁移到哪个适用于 BigQuery 的 Apache Kafka 集群

迁移工作流

如需迁移数据,请完成以下任务。关于“广告资源”委托类型 此页面后面部分会显示任务示例

  1. 创建 Google Kubernetes Engine 服务账号
  2. 创建 Google Kubernetes Engine 集群
  3. 使用配置自定义 Docker 映像 主题的详细信息。
  4. 将 Docker 映像部署到 Google Kubernetes Engine 集群。

    映像内部包含 Kafka Connect 和适用于 Kafka 的 Pub/Sub Lite 插件 Connect 用于订阅新的 Pub/Sub Lite 订阅, 发布到 Apache Kafka for BigQuery

创建 Google Kubernetes Engine 服务账号

本部分介绍如何创建 运行 Google Kubernetes Engine 集群所需的权限。

  1. Google Cloud 控制台中,创建一个新的 IAM 服务账号,并使其具有运行 Google Kubernetes Engine 所需的最低权限。

  2. 向服务账号授予以下其他 IAM 角色。这些 可促进迁移过程。

    • 代管式 Kafka 客户端角色 (roles/managedkafka.client)
    • Pub/Sub 精简版订阅者角色 (roles/pubsublite.subscriber)
    • Pub/Sub Lite Viewer 角色 (roles/pubsublite.Viewer)
    • Artifact Registry Reader 角色 (roles/artifactregistry.reader)

创建 GKE 集群

本部分介绍如何创建使用该服务的 GKE 集群 您在上一步中创建的账号。

  1. 转到 Google Cloud 控制台中的 Google Kubernetes Engine 页面。

    转到 Google Kubernetes Engine

  2. 点击 创建

    系统随即会显示创建 Autopilot 集群页面。

  3. 高级设置标签页下,将服务账号更改为 IAM 服务账号

  4. (可选)配置其他设置

  5. 如需创建集群,请点击创建

创建 Kafka Connect Docker 映像

本部分介绍如何创建和自定义 Kafka Connect Docker 映像 。

  1. 克隆 Pub/Sub Lite 迁移 GitHub 代码库。
  2. 针对您之前创建的 IAM 服务账号,生成 JSON 账号密钥

    使用 base64 工具对 JSON 密钥进行编码。例如,

    Linux

    base64 -w 0 < my_service_account.json > password.txt
    

    Mac

    base64 < account_key_json > password.txt
    
  3. Secret 文件中 GitHub 代码库,使用相应的 将代码库与您的 Google Cloud 项目相关联, Pub/Sub Lite 和 Kafka。

    .gcp/gmk_sasl_service_account → sensitive
    <service-account-name>@<gcp-project>.iam.gserviceaccount.com
    
    .gcp/gmk_sasl_service_account_key → sensitive
    <base64 encoded sasl service account key>
    
    .gcp/kafka_ssl_truststore_location → sensitive
    <full path of the ssl truststore jks file location>
    
    .gcp/kafka_ssl_truststore_password → sensitive
    <password for the ssl truststore jks>
    
    .gcp/gmk_bootstrap_servers → environment specific
    bootstrap.<google-managed-kafka-cluster-name>.<google-managed-kafka-cluster-region name>.managedkafka.<google-managed-cluster-host-project-name>.cloud.goog:9092
    
    .gcp/kafka_connect_group_id → environment specific
    <Kafka Connect group id (unique per worker group) for the Kafka connect workers in distributed mode>
    
    .gcp/kafka_config_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the config>
    
    .gcp/kafka_offset_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the offsets>
    
    .gcp/kafka_status_storage_topic → environment specific
    <Kafka topic name used by Kafka Connect for tracking the status>
    
    .gcp/kafka_sink_topic → environment specific
    <target sink Kafka topic name used by Kafka Connect for migrating the data from the Pub/Sub Lite topic>
    
    .gcp/pubsub_lite_gcp_project → environment specific
    <Google Cloud project that hosts the Pub/Sub Lite source subscription to be used for migrating the Pub/Sub Lite topic to sink the Kafka topic>
    
    .gcp/pubsub_lite_gcp_location → environment specific
    <Google Cloud location for the Pub/Sub Lite source subscription tor migrate the Pub/Sub Lite topic to sink Kafka topic>
    
    .gcp/pubsub_lite_subscription → environment specific
    <Pub/Sub Lite source subscription name to be used for migrating the pubsub lite topic to Kafka topic>
    
  4. 通过运行 docker/build-image.sh 文件构建 Docker 映像。

    ./push-image.sh
    
  5. 使用您的 Google Cloud 项目名称更新 docker/push-image.sh 映像。

  6. 通过运行 docker/push-image.sh 将映像推送到 Artifact Registry 文件。

    ./push-image.sh
    

部署 Kafka Connect 工作负载

本部分介绍如何将 Kafka Connect Docker 映像部署到 Google Kubernetes Engine 集群。

  1. 安装和配置 kubectl 命令 身份验证插件。
  2. 生成 kubeconfig 适用于 Google Kubernetes Engine 集群
  3. 创建 Google Kubernetes Engine 服务账号并向其授予正确的权限 来模拟您的 IAM 账号。

    $KSA_NAME = KUBERNETES_SERVICE_ACCOUNT_NAME
    $PROJECT_ID = GOOGLE_CLOUD_PROJECT_ID
    $IAM_SA_NAME = IAM_SERVICE_ACCOUNT_NAME
    
    kubectl create serviceaccount $KSA_NAME \
        --namespace=default
    
    gcloud iam service-accounts add-iam-policy-binding \
    $IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/$KSA_NAME]"
    
    kubectl annotate serviceaccount $KSA_NAME \
        --namespace default \
    iam.gke.io/gcp-service-account=$IAM_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
    
  4. 在文本编辑器中打开 K8s.yaml 文件并更新以下值。

    1. <workflow_name> 替换为您的 Kafka Connect 工作流的名称。
    2. <gke_service_account> 替换为 Google Kubernetes Engine 服务账号 名称。
  5. 运行 K8s.yaml 文件。

    kubectl create -f k8s.yaml
    

    这会创建在您的 Google Kubernetes Engine 中运行的 Kafka Connect 工作负载 并启动 Pub/Sub Lite 连接器,将数据从您的 从 Pub/Sub Lite 主题到 Apache Kafka for BigQuery。

监控作业

作业运行后,您可以通过连接到 Kafka Connect 来进行检查 REST 端点。

  1. 在 Google Cloud 控制台中,转到部署详情 >“工作负载”页面
  2. 点击您的 Kubernetes 部署。

    部署详情页面会打开。

  3. 公开服务下,点击公开,然后添加端口 8083

  4. 启用端口转发。

    您在设置端口转发时获得的默认链接会返回 输出类似于以下内容:

    {"version":"3.4.0","commit":"2e1947d240607d53","kafka_cluster_id":"6H6qWA0dQnuK31hBPqYUDg"}
    

    如果您将 /connectors 附加到链接中,系统会列出正在运行的连接器, 例如:

    ["PubSubLiteSourceConnector"]
    

    例如,点击此链接 url:8083/connectors/PubSubLiteSourceConnector/status 会生成任务列表, 及其状态。

    {"name":"PubSubLiteSourceConnector","connector":{"state":"RUNNING","worker_id":"10.53.0.157:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":1,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":2,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":3,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":4,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":5,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":6,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":7,"state":"RUNNING","worker_id":"10.53.0.157:8083"},{"id":8,"state":"RUNNING","worker_id":"10.53.0.139:8083"},{"id":9,"state":"RUNNING","worker_id":"10.53.0.157:8083"}],"type":"source"}
    

分阶段迁移

将 Pub/Sub Lite 主题迁移到 Kafka 后,您可以 迁移订阅者和发布商为此,请按以下说明操作 步骤。

  1. 迁移订阅者。更新订阅者以消费 Kafka 主题而不是 Pub/Sub Lite 主题。

    此操作应在受控的开发环境中逐步进行。

    理想情况下,您应该维护两组订阅者以验证完全相同的消息 都是从 Kafka 和 Pub/Sub Lite 接收的。 确认行为正确无误后 您可以停用自己的 Pub/Sub Lite 位订阅者。

  2. 迁移发布商。请更新您的发布商,以便直接发布到 Kafka 主题而不是 Pub/Sub Lite 主题。

    与迁移订阅者类似,此操作应在 可控的开发环境如果您不希望出现重复数据, 可以维护两组发布商以验证其行为。完成验证后 停用 Pub/Sub Lite 发布者。

  3. 在迁移完所有订阅者和发布商后,停用 来移除工作负载和集群

  4. 删除原始 Pub/Sub 精简版主题。

后续步骤