Cloud Spanner から BigQuery への大規模なレプリケーション
Google Cloud Japan Team
※この投稿は米国時間 2021 年 8 月 28 日に、Google Cloud blog に投稿されたものの抄訳です。
Cloud Spanner は、GCP のネイティブで分散型のオンライン トランザクション処理システム(OLTP)です。分散型なので水平方向にスケールでき、大量のデータを含む非常に高いスループットのワークロードに適しています。このため、そのワークロードに基づいて幅広く分析することも可能です。Cloud Spanner BigQuery フェデレーションにより、Cloud Spanner データを BigQuery にフェッチして分析しやすくなりました。
本投稿では、この機能を効率的に使用し、大きなテーブルを高スループット(1 秒あたりの挿入件数や更新件数が多め)で複製し、レプリケーション ラグを低~中程度に抑える方法をご紹介します。
ELT プロセスの最適化
Cloud Spanner から BigQuery にデータをフェッチするための効率的な ELT(抽出、読み込み、変換)の設定は、レプリケーション ラグを低く抑えるうえで重要です。最初の完全読み込みを実施した後、増分読み込みを設定する必要があります。Cloud Spanner の大きなテーブルの場合、完全なデータを毎回更新すると、時間とコストがかかる可能性があります。したがって、新しい変更のみを抽出し、BigQuery の既存データとマージした方が効率的です。
増分データ用に Cloud Spanner を設計する
以下のようなテーブル スキーマの例を見てみましょう。
増分変更を識別するには、commit タイムスタンプ列(lastUpdateTime など)を追加する必要があります。また、アプリケーションで PENDING_COMMIT_TIMESTAMP() を渡してください。これにより、Cloud Spanner でコミット後に対応するフィールドが更新されます。
特定のタイムスタンプの後に変更された行を効率的に読み取るには、インデックスを作成する必要があります。単調増加する値のインデックスはホットスポットを引き起こす可能性があるため、別の列(ShardId など)を追加し、(ShardId、LastUpdatedTime)を使用して複合インデックスを作成してください。
更新されたスキーマは以下のようになります。
上記の例では、LastUpdateTime を commit タイムスタンプ列として追加しました。また、-18~+18 の範囲の値が生成される列として ShardId も追加しました。これにより、(ShardId、LastUpdateTime)に複合インデックスを作成することで、タイムスタンプにインデックスを作成する際にホットスポットを回避しやすくなります。
さらに、NULL FILTERED インデックスにして、軽量に保つことができます。古いレコードは LastUpdateTime を null として定期的に更新できます。詳細なソリューションについては、こちらをご覧ください。
テーブルから増分変更をクエリするには、以下のような SQL クエリを実行します。
上記の SQL クエリは LastUpdateTime のフィルタだけでなく、すべてのシャードからデータを読み取ります。このため、インデックスが使用され、大きなテーブルからの読み取り速度が最適化されます。
BigQuery へのデータの初期読み込み
Cloud Spanner で初めてデータを読み込むと、テーブル全体が読み取られ、結果が BigQuery に送信される可能性があります。このため、接続の作成には [データを同時に読み込む] オプションを使用する必要があります。
以下は、初期読み込みを実行するための SQL クエリの例です。
BigQuery にデータを段階的に読み込む
[データを同時に読み込む] のチェックボックスをオフにして、接続を更新(または新しい接続を作成)します。
これは、インデックスを使用する Spanner クエリはルート パーティション化できず、結果を同時に読み取ることができないためです(執筆時点)。これは今後変更される可能性があります。
Cloud Spanner から増分データを取得した後、BigQuery のステージング テーブルに保存して、ELT のうち抽出と読み込みの部分を完了する必要があります。最後に、MERGE ステートメントを作成して増分データを BigQuery テーブルに統合する必要があります。
BigQuery のスクリプトにより、こうした ELT はすべて以下のように 1 つのスクリプトに統合でき、さらにスケジュールされたクエリとして構成できます。
上記のスクリプトは、BigQuery のそのテーブルが最後に更新された時刻を検出します。また、SQL クエリが作成され、前回のフェッチ後に増分データがフェッチされて、ステージング テーブルとして保存されます。次に、新しいデータが BigQuery テーブルにマージされ、最後にステージング テーブルが削除されます。
テーブルを明示的に削除すると、上記のスクリプトの 2 つの同時実行が失敗します。これは、負荷が急上昇してもデータが失われないようにするために重要です。
その他の考慮事項
BigQuery でテーブル パーティションを作成する
通常、テーブル パーティションとクラスタリングは、読み取りや分析の要件に基づいて作成します。ただし、これによりマージのパフォーマンスが低下する可能性があります。このような場合は、BigQuery のパーティショニングとクラスタリングを活用する必要があります。
クラスタリングで一致パフォーマンスが向上するため、テーブルの PK にクラスタリングを追加できます。データをマージするとパーティション全体が書き換えられますが、パーティション フィルタを使用すれば、書き換えられるデータの量を制限できます。
削除された行を処理する
上記のソリューションでは、削除された行がスキップされます。これは、さまざまなユースケースで使用することもできます。ただし、削除された行を追跡するには、アプリケーションで isDeleted = true / false 列を追加するなど、ソフト削除を実装する必要があります。少し後で Cloud Spanner のデータを完全に削除し、まず変更が BigQuery に同期されるようにする必要があります。
BigQuery でのマージ操作時に、上記のフラグに基づいて条件付きで削除できます。
次のステップ
この記事では、Cloud Spanner から BigQuery にデータを複製する方法をご紹介しました。これを実際にテストする場合、ワークロード スキーマの Cloud Spanner でサンプルデータを生成するための手順ガイドとして「Measure Cloud Spanner performance using JMeter(JMeter を使用して Cloud Spanner のパフォーマンスを測定する)」をご覧ください。
-データベース移行エンジニア Shashank Agarwal