使用 Debezium 将 Cloud Spanner 中的更改捕获到 BigQuery

本教程演示了如何使用 Debezium 捕获 Cloud Spanner 中的更改。在本教程中,您将使用 Debezium 轮询 Cloud Spanner 数据库以获取所有新数据或已修改的数据。Debezium 是一个开源工具,用于跟踪和捕获数据库变更。它构建于 Apache Kafka 之上。在本教程中,您使用 Kafka Connect 将数据流式传入和传出 Kafka。Kafka Connect 提供了几个内置连接器,可用于将数据流式传入和传出热门数据库。

本教程适用于定义、管理并使用设计模式来确定(和跟踪)关系型数据库中数据更改的数据库管理员和工程师。本文假定您了解 Cloud Spanner、BigQuery 以及核心 Kafka Connect 概念。

背景

捕获变更数据 (CDC) 可让您对来自数据库的每个事件进行流式传输。通常,数据库使用事务日志来存储每个数据库事件的记录。这些日志文件也称为数据库日志、重做日志或预写日志,具体取决于数据库特性。CDC 工具使用此事务日志来读取这些事件。

下面是典型的 CDC 日志:

{
  "before": {
    "ID": 1,
    "Firstname": "john",
    "Lastname": "doe",
    "LastUpdateTime": 1544000706000000
  },
  "after": {
    "ID": 1,
    "Firstname": "johny",
    "Lastname": "doe",
    "LastUpdateTime": 1544000742000000
  },
}

数据库轮询会捕获数据库中的当前值。

下面是典型的数据库轮询:

{
    "ID": 1,
    "Firstname": "johny",
    "Lastname": "doe",
    "LastUpdateTime": 1544000742000000
}

数据库变更日志会按顺序捕获所有数据更改。如果您想要查看对数据库所做更改的完整历史记录,则此列表可能至关重要。相比之下,使用基于轮询的方法意味着您在轮询循环的两次运行之间可能会丢失数据更改。例如,如果在两次轮询之间插入和删除记录,则基于轮询的日志无法捕获该更改。

具体操作过程如下:数据库轮询以固定频率运行,根据时间戳捕获值,并捕获最后轮询之后发生的任何变化:位于行“select * from table where LastUpdateTime > last_poll_timestamp”。

概览

Kafka Connect 中的连接器运行作业。这些作业定义连接器从何处复制数据以及向何处复制数据。如需运行作业,请在 Kafka 连接工作器上安排连接器和任务。这些作业包含复制数据的任务。连接器可以将单个作业拆分为多个任务。这样,连接器允许并行运行作业。并行运行作业可以扩缩连接器的数据复制功能。Kafka 使用 ZooKeeper 来维护其配置数据。Zookeeper 通过监控 Kafka 集群节点、主题、分区和其他数据的状态来实现。

当这些作业运行时,它们会创建多条单独的数据流。Kafka 需要通过一种有效的方法来确定每个到达记录的架构。不能将架构与记录一起发送,因为这会导致序列化开销;架构通常大于记录本身。Schema Registry 管理 Kafka 提供方和使用方的 Avro 架构。使用 Avro 架构,您可以配置兼容性设置以支持架构演变

下图高度概括地介绍了本教程中部署的架构。

您在本教程中部署的架构的高度概括。

以下详细说明了图表中所发生的情况:

  • 如需连接到 Spanner,请配置使用 Cloud Spanner 开源 JDBC 驱动程序的来源连接器。
  • 如需将数据写入 BigQuery,请配置使用 Confluent 所提供开箱即用的连接器的接收器连接器。
  • 如需识别 LastUpdateTime 列中值早于上次检索的时间戳的行,您可以在 Spanner 中使用源连接器查询表。
  • 为了准备数据传输,源连接器会将所有符合条件的记录存储在 Kafka 主题中。
  • 为了存储符合条件的记录,接收器连接器会从 Kafka 主题中提取记录并将其写入 BigQuery。

系统设计和限制

在生产中使用此设置之前,请查看以下系统设计注意事项和限制。

