増分テーブルについて
Dataform は、テーブルの種類に応じてテーブルを更新します。テーブルまたはビューの実行ごとに、Dataform はテーブルまたはビュー全体を最初から再構築します。
増分テーブルを定義すると、Dataform は初回のみ増分テーブルをゼロから作成します。その後の実行時に、Dataform は構成した条件に従って、増分テーブルに新しい行のみを挿入またはマージします。
Dataform は、増分テーブルにすでに存在する列にのみ新しい行を挿入します。新しい列を追加するなどして、増分テーブル定義クエリを変更する場合、テーブルを最初から再構築する必要があります。 これを行うには、次回テーブルの実行をトリガーするときに、[フル更新で実行] オプションを選択します。
増分テーブルの一般的なユースケースを次に示します。
- パフォーマンスの最適化
- ウェブログや分析データなど、一部のデータについては、テーブル全体を再処理するのではなく、新しいレコードのみを処理することをおすすめします。
- レイテンシの低減
- 増分テーブルを使用すると、ワークフローを迅速かつ頻繁に実行して、出力テーブルのダウンストリーム レイテンシを短縮できます。
- 毎日のスナップショット
- 増分テーブルを構成して、テーブルデータの日次スナップショットを作成できます。たとえば、本番環境のデータベースに格納されているユーザー設定の長期的な分析を行う場合などです。
準備
Google Cloud コンソールの [Dataform] ページに移動します。
リポジトリを作成または選択します。
開発ワークスペースを作成または選択します。
incremental
テーブルタイプのテーブルを作成します。
必要なロール
増分テーブルの構成に必要な権限を取得するには、ワークスペースに対する Dataform 編集者 (roles/dataform.editor
)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
増分テーブル内の行のサブセットを処理する
各実行時に Dataform が処理する行のサブセットを決定するには、増分テーブルの SQLX 定義ファイルに条件付きの WHERE
句を追加します。WHERE
句では、増分条件と非増分条件を指定できます。Dataform は、フル更新なしでテーブルの実行中に増分条件を適用し、フル更新ありで実行中に増分以外の条件を適用します。
増分テーブルを構成する手順は次のとおりです。
- 開発ワークスペースに移動します。
- [ファイル] ペインで
definitions/
を開きます。 - 増分テーブル定義の SQLX ファイルを開きます
WHERE
句を次の形式で入力します。config { type: "incremental" } SELECT_STATEMENT ${when(incremental(), `WHERE INCREMENTAL_CONDITION`, `WHERE NON_INCREMENTAL_CONDITION`) }
以下を置き換えます。
- SELECT_STATEMENT: テーブルを定義する
SELECT
ステートメント INCREMENTAL_CONDITION:
WHERE
句で指定する条件。完全な更新を行わずにテーブルの実行中に Dataform で処理する行を選択します。NON_INCREMENTAL_CONDITION: テーブルの実行中にフル更新で Dataform で処理する行を選択するために、
WHERE
句で指定する条件。
- SELECT_STATEMENT: テーブルを定義する
省略可: [書式] をクリックします。
次のコードサンプルは、productiondb.logs
テーブルの行をインクリメントに処理する増分テーブルを示しています。
config { type: "incremental" }
SELECT timestamp, message FROM ${ref("productiondb", "logs")}
${when(incremental(),
`WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,
`WHERE country = "UK"`)}
次のコードサンプルは、productiondb.customers
テーブルのスナップショットを作成する増分テーブルを示しています。
config { type: "incremental" }
SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}
${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }
増分テーブル内の行を結合する
増分テーブルに選択した列の組み合わせに対応する 1 行のみが含まれるようにするには、選択した列を uniqueKey
に設定して、同じ uniqueKey
を持つ行をマージします。テーブルを更新するときに、Dataform は行を追加するのではなく、uniqueKey
と結合します。
増分テーブルで結合を構成する手順は次のとおりです。
- 開発ワークスペースに移動します。
- [ファイル] ペインで
definitions/
を開きます。 - 増分テーブル定義の SQLX ファイルを選択します
config
ブロックで、選択した列を次の形式でuniqueKey
として設定します。uniqueKey: ["COLUMN_NAME"]
COLUMN_NAME は、選択した列の名前に置き換えます。
省略可: [書式] をクリックします。
次のコードサンプルは、transaction_id
列を uniqueKey
として設定した増分テーブルを示し、常に 1 行が含まれるようにします。
config {
type: "incremental",
uniqueKey: ["transaction_id"]
}
SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
増分テーブル内の行をフィルタする
増分パーティション分割テーブルで、Dataform がテーブル全体をスキャンして一致する行を見つけないようにするには、レコードのサブセットのみを考慮するように updatePartitionFilter
を設定します。
次のサンプルコードは、uniqueKey
プロパティと updatePartitionFilter
プロパティを設定して、マージ処理が設定された増分パーティション分割テーブルを示しています。
config {
type: "incremental",
uniqueKey: ["transaction_id"],
bigquery: {
partitionBy: "DATE(timestamp)",
updatePartitionFilter:
"timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
}
}
SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
パーティション分割テーブルから取り込む際に、完全なテーブル スキャンを回避する
パーティション分割テーブルを参照する増分テーブルを作成する場合、各増分更新でパーティション分割テーブルの完全なテーブル スキャンを回避するためにテーブルクエリを作成することをおすすめします。
テーブルクエリで定数式を使用して増分テーブルを更新するために、BigQuery がスキャンするパーティション数を制限できます。パーティション分割テーブルの値を定数式に変換するには、BigQuery スクリプトを使用して pre_operations
ブロックで変数として値を宣言します。次に、SELECT
クエリの WHERE
句で定数式として変数を使用します。
この構成では、Dataform はテーブル全体をスキャンせずに、参照先パーティション分割テーブルの最新のパーティションに基づいて増分テーブルを更新します。
パーティション分割テーブルを参照し、完全なテーブル スキャンを回避する増分テーブルを構成するには、次の手順を行います。
- 開発ワークスペースに移動します。
- [ファイル] ペインで
definitions/
を開きます。 - 増分テーブル定義の SQLX ファイルを選択します
pre_operations
ブロックで、BigQuery スクリプトを使用して変数を宣言します。- 宣言された変数を参照する
WHERE
句を使用して、テーブルを定義するSELECT
ステートメントをフィルタします。 - 省略可: [書式] をクリックします。
次のサンプルコードは、参照先の raw_events
テーブルが event_timestamp
によって分割される増分テーブルを示しています。
config {
type: "incremental",
}
pre_operations {
DECLARE event_timestamp_checkpoint DEFAULT (
${when(incremental(),
`SELECT max(event_timestamp) FROM ${self()}`,
`SELECT timestamp("2000-01-01")`)}
)
}
SELECT
*
FROM
${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint
上述のコードサンプルでは、event_timestamp_checkpoint
変数が pre_operations
ブロックで定義されています。event_timestamp_checkpoint
変数は、WHERE
句の定数式として使用されます。
フル更新で増分テーブルをゼロから再構築する
--full-refresh
オプションを使用したコマンドライン インターフェース、またはワークフローの実行をトリガーするときに [フル更新で実行] を使用すると、増分テーブルをゼロから強制的に再構築できます。
フル更新オプションを選択した場合、開発ワークスペース、または Dataform CLI を使用して、Dataform は実行中に ${when(incremental(), ... }
パラメータを無視し、CREATE OR REPLACE
ステートメントでテーブルを再作成します。
フル更新から増分テーブルを保護する
増分テーブルをゼロから再作成して潜在的なデータ損失から保護するには、増分テーブルを protected
として設定します。データソースが一時的な場合は、増分テーブルの再ビルドを防ぐことをおすすめします。
増分テーブルを protected
としてマークする手順は次のとおりです。
- 開発ワークスペースに移動します。
- [ファイル] ペインで [
definitions/
] を展開します。 - 増分テーブル定義の SQLX ファイルを選択します。
config
ブロックに「protected: true
」と入力します。- 省略可: [書式] をクリックします。
次のコードサンプルは、protected
としてマークされた増分テーブルを示しています。
config {
type: "incremental",
protected: true
}
SELECT ...
次のステップ
- テーブルを定義する方法については、テーブルの作成をご覧ください。
- Dataform コマンドライン インターフェースの使用方法については、Dataform CLI を使用するをご覧ください。
- 実行を手動でトリガーする方法については、実行のトリガーをご覧ください。