配置增量表

本文档介绍了如何使用 Dataform core:用于配置 增量表。

增量表简介

Dataform 根据表类型以不同的方式更新表。中 每次执行表或视图时,Dataform 都会重新构建整个 创建报表或视图

定义增量表后,Dataform 会构建增量 从零开始创建表格在后续执行过程中 Dataform 仅在增量表中插入或合并新行 您配置的条件。

Dataform 只会将新行插入 增量表。如果更改增量表定义 查询(例如,添加新列),您必须从头开始重新构建表。 为此,下次您学习 触发表的执行 选择使用完全刷新运行选项。

以下是增量表的一些常见使用场景:

性能优化
对于某些类型的数据(例如网站流量记录或分析数据),您可能需要创建 以便仅处理新记录,而不是重新处理整个表。
缩短延迟时间
您可以使用增量表快速但频繁地执行工作流, 缩短输出表的下行延迟时间。
每日快照
您可以通过配置增量表来创建 表数据,例如,对存储的用户设置进行纵向分析 在生产数据库中运行。

准备工作

  1. 在 Google Cloud 控制台中,前往 Dataform 页面。

    转到 Dataform 页面

  2. 选择或创建代码库

  3. 选择或创建开发工作区

  4. 创建一个表来包含 incremental 表类型。

所需的角色

如需获取配置增量表所需的权限, 请让管理员向您授予 Dataform Editor (roles/dataform.editor) 工作区的 IAM 角色。 如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

处理增量表中的部分行

确定在执行每个操作时 Dataform 要处理的一部分行 请在增量表 SQLX 中添加条件 WHERE 子句 定义文件在 WHERE 子句中, 您可以指定一个增量条件和非增量条件。 Dataform 在表执行期间应用增量条件 非增量刷新条件,以及执行期间的非增量条件 进行全面刷新

如需配置增量表,请按以下步骤操作:

  1. 转到开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 打开增量表定义 SQLX 文件
  4. 按以下格式输入 WHERE 子句:

    config { type: "incremental" }
    
    SELECT_STATEMENT
    
    ${when(incremental(), `WHERE INCREMENTAL_CONDITION`, `WHERE NON_INCREMENTAL_CONDITION`) }
    

    替换以下内容:

    • SELECT_STATEMENTSELECT 语句, 定义您的表
    • INCREMENTAL_CONDITION:您在 WHERE 中指定的条件 子句,用于选择 Dataform 要在表执行期间处理的行 即不进行完全刷新

    • NON_INCREMENTAL_CONDITION:您在 WHERE 子句,用于选择 Dataform 要在表期间处理的行 完整刷新。

  5. 可选:点击格式

以下代码示例展示了一个增量表 处理 productiondb.logs 表中的行:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(),
   `WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,
   `WHERE country = "UK"`)}

以下代码示例展示了一个用于创建快照的增量表 (位于 productiondb.customers 表中):

config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}

${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }

合并增量表中的行

要确保增量表只包含一个与 所选的列组合,请将所选列的 uniqueKey 设置为 合并具有相同 uniqueKey 的行。更新表时 Dataform 会将行与 uniqueKey 合并,而不是附加行。

如需在增量表中配置合并,请按以下步骤操作:

  1. 转到开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. config 代码块中,将所选列设置为 uniqueKey 格式如下:

    uniqueKey: ["COLUMN_NAME"]
    

    COLUMN_NAME 替换为所选列的名称。

  5. 可选:点击格式

以下代码示例显示了包含 transaction_id 的增量表 列设置为 uniqueKey,以确保它始终包含一行:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

过滤增量表中的行

在增量分区表中,为了避免 Dataform 扫描 以查找匹配的行,将 updatePartitionFilter 设置为仅考虑 记录子集。

以下代码示例显示了包含合并的增量分区表 通过设置 uniqueKeyupdatePartitionFilter 属性进行配置:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

从分区表中提取数据时避免全表扫描

在创建引用分区表的增量表时, 建议您构建表查询,以避免对表进行全表扫描 对分区表执行的操作。

您可以限制 BigQuery 扫描要更新的分区数量 使用常量表达式来查询增量表, 表查询。将分区表中的值转换为常量 使用 BigQuery 脚本来 在 pre_operations 代码块中将值声明为变量。 然后,在 WHERE 子句中使用该变量作为常量表达式, SELECT 查询。

使用此配置时,Dataform 会根据 针对引用的分区表的最新分区执行 而无需扫描整个表。

配置引用分区表的增量表 并避免全表扫描,请按以下步骤操作:

  1. 转到开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. pre_operations 代码块中, 使用 BigQuery 脚本声明变量
  5. 使用 WHERE 子句过滤定义表的 SELECT 语句 引用所声明变量的代码
  6. 可选:点击格式

以下代码示例展示了一个增量表,其中引用了 raw_events 表按 event_timestamp 分区:

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(event_timestamp) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint

在前面的代码示例中,event_timestamp_checkpoint 变量为 在 pre_operations 代码块中定义。 然后,将 event_timestamp_checkpoint 变量用作常量表达式 。WHERE

使用完全刷新从头开始重新构建增量表

您可以使用 包含 --full-refresh 选项的命令行界面 或使用完全刷新运行选项 触发工作流执行

选择完全刷新选项后,在开发工作区中或通过 使用 Dataform CLI 时,Dataform 会忽略 ${when(incremental(), ... } 参数,并重新创建 包含 CREATE OR REPLACE 语句的表。

防止增量表遭到完全刷新

为了防止从头开始重新构建增量表, 数据丢失,您可以将增量表设置为 protected。你可能需要 如果数据源为临时数据源,则防止重新构建增量表。

如需将增量表标记为 protected,请按以下步骤操作:

  1. 转到开发工作区。
  2. Files 窗格中,展开 definitions/
  3. 选择一个增量表定义 SQLX 文件。
  4. config 代码块中,输入 protected: true
  5. 可选:点击格式

以下代码示例展示了标记为 protected 的增量表:

config {
  type: "incremental",
  protected: true
}
SELECT ...

后续步骤