Datastream と Dataflow を使用して BigQuery にデータをストリーミングする

このページでは、Datastream と Dataflow を使用して BigQuery にデータをストリーミングする際のベスト プラクティスについて説明します。

ユーザー定義のキーでレプリカ データセットを分割する

BigQuery のステージング データセットは自動的にパーティショニングされます。ただし、デフォルトでは、レプリカ データセットはパーティション分割されません。これは、レプリカ テーブルのパーティション キーが、Datastream と Dataflow によって適用されるのではなく、特定のビジネス ロジックに基づいて定義される必要があるためです。

パーティショニングが必要なレプリカ データセットのテーブルごとに、次の操作を行います。

  1. Dataflow ジョブを停止してドレインします

  2. BigQuery の SQL エディタを使用して、レプリカ データセット内のテーブルごとに次の SQL スクリプトを実行します。この例では、datastream_cdc データセットの actor テーブルに、パーティション キーとして設定する last_update 列があり。スクリプトを実行すると、正しいパーティション キーを使用してテーブルが再作成されます。

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Datastream to BigQuery テンプレートを使用して、Dataflow ジョブを再作成します。

ユーザー定義関数を実行してイベントデータを操作する

Datastream to BigQuery テンプレートを使用して、JavaScript ユーザー定義関数を実行できます。そのために、まず、関数を含むファイルを Cloud Storage 内の特定の場所に配置します。次に、以下の操作を行います。

  • テンプレートの javascriptTextTransformGcsPath パラメータを使用して、ユーザー定義関数を含む Cloud Storage 内のファイルの場所を指定します。
  • javascriptTextTransformFunctionName パラメータを使用して、ユーザー定義関数として呼び出す JavaScript 関数の名前を指定します。

たとえば、ユーザー定義関数を実行して、削除されたレコードを BigQuery 内のレプリカ データセットのテーブルに保持できます。このプロセスは「ソフト削除」と呼ばれます。

そのためには、_metadata_deleted 列の値を is_deleted という名前の新しい列にコピーし、_metadata_deleted 列の値を false にリセットする関数を作成します。これにより、BigQuery でレプリカ データセットを更新するときに、Dataflow ジョブは削除イベントを無視し、削除されたレコードを保持します。

このユーザー定義関数のサンプルコードは次の通りです。

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

統合頻度を設定する

Datastream to BigQuery テンプレートmergeFrequencyMinutes パラメータを使用して、統合頻度を設定します。BigQuery のレプリカ データセット内の特定のテーブルの統合間隔(分単位)。過去のデータがバックフィルされている間は、マージの頻度を低く(12~24 時間)抑えて、費用を制御することをおすすめします。

たとえば、このパラメータの値を 10 分に設定すると、Dataflow はテンプレートを使用するジョブを 10 分ごとに実行します。ただし、ジョブが初めて実行されるときは 5 分間遅延します。この例では、ジョブが午前 9 時 14 分に実行される場合、最初のマージは午前 9 時 29 分に行われます(マージが 10 分、遅延が 5 分)。2 回目のマージは午前 9 時 39 分に行われ、その後のマージはすべて 10 分間隔(午前 9 時 49 分、午前 9 時 59 分、午前 10 時 99 分など)に行われます。

マージの頻度を 60 分に設定した場合、ジョブは 1 時間後(初回実行時は 5 分の遅延の後)に実行されます。ジョブが午前 10 時に実行されるようにスケジュールされている場合、5 分間の遅延があるため、実際には午前 10 時 5 分に実行されます。その後のすべてのマージは、60 分間隔(11:05 AM、12:05 PM、1:05 PM など)で行われます。

費用の制御などの理由で、ビジネスニーズを満たす頻度でマージを実行できない場合があります。最新のデータが得られない可能性があります。最新のデータにアクセスするには、BigQuery のステージング データセットとレプリカ データセットのテーブルの上にビューを作成し、そのビューでマージを模倣します。このビューは、(ステージング データセットとレプリカ データセットの両方に対して)1 つの論理テーブルとして作成されます。統合頻度が低く、データへの高速アクセスが必要な場合は、ビューを使用します。