Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) 模板

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。

您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。

流水线要求

  • Debezium 连接器必须已部署
  • Pub/Sub 消息必须在 Beam 行中序列化。

模板参数

必需参数

  • inputSubscriptions:要从中读取数据的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为 <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
  • changeLogDataset:用于存储暂存表的 BigQuery 数据集,格式为 <DATASET_NAME>。
  • replicaDataset:用于存储副本表的 BigQuery 数据集的位置,格式为 <DATASET_NAME>。

可选参数

  • inputTopics:要将 CDC 数据推送到的 PubSub 主题的列表(以英文逗号分隔)。
  • updateFrequencySecs:流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。
  • useSingleTopic:如果您已将 Debezium 连接器配置为将所有表更新发布到单个主题,请将此值设置为 true。默认值为:false。
  • useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为 false。如需了解详情,请参阅“使用 Storage Write API”(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
  • numStorageWriteApiStreams:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。默认值为 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

运行模板

如需运行此模板,请执行以下步骤:

  1. 在本地机器上,克隆 DataflowTemplates 代码库
  2. 切换到 v2/cdc-parent 目录:
  3. 确保已部署 Debezium 连接器
  4. 使用 Maven,运行 Dataflow 模板。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • SUBSCRIPTIONS:以英文逗号分隔的 Pub/Sub 订阅名称列表
    • CHANGELOG_DATASET:用于变更日志数据的 BigQuery 数据集
    • REPLICA_DATASET:用于副本表的 BigQuery 数据集

后续步骤