配置增量表

本文档介绍了如何使用 Dataform core 来配置增量表。

增量表简介

Dataform 会根据表类型以不同的方式更新表。每次执行表或视图时,Dataform 都会从头开始重新构建整个表或视图。

定义增量表时,Dataform 仅在首次从头开始构建增量表时。在后续执行期间,Dataform 仅根据您配置的条件将新行插入或合并到增量表中。

Dataform 仅将新行插入增量表中已存在的列中。如果对增量表定义查询进行更改(例如,添加新列),则必须从头开始重新构建表。为此,请在下一次触发表的执行时,选择以完全刷新模式运行选项。

以下是增量表的一些常见使用场景:

性能优化
对于某些类型的数据,例如网站日志或分析数据,您可能只想处理新记录,而不是重新处理整个表。
缩短延迟时间
您可以使用增量表快速而频繁地执行工作流,从而缩短输出表的下游延迟时间。
每日快照
您可以配置增量表以创建表数据的每日快照,例如,对存储在生产数据库中的用户设置进行纵向分析。

准备工作

  1. 在 Google Cloud 控制台中,前往 Dataform 页面。

    转到 Dataform 页面

  2. 选择或创建代码库

  3. 选择或创建开发工作区

  4. 创建一个表,类型为 incremental

所需的角色

如需获取配置增量表所需的权限,请让管理员授予您工作区的 Dataform Editor (roles/dataform.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

处理增量表中的行子集

如需确定让 Dataform 在每次执行期间处理的部分行,请向增量表 SQLX 定义文件添加条件 WHERE 子句。在 WHERE 子句中,您可以指定增量条件和非增量条件。Dataform 在表的执行期间应用增量条件(不进行完全刷新),并在执行期间应用非增量条件(在完全刷新的情况下)。

如需配置增量表,请按以下步骤操作:

  1. 进入开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 打开增量表定义 SQLX 文件
  4. 按以下格式输入 WHERE 子句:

    config { type: "incremental" }
    
    SELECT_STATEMENT
    
    ${when(incremental(), `WHERE INCREMENTAL_CONDITION`, `WHERE NON_INCREMENTAL_CONDITION`) }
    

    请替换以下内容:

    • SELECT_STATEMENT:用于定义表的 SELECT 语句
    • INCREMENTAL_CONDITION:您在 WHERE 子句中指定的条件,用于选择 Dataform 在表执行期间处理的行,而不进行完全刷新。

    • NON_INCREMENTAL_CONDITION:您在 WHERE 子句中指定的条件,用于选择 Dataform 在表执行期间通过完全刷新处理的行。

  5. 可选:点击格式

以下代码示例展示了一个以增量方式处理 productiondb.logs 表行的增量表:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(),
   `WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,
   `WHERE country = "UK"`)}

以下代码示例展示了一个用于创建 productiondb.customers 表快照的增量表:

config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}

${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }

合并增量表中的行

如需确保增量表仅包含一行与所选列组合相对应,请将所选列设置为 uniqueKey 以合并具有相同 uniqueKey 的行。更新表时,Dataform 会将行与 uniqueKey 合并,而不是附加。

如需在增量表中配置合并,请按以下步骤操作:

  1. 进入开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. config 代码块中,按以下格式将所选列设置为 uniqueKey

    uniqueKey: ["COLUMN_NAME"]
    

    COLUMN_NAME 替换为所选列的名称。

  5. 可选:点击格式

以下代码示例展示了一个增量表,其中 transaction_id 列设置为 uniqueKey,以确保该表始终包含一行:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

过滤增量表中的行

在增量分区表中,为避免 Dataform 扫描整个表来查找匹配的行,请将 updatePartitionFilter 设置为仅考虑一部分记录。

以下代码示例展示了通过设置 uniqueKeyupdatePartitionFilter 属性配置了合并的增量分区表:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

避免在从分区表中提取数据时进行全表扫描

在创建引用分区表的增量表时,我们建议您构建表查询,以避免在每次增量更新期间对分区表进行全表扫描。

您可以在表查询中使用常量表达式,限制 BigQuery 为更新增量表而扫描的分区数。如需将分区表中的值转换为常量表达式,请使用 BigQuery 脚本在 pre_operations 块中将该值声明为变量。然后,在 SELECT 查询的 WHERE 子句中使用该变量作为常量表达式。

使用此配置时,Dataform 会根据引用的分区表的最新分区来更新增量表,而不会扫描整个表。

如需配置引用分区表并避免全表扫描的增量表,请按以下步骤操作:

  1. 进入开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. pre_operations 代码块中,使用 BigQuery 脚本声明变量
  5. 使用引用所声明变量的 WHERE 子句过滤定义表的 SELECT 语句。
  6. 可选:点击格式

以下代码示例展示了一个增量表,其中引用的 raw_events 表按 event_timestamp 分区:

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(event_timestamp) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint

在前面的代码示例中,event_timestamp_checkpoint 变量在 pre_operations 块中定义。然后,event_timestamp_checkpoint 变量会用作 WHERE 子句中的常量表达式。

使用完全刷新从头开始重新构建增量表

您可以在触发工作流执行时使用带有 --full-refresh 选项的命令行界面运行完全刷新选项强制从头开始重新构建增量表。

当您在开发工作区中或使用 Dataform CLI 选择完整的刷新选项时,Dataform 会在执行过程中忽略 ${when(incremental(), ... } 参数并使用 CREATE OR REPLACE 语句重新创建表。

保护增量表以防完全刷新

为了防止从头开始重新构建增量表并防止可能丢失数据,可以将增量表设置为 protected。如果数据源是临时的,您可能希望阻止重新构建增量表。

如需将增量表标记为 protected,请按以下步骤操作:

  1. 进入开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件。
  4. config 代码块中,输入 protected: true
  5. 可选:点击格式

以下代码示例展示了标记为 protected 的增量表:

config {
  type: "incremental",
  protected: true
}
SELECT ...

后续步骤