増分テーブルを構成する

このドキュメントでは、Dataform コアを使用して増分テーブルを構成する方法について説明します。

増分テーブルについて

Dataform は、テーブルの種類に応じてテーブルを更新します。テーブルまたはビューの実行ごとに、Dataform はテーブルまたはビュー全体を最初から再構築します。

増分テーブルを定義すると、Dataform は初回のみ増分テーブルをゼロから作成します。その後の実行時に、Dataform は構成した条件に従って、増分テーブルに新しい行のみを挿入またはマージします。

Dataform は、増分テーブルにすでに存在する列にのみ新しい行を挿入します。新しい列を追加するなどして、増分テーブル定義クエリを変更する場合、テーブルを最初から再構築する必要があります。 これを行うには、次回テーブルの実行をトリガーするときに、[フル更新で実行] オプションを選択します。

増分テーブルの一般的なユースケースを次に示します。

パフォーマンスの最適化
ウェブログや分析データなど、一部のデータについては、テーブル全体を再処理するのではなく、新しいレコードのみを処理することをおすすめします。
レイテンシの低減
増分テーブルを使用すると、ワークフローを迅速かつ頻繁に実行して、出力テーブルのダウンストリーム レイテンシを短縮できます。
毎日のスナップショット
増分テーブルを構成して、テーブルデータの日次スナップショットを作成できます。たとえば、本番環境のデータベースに格納されているユーザー設定の長期的な分析を行う場合などです。

準備

  1. Google Cloud コンソールの [Dataform] ページに移動します。

    [Dataform] ページに移動

  2. リポジトリを作成または選択します。

  3. 開発ワークスペースを作成または選択します。

  4. incremental テーブルタイプのテーブルを作成します。

必要なロール

増分テーブルの構成に必要な権限を取得するには、ワークスペースに対する Dataform 編集者 roles/dataform.editor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

増分テーブル内の行のサブセットを処理する

各実行時に Dataform が処理する行のサブセットを決定するには、増分テーブルの SQLX 定義ファイルに条件付きの WHERE 句を追加します。WHERE 句では、増分条件と非増分条件を指定できます。Dataform は、フル更新なしでテーブルの実行中に増分条件を適用し、フル更新ありで実行中に増分以外の条件を適用します。

増分テーブルを構成する手順は次のとおりです。

  1. 開発ワークスペースに移動します。
  2. [ファイル] ペインで definitions/ を開きます。
  3. 増分テーブル定義の SQLX ファイルを開きます
  4. WHERE 句を次の形式で入力します。

    config { type: "incremental" }
    
    SELECT_STATEMENT
    
    ${when(incremental(), `WHERE INCREMENTAL_CONDITION`, `WHERE NON_INCREMENTAL_CONDITION`) }
    

    以下を置き換えます。

    • SELECT_STATEMENT: テーブルを定義する SELECT ステートメント
    • INCREMENTAL_CONDITION: WHERE 句で指定する条件。完全な更新を行わずにテーブルの実行中に Dataform で処理する行を選択します。

    • NON_INCREMENTAL_CONDITION: テーブルの実行中にフル更新で Dataform で処理する行を選択するために、WHERE 句で指定する条件。

  5. 省略可: [書式] をクリックします。

