Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(Stream)テンプレート

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(Stream)テンプレートは、MySQL データベースの変更データを含む Pub/Sub メッセージを読み取り、レコードを BigQuery に書き込むストリーミング パイプラインです。Debezium コネクタは、MySQL データベースの変更をキャプチャして、変更データを Pub/Sub にパブリッシュします。続いて、テンプレートにより Pub/Sub メッセージが読み取られ、BigQuery に書き込まれます。

このテンプレートを使用すると、MySQL データベースと BigQuery テーブルを同期できます。パイプラインは、変更データを BigQuery のステージング テーブルに書き込み、MySQL データベースを複製する BigQuery テーブルを断続的に更新します。

パイプラインの要件

  • Debezium コネクタがデプロイされる必要があります。
  • Pub/Sub メッセージは Beam Row でシリアル化される必要があります。

テンプレートのパラメータ

必須パラメータ

  • inputSubscriptions: 読み取る Pub/Sub 入力サブスクリプションのカンマ区切りのリスト(形式: <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...)。
  • changeLogDataset: ステージング テーブルを格納する BigQuery データセット(形式: <DATASET_NAME>)。
  • replicaDataset: レプリカ テーブルを格納する BigQuery データセットの場所(形式: <DATASET_NAME>)。

オプション パラメータ

  • inputTopics: CDC データが push される宛先の Pub/Sub トピックのカンマ区切りのリスト。
  • updateFrequencySecs: MySQL データベースを複製する BigQuery テーブルをパイプラインが更新する間隔。
  • useSingleTopic: すべてのテーブルの更新を単一のトピックに公開するように Debezium コネクタを構成している場合は、これを true に設定します。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

テンプレートを実行する

このテンプレートを実行するには、次の操作を行います。

  1. ローカルマシンで DataflowTemplates リポジトリのクローンを作成します。
  2. v2/cdc-parent ディレクトリに移動します。
  3. Debezium コネクタがデプロイされていることを確認します。
  4. Maven を使用して、Dataflow テンプレートを実行します。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • SUBSCRIPTIONS: Pub/Sub サブスクリプション名のカンマ区切りリスト。
    • CHANGELOG_DATASET: 変更履歴データの BigQuery データセット。
    • REPLICA_DATASET: レプリカ テーブルの BigQuery データセット。

次のステップ