このページでは、Datastream と Dataflow を使用して BigQuery にデータをストリーミングする際のベスト プラクティスについて説明します。
ユーザー定義のキーでレプリカ データセットを分割する
BigQuery のステージング データセットは自動的にパーティショニングされます。ただし、デフォルトでは、レプリカ データセットはパーティション分割されません。これは、レプリカ テーブルのパーティション キーが、Datastream と Dataflow によって適用されるのではなく、特定のビジネス ロジックに基づいて定義される必要があるためです。
パーティショニングが必要なレプリカ データセットのテーブルごとに、次の操作を行います。
BigQuery の SQL エディタを使用して、レプリカ データセット内のテーブルごとに次の SQL スクリプトを実行します。この例では、
datastream_cdc
データセットのactor
テーブルに、パーティション キーとして設定するlast_update
列があり。スクリプトを実行すると、正しいパーティション キーを使用してテーブルが再作成されます。create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new'
partition by date(last_update) as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor' drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor' alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor' Datastream to BigQuery テンプレートを使用して、Dataflow ジョブを再作成します。
ユーザー定義関数を実行してイベントデータを操作する
Datastream to BigQuery テンプレートを使用して、JavaScript ユーザー定義関数を実行できます。そのために、まず、関数を含むファイルを Cloud Storage 内の特定の場所に配置します。次に、以下の操作を行います。
- テンプレートの
javascriptTextTransformGcsPath
パラメータを使用して、ユーザー定義関数を含む Cloud Storage 内のファイルの場所を指定します。 javascriptTextTransformFunctionName
パラメータを使用して、ユーザー定義関数として呼び出す JavaScript 関数の名前を指定します。
たとえば、ユーザー定義関数を実行して、削除されたレコードを BigQuery 内のレプリカ データセットのテーブルに保持できます。このプロセスは「ソフト削除」と呼ばれます。
そのためには、_metadata_deleted
列の値を is_deleted
という名前の新しい列にコピーし、_metadata_deleted
列の値を false
にリセットする関数を作成します。これにより、BigQuery でレプリカ データセットを更新するときに、Dataflow ジョブは削除イベントを無視し、削除されたレコードを保持します。
このユーザー定義関数のサンプルコードは次の通りです。
/** * This function changes the behavior of the Datastream to * BigQuery template to allow soft deletes. * @param {string} messageString from DatastreamIO data * @return {string} same as an input message with an added property */ function transform(messageString) { // messageString is a JSON object as a string var messageJson = JSON.parse(messageString); // Moving the deleted flag to a new column will cause the pipeline to soft delete data. messageJson['is_deleted'] = messageJson['_metadata_deleted']; messageJson['_metadata_deleted'] = false; return JSON.stringify(messageJson); }
統合頻度を設定する
Datastream to BigQuery テンプレートの mergeFrequencyMinutes
パラメータを使用して、統合頻度を設定します。BigQuery のレプリカ データセット内の特定のテーブルの統合間隔(分単位)。過去のデータがバックフィルされている間は、マージの頻度を低く(12~24 時間)抑えて、費用を制御することをおすすめします。
たとえば、このパラメータの値を 10 分に設定すると、Dataflow はテンプレートを使用するジョブを 10 分ごとに実行します。ただし、ジョブが初めて実行されるときは 5 分間遅延します。この例では、ジョブが午前 9 時 14 分に実行される場合、最初のマージは午前 9 時 29 分に行われます(マージが 10 分、遅延が 5 分)。2 回目のマージは午前 9 時 39 分に行われ、その後のマージはすべて 10 分間隔(午前 9 時 49 分、午前 9 時 59 分、午前 10 時 99 分など)に行われます。
マージの頻度を 60 分に設定した場合、ジョブは 1 時間後(初回実行時は 5 分の遅延の後)に実行されます。ジョブが午前 10 時に実行されるようにスケジュールされている場合、5 分間の遅延があるため、実際には午前 10 時 5 分に実行されます。その後のすべてのマージは、60 分間隔(11:05 AM、12:05 PM、1:05 PM など)で行われます。
費用の制御などの理由で、ビジネスニーズを満たす頻度でマージを実行できない場合があります。最新のデータが得られない可能性があります。最新のデータにアクセスするには、BigQuery のステージング データセットとレプリカ データセットのテーブルの上にビューを作成し、そのビューでマージを模倣します。このビューは、(ステージング データセットとレプリカ データセットの両方に対して)1 つの論理テーブルとして作成されます。統合頻度が低く、データへの高速アクセスが必要な場合は、ビューを使用します。