次のコードサンプルは、productiondb.logs テーブルの行をインクリメントに処理する増分テーブルを示しています。

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(),
   `WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,
   `WHERE country = "UK"`)}

次のコードサンプルは、productiondb.customers テーブルのスナップショットを作成する増分テーブルを示しています。

config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}

${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }

増分テーブル内の行を結合する

増分テーブルに選択した列の組み合わせに対応する 1 行のみが含まれるようにするには、選択した列を uniqueKey に設定して、同じ uniqueKey を持つ行をマージします。テーブルを更新するときに、Dataform は行を追加するのではなく、uniqueKey と結合します。

増分テーブルで結合を構成する手順は次のとおりです。

  1. 開発ワークスペースに移動します。
  2. [ファイル] ペインで definitions/ を開きます。
  3. 増分テーブル定義の SQLX ファイルを選択します
  4. config ブロックで、選択した列を次の形式で uniqueKey として設定します。

    uniqueKey: ["COLUMN_NAME"]
    

    COLUMN_NAME は、選択した列の名前に置き換えます。

  5. 省略可: [書式] をクリックします。

次のコードサンプルは、transaction_id 列を uniqueKey として設定した増分テーブルを示し、常に 1 行が含まれるようにします。

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

増分テーブル内の行をフィルタする

増分パーティション分割テーブルで、Dataform がテーブル全体をスキャンして一致する行を見つけないようにするには、レコードのサブセットのみを考慮するように updatePartitionFilter を設定します。

次のサンプルコードは、uniqueKey プロパティと updatePartitionFilter プロパティを設定して、マージ処理が設定された増分パーティション分割テーブルを示しています。

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

パーティション分割テーブルから取り込む際に、完全なテーブル スキャンを回避する

パーティション分割テーブルを参照する増分テーブルを作成する場合、各増分更新でパーティション分割テーブルの完全なテーブル スキャンを回避するためにテーブルクエリを作成することをおすすめします。

テーブルクエリで定数式を使用して増分テーブルを更新するために、BigQuery がスキャンするパーティション数を制限できます。パーティション分割テーブルの値を定数式に変換するには、BigQuery スクリプトを使用して pre_operations ブロックで変数として値を宣言します。次に、SELECT クエリの WHERE 句で定数式として変数を使用します。

この構成では、Dataform はテーブル全体をスキャンせずに、参照先パーティション分割テーブルの最新のパーティションに基づいて増分テーブルを更新します。

パーティション分割テーブルを参照し、完全なテーブル スキャンを回避する増分テーブルを構成するには、次の手順を行います。

  1. 開発ワークスペースに移動します。
  2. [ファイル] ペインで definitions/ を開きます。
  3. 増分テーブル定義の SQLX ファイルを選択します
  4. pre_operations ブロックで、BigQuery スクリプトを使用して変数を宣言します
  5. 宣言された変数を参照する WHERE 句を使用して、テーブルを定義する SELECT ステートメントをフィルタします。
  6. 省略可: [書式] をクリックします。

次のサンプルコードは、参照先の raw_events テーブルが event_timestamp によって分割される増分テーブルを示しています。

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(event_timestamp) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint

上述のコードサンプルでは、event_timestamp_checkpoint 変数が pre_operations ブロックで定義されています。event_timestamp_checkpoint 変数は、WHERE 句の定数式として使用されます。

フル更新で増分テーブルをゼロから再構築する

--full-refresh オプションを使用したコマンドライン インターフェース、またはワークフローの実行をトリガーするときに [フル更新で実行] を使用すると、増分テーブルをゼロから強制的に再構築できます。

フル更新オプションを選択した場合、開発ワークスペース、または Dataform CLI を使用して、Dataform は実行中に ${when(incremental(), ... } パラメータを無視し、CREATE OR REPLACE ステートメントでテーブルを再作成します。

フル更新から増分テーブルを保護する

増分テーブルをゼロから再作成して潜在的なデータ損失から保護するには、増分テーブルを protected として設定します。データソースが一時的な場合は、増分テーブルの再ビルドを防ぐことをおすすめします。

増分テーブルを protected としてマークする手順は次のとおりです。

  1. 開発ワークスペースに移動します。
  2. [ファイル] ペインで [definitions/] を展開します。
  3. 増分テーブル定義の SQLX ファイルを選択します。
  4. config ブロックに「protected: true」と入力します。
  5. 省略可: [書式] をクリックします。

次のコードサンプルは、protected としてマークされた増分テーブルを示しています。

config {
  type: "incremental",
  protected: true
}
SELECT ...

次のステップ