注意事项 备注
性能 此解决方案适用于每秒最多更新插入几千次的增量批量复制。
高可用性和容错性 支持。
此解决方案以分布式模式运行 Kafka Connect。连接器作业分布在整个集群中,以提供高可用性和容错性。如需了解详情,请参阅 Kafka 如何处理故障
动态架构更新 支持。
如需了解详情,请参阅验证架构演变
交易订单预留
行更新
重复信息删除
正好一次传送
不支持。
此解决方案支持仅附加复制。在仅附加的复制操作中,新复制的数据会附加到表的末尾。现有行不会更新;更新将作为新行添加到表的末尾。
删除 不受支持。
此解决方案支持行更新形式的软删除或 Tombstone
对数据架构的影响 要复制的所有表的某一列必须使用 Spanner 的提交时间戳来记录上次更新时间。
对现有应用的影响 修改要复制的表中的记录时,必须设置或更新 Spanner commit 时间戳。如果应用删除行,请将其更改为软删除或 Tombstone。
对接收器的支持 Kafka Connect 可让您灵活地更改数据接收器系统,而无需更改任何流处理代码。如需了解详情,请参阅 Kafka Connect 文档中的支持的连接器

目标

  • 使用测试数据库设置 Cloud Spanner。
  • 使用测试数据集设置 BigQuery。
  • 创建 Google Kubernetes Engine (GKE) 集群。
  • 在 GKE 上部署 Debezium 集群。
  • 配置作业以从 Spanner 数据库捕获数据。
  • 配置作业以将数据写入 BigQuery 表。
  • 在 Cloud Spanner 中插入和更新数据。
  • 验证是否在 Kafka 和 BigQuery 中捕获更改。
  • 在 Cloud Spanner 中更改表架构。
  • 在 BigQuery 中验证架构演变。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  2. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  3. 在 Cloud Console 中,激活 Cloud Shell。

    激活 Cloud Shell

  4. 查找您的项目 ID 并在 Cloud Shell 中进行设置。请将 YOUR_PROJECT_ID 替换为您的项目 ID。

    gcloud config set project YOUR_PROJECT_ID
    
  5. 为教程中所需的 Cloud 项目、地区、区域、服务帐号名称和其他值创建环境变量:

    export PROJECT=$(gcloud config get-value project)
    export REGION="us-central1"
    export ZONE="us-central1-a"
    export CLUSTER="test-cluster"
    export SPANNER_INSTANCE="test-spanner-instance"
    export SPANNER_DB="test-db"
    export SERVICE_ACCOUNT="test-sa"
    export BQ_DATASET="test_bq"
    export SA_EMAIL=${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com
    
  6. 启用适用于 Compute Engine、GKE、Cloud Spanner、Container Analysis 和 Container Registry 的 API:

    gcloud services enable \
    cloudbuild.googleapis.com \
    compute.googleapis.com \
    container.googleapis.com \
    spanner.googleapis.com \
    bigquery.googleapis.com \
    containeranalysis.googleapis.com \
    containerregistry.googleapis.com
    

设置 Cloud Spanner

  1. 在 Cloud Shell 中,创建一个 Cloud Spanner 实例:

    gcloud spanner instances create $SPANNER_INSTANCE \
        --config=regional-$REGION \
        --description="test" \
        --nodes=1
    
  2. 创建数据库和测试表:

    gcloud spanner databases create $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --ddl='CREATE TABLE Test (Id INT64 NOT NULL, Firstname STRING(1024), Lastname STRING(1024), LastUpdateTime TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp = true),) PRIMARY KEY(Id)'
    

上述语句使用 TIMESTAMP 列上的 allow_commit_timestamp 选项创建测试表。allow_commit_timestamp 列选项可让您以原子方式存储提交时间戳。当轮询服务运行时,系统会使用提交时间戳来确定数据库变更。

设置 BigQuery

  • 在 Cloud Shell 中,使用 BigQuery 创建测试数据集:

    bq --location=US mk --dataset $PROJECT:$BQ_DATASET
    

BigQuery 是系统从源数据库复制表或表的位置。

创建 GKE 集群

  1. 在 Cloud Shell 中创建一个 GKE 集群:

    gcloud container clusters create $CLUSTER --zone $ZONE
    
  2. 克隆包含本教程 GKE 集群配置文件以及 Kafka 源和接收器配置文件的代码库:

    git clone https://github.com/GoogleCloudPlatform/spanner-debezium-change-capture
    cd spanner-debezium-change-capture
    

生成身份验证令牌

  1. 在 Cloud Shell 中,创建一个 Google Cloud 服务帐号:

    gcloud iam service-accounts create $SERVICE_ACCOUNT \
      --display-name $SERVICE_ACCOUNT
    

    您稍后将为此帐号生成身份验证令牌。

  2. 向服务帐号授予 Spanner Database Reader 角色:

    gcloud projects add-iam-policy-binding $PROJECT \
        --member serviceAccount:$SA_EMAIL \
        --role roles/spanner.databaseReader
    
  3. 向服务帐号授予 BigQuery Data Owner 角色:

    gcloud projects add-iam-policy-binding $PROJECT \
        --member serviceAccount:$SA_EMAIL \
        --role roles/bigquery.dataOwner
    
  4. 生成身份验证凭据 JSON 文件:

    gcloud iam service-accounts keys create credentials.json \
        --iam-account=$SA_EMAIL
    

在 GKE 上部署 Debezium

  1. 在 Cloud Shell 中,将 [PROJECT_ID] 占位符替换为 source.jsonsink.json 文件中的 Cloud 项目 ID:

    sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" source.json
    sed -i -e "s/\[PROJECT_ID\]/$PROJECT/g" sink.json
    
  2. credentials.json 文件创建 Kubernetes Secret:

    gcloud container clusters get-credentials test-cluster \
        --zone "${ZONE}" \
        --project "${PROJECT}" && \
    kubectl create secret generic service-account-creds \
        --from-file credentials.json
    
  3. 如需为 Spanner 设置轮询作业,请从 source.json 文件创建 configmap

    kubectl create configmap connector-source --from-file source.json
    
  4. 如需为 BigQuery 设置写入连接器,请从 sink.json 文件创建 configmap

    kubectl create configmap connector-sink --from-file sink.json
    
  5. 在 GKE 集群上部署 Debezium 服务:

    kubectl apply -f kube/
    
  6. 验证是否已部署所有 Debezium 服务:

    kubectl get pods -w
    

    所有服务均需要一些时间才能启动。等到所有 Pod 都开始运行,然后再继续执行下一步。

    最终输出内容类似如下:

    NAME                         READY   STATUS    RESTARTS   AGE
    connect-5999cdb4f6-zcfw7     1/1     Running   0          1m
    kafka-0                      1/1     Running   2          1m
    schema-registry-0            1/1     Running   0          1m
    zookeeper-0                  1/1     Running   0          1m
    

    如需退出等待循环,请按 Control+C(在 Mac 上,按 Command+C)。

Kafka、ZooKeeper 和 Schema-registry 作为 StatefulSet 部署在 GKE 中。

在本教程中,您将创建一个系统来轮询单个 Spanner 表。在实际使用过程中,如果有多个轮询表,则设置并发工作流可缩短提取时间。如需了解详情,请参阅 Kafka Connect 使用入门

设置从 Cloud Spanner 读取的操作

  1. 在 Cloud Shell 中,获取 Kafka Connect 的 Pod 名称:

    POD_NAME=$(kubectl get pods \
        --selector=service=connect -o jsonpath='{.items..metadata.name}')
    
  2. 获取 Kafka Connect 服务的 IP 地址:

    POD_IP=$(kubectl get pods \
        --selector=service=connect -o jsonpath='{.items..podIP}')
    
  3. 配置轮询作业:

    kubectl exec -ti $POD_NAME -- curl -X POST \
        http://$POD_IP:8083/connectors \
        -H "Content-Type: application/json" \
        -d @/config/source.json
    

    连接器 spanner-connector 配置为每秒轮询 Cloud Spanner 中的 Test 表。它使用 LastUpdateTime 列以增量方式提取数据。如需详细了解如何设置连接器,请参阅配置参数

  4. 验证配置:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector
    

    输出内容类似如下:

    {"name":"spanner-connector","config":{..."mode":"timestamp","timestamp.column.name":"LastUpdateTime","table.whitelist":"Test"...},"type":"source"...}
    
  5. 验证 Kafka 连接器是否正在运行:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/status
    

    输出内容类似如下:

    {"name":"spanner-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"source"}
    

设置写入 BigQuery 的操作

  1. 在 Cloud Shell 中,配置 BigQuery 连接器:

    kubectl exec -ti $POD_NAME -- curl -X POST \
        http://$POD_IP:8083/connectors \
        -H "Content-Type: application/json" \
        -d @/config/sink.json
    

    连接器 bq-connector 已配置为在数据集 test_bq 中自动创建 spanner_Test 表。Kafka 主题 spanner_Test 中的所有数据会写入 BigQuery 中的 spanner_Test 表。

  2. 验证配置:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector
    

    输出内容类似如下:

    {"name":"bq-connector","config":{..."autoCreateTables":"true","topics":"spanner_Test","project":"<Project-Id>","datasets":".*=test_bq"...},"type":"sink"...}
    
  3. 验证连接器是否正在运行:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/bq-connector/status
    

    输出内容类似如下:

    {"name":"bq-connector","connector":{"state":"RUNNING","worker_id":"10.8.1.6:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.1.6:8083"}],"type":"sink"}
    

在 Cloud Spanner 中插入和更新数据

在配置从 Cloud Spanner 读取的操作以及向 BigQuery 写入的操作后,请在 Cloud Spanner 中插入和更新记录。

  1. 在 Cloud Shell 中,向表中插入一行:

    gcloud spanner databases execute-sql $SPANNER_DB \
      --instance=$SPANNER_INSTANCE \
      --sql="INSERT Test (Id, Firstname, Lastname, LastUpdateTime) VALUES (1, 'john', 'doe', PENDING_COMMIT_TIMESTAMP())"
    
  2. 更新行:

    gcloud spanner databases execute-sql $SPANNER_DB \
      --instance=$SPANNER_INSTANCE \
      --sql="UPDATE Test SET Firstname = 'johny', LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
    

验证在 BigQuery 中捕获的更改

在本部分中,您将验证是否已捕获到 spanner_Test Kafka 主题中所做的更改并写入 BigQuery 中的 spanner_Test 表。

  1. 在 Cloud Shell 中,验证是否在 Kafka 主题中捕获了数据:

    kubectl exec -ti schema-registry-0 \
        -- kafka-avro-console-consumer \
        --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092 \
        --topic spanner_Test --from-beginning \
        --property schema.registry.url=http://schema-registry-0.schema-registry.default.svc.cluster.local:8081
    

    输出内容类似如下:

    {"Id":{"long":1},"Firstname":{"string":"john"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758520535}}
    {"Id":{"long":1},"Firstname":{"string":"johny"},"Lastname":{"string":"doe"},"LastUpdateTime":{"long":1585758545824}}
    

    如需退出,请按 Control+C(在 Mac 上,按 Command+C)。

  2. 验证 BigQuery spanner_Test 表的架构是否正确:

    bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
    

    输出内容类似如下:

    [
      {
      "mode": "NULLABLE",
      "name": "Id",
      "type": "INTEGER"
      },
    {
      "mode": "NULLABLE",
      "name": "Firstname",
      "type": "STRING"
    },
      {
      "mode": "NULLABLE",
      "name": "Lastname",
      "type": "STRING"
    },
      {
      "mode": "NULLABLE",
      "name": "LastUpdateTime",
      "type": "TIMESTAMP"
      }
    ]
    
  3. 验证数据是否已写入 spanner_Test 表:

    bq --location=US query \
        --use_legacy_sql=false 'select * from test_bq.spanner_Test'
    

    输出内容类似如下:

    +----+-----------+----------+---------------------+
    | Id | Firstname | Lastname |   LastUpdateTime    |
    +----+-----------+----------+---------------------+
    |  1 | john      | doe      | 2020-04-01 16:28:40 |
    |  1 | johny     | doe      | 2020-04-01 16:29:05 |
    +----+-----------+----------+---------------------+
    

验证架构演变

在本部分中,您将更改 Cloud Spanner 中的表架构,并验证架构更改是否已正确传播到 BigQuery 表。

  1. 在 Cloud Shell 中,向 Spanner 表中添加一个 SoftDelete 列以捕获该行的软删除:

    gcloud spanner databases ddl update $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --ddl='ALTER TABLE Test ADD COLUMN SoftDelete BOOL'
    
  2. 删除表中的行:

    gcloud spanner databases execute-sql $SPANNER_DB \
        --instance=$SPANNER_INSTANCE \
        --sql="UPDATE Test SET SoftDelete = TRUE, LastUpdateTime = PENDING_COMMIT_TIMESTAMP() where Id = 1"
    

    您不会发出硬删除,而是发出一个 update 命令以将 SoftDelete 列设置为 True

  3. 验证架构更改是否已在 BigQuery 表中传播:

    bq show --format=prettyjson $PROJECT:$BQ_DATASET.spanner_Test | jq '.schema.fields'
    

    输出内容类似如下:

    [
    ...
      {
      "mode": "NULLABLE",
      "name": "SoftDelete",
      "type": "BOOLEAN"
      }
    ]
    
  4. 验证表中是否有软删除:

    bq  --location=US query \
        --use_legacy_sql=false 'select * from test_bq.spanner_Test'
    

    输出内容类似如下:

    +----+-----------+----------+---------------------+------------+
    | Id | Firstname | Lastname |   LastUpdateTime    | SoftDelete |
    +----+-----------+----------+---------------------+------------+
    |  1 | john      | doe      | 2020-04-01 16:28:40 |       NULL |
    |  1 | johny     | doe      | 2020-04-01 16:29:05 |       NULL |
    |  1 | johny     | doe      | 2020-04-01 16:31:04 |       true |
    +----+-----------+----------+---------------------+------------+
    

您已成功设置 Debezium,以捕获 Cloud Spanner 中的所有更改并将其发送到 BigQuery。

对 Kafka Connect 设置进行问题排查

如果您在本教程中遇到错误,请使用以下建议来调试您的 Kafka Connect 设置:

  • Kafka 将所有值存储在主题中。如需查看这些主题,请运行以下命令:

    kubectl exec -ti kafka-0 -- bin/kafka-topics.sh \
        --list --zookeeper zookeeper-0.zookeeper.default.svc.cluster.local:2181
    
  • 连接器可以配置为生成多个任务。例如,如果数据库中有多个表,则可以给每个表分配一个任务来轮询数据。您可以通过运行以下命令来查看为连接器配置的任务:

    kubectl exec -ti $POD_NAME -- curl http://$POD_IP:8083/connectors/spanner-connector/tasks
    

    输出内容类似如下:

    [{"id":{"connector":"spanner-connector","task":0},"config":{...}}]
    
  • 配置作业时,Kafka Connect 会自动创建内部主题。这些主题用于存储作业上次在来源系统中进行读取的相关位置信息。如需提取最新偏移量,请运行以下命令:

    kubectl exec -ti kafka-0 -- bin/kafka-console-consumer.sh
        --bootstrap-server kafka-0.kafka.default.svc.cluster.local:9092
        --from-beginning --topic my_connect_offsets
    

    输出内容类似如下:

    {"timestamp_nanos":923115000,"timestamp":1584535152923}
    

当连接器任务重启时,可以从上一个偏移位置继续进行处理。如需手动更改偏移量,请按照此处的步骤操作。如果您不想从一开始就流式传输更改,则更改偏移量非常有用。

替代方法

本教程介绍了设置从 Spanner 到目标数据库的增量复制的方法之一。根据您的要求,需要考虑其他替代方案。这些要求可能包括近乎实时的复制、正好一次数据传送、交易订单预留等。下面简要列出了非排他替代方案:

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

若要避免产生费用,最简单的方法是删除您为本教程创建的 Cloud 项目。或者,您也可以删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除资源

若要保留本教程中使用的 Google Cloud 项目,请删除各个资源:

  1. 在 Cloud Shell 中,删除 GKE 集群:

    gcloud container clusters delete $CLUSTER --zone $ZONE --async
    
  2. 删除下载的代码、工件、下载的服务帐号私钥和其他依赖项:

    cd .. && rm -rf spanner-debezium-change-capture
    
  3. 删除 Container Registry 中的映像:

    gcloud container images list-tags \
    gcr.io/$PROJECT/connect \
        --format 'value(digest)' | \
    xargs -I {} gcloud container images delete \
        --force-delete-tags --quiet \
    gcr.io/${PROJECT}/connect@sha256:{}
    
  4. 删除服务帐号:

    gcloud iam service-accounts delete $SA_EMAIL
    
  5. 删除 Spanner 实例:

    gcloud spanner instances delete $SPANNER_INSTANCE`
    
  6. 删除 BigQuery 数据集:

    bq rm -r -d $BQ_DATASET
    

后续步骤