Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。
您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。
流水线要求
模板参数
必需参数
- inputSubscriptions:要从中读取数据的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为
<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
。 - changeLogDataset:用于存储暂存表的 BigQuery 数据集,格式为 <DATASET_NAME>。
- replicaDataset:用于存储副本表的 BigQuery 数据集的位置,格式为 <DATASET_NAME>。
可选参数
- inputTopics:要将 CDC 数据推送到的 PubSub 主题的列表(以英文逗号分隔)。
- updateFrequencySecs:流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。
- useSingleTopic:如果您已将 Debezium 连接器配置为将所有表更新发布到单个主题,请将此值设置为
true
。默认值为:false。 - useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为
false
。如需了解详情,请参阅“使用 Storage Write API”(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。 - useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为
true
。如需使用“正好一次”语义,请将参数设置为false
。仅当useStorageWriteApi
为true
时,此参数才适用。默认值为false
。 - numStorageWriteApiStreams:使用 Storage Write API 时,指定写入流的数量。如果
useStorageWriteApi
为true
且useStorageWriteApiAtLeastOnce
为false
,则必须设置此参数。默认值为 0。 - storageWriteApiTriggeringFrequencySec:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果
useStorageWriteApi
为true
且useStorageWriteApiAtLeastOnce
为false
,则必须设置此参数。
运行模板
如需运行此模板,请执行以下步骤:
- 在本地机器上,克隆 DataflowTemplates 代码库。
- 切换到
v2/cdc-parent
目录: - 确保已部署 Debezium 连接器。
- 使用 Maven,运行 Dataflow 模板。
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDSUBSCRIPTIONS
:以英文逗号分隔的 Pub/Sub 订阅名称列表CHANGELOG_DATASET
:用于变更日志数据的 BigQuery 数据集REPLICA_DATASET
:用于副本表的 BigQuery 数据集
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。