Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(Stream)テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。
このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。
パイプラインの要件
テンプレートのパラメータ
必須パラメータ
- inputSubscriptions: 読み取る Pub/Sub 入力サブスクリプションのカンマ区切りのリスト(形式:
<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
)。 - changeLogDataset: ステージング テーブルを格納する BigQuery データセット(形式: <DATASET_NAME>)。
- replicaDataset: レプリカ テーブルを格納する BigQuery データセットの場所(形式: <DATASET_NAME>)。
オプション パラメータ
- inputTopics: CDC データが push される宛先の Pub/Sub トピックのカンマ区切りのリスト。
- updateFrequencySecs: MySQL データベースを複製する BigQuery テーブルをパイプラインが更新する間隔。
- useSingleTopic: すべてのテーブルの更新を単一のトピックに公開するように Debezium コネクタを構成している場合は、これを true に設定します。デフォルトは false です。
- useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は
false
です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。 - useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを
true
に設定します。exactly-once セマンティクスを使用するには、パラメータをfalse
に設定します。このパラメータは、useStorageWriteApi
がtrue
の場合にのみ適用されます。デフォルト値はfalse
です。 - numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。 - storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。
useStorageWriteApi
がtrue
であり、useStorageWriteApiAtLeastOnce
がfalse
の場合に、このパラメータを設定する必要があります。
テンプレートを実行する
このテンプレートを実行するには、次の操作を行います。
- ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
v2/cdc-parent
ディレクトリに移動します。- Debezium コネクタがデプロイされていることを確認します。
- Maven を使用して、Dataflow テンプレートを実行します。
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDSUBSCRIPTIONS
: Pub/Sub サブスクリプション名のカンマ区切りリスト。CHANGELOG_DATASET
: 変更履歴データの BigQuery データセット。REPLICA_DATASET
: レプリカ テーブルの BigQuery データセット。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。