关于增量表
Dataform 会根据表类型以不同方式更新表。每次执行表或视图时,Dataform 都会重新构建整个表或视图。
定义增量表时,Dataform 首次从头开始构建增量表。在后续执行期间,Dataform 只会根据您配置的条件在增量表中插入或合并新行。
Dataform 只会将新行插入增量表中已存在的列。如果您对增量表定义查询进行更改(例如,添加新列),则必须从头开始重新构建表。为此,在下次触发表的执行时,请选择运行时会完全刷新选项。
以下是增量表的一些常见用例:
效果优化
对于某些数据(例如网络日志或分析数据),您可能希望仅处理新记录,而不是重新处理整个表。
缩短延迟时间
您可以使用增量表快速而频繁地执行工作流,从而减少输出表的下游延迟时间。
每日快照
您可以对增量表进行配置,以创建表数据的每日快照,例如,对存储在生产数据库中的用户设置进行纵向分析。
须知事项
所需的角色
如需获取配置增量表所需的权限,请让管理员向您授予对工作区的 Dataform Editor (roles/dataform.editor
) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限。
处理增量表中的行子集
如需确定 Dataform 在每次执行期间要处理的行子集,请向增量表 SQLX 定义文件添加条件 WHERE
子句。
如需配置增量表,请按以下步骤操作:
- 转到您的开发工作区。
- 在 Files 窗格中,展开
definitions/
。 - 打开增量表定义 SQLX 文件。
- 请按照以下格式输入
WHERE
子句:
config { type: "incremental" }
SELECT_STATEMENT
${when(incremental(), `WHERE CONDITION FROM ${self()}`) }
- 将 SELECT_STATEMENT 替换为定义表的
SELECT
语句。 - 将 CONDITION 替换为
WHERE
子句的条件,该条件会选择 Dataform 在表执行期间处理的行。
以下代码示例展示了一个增量表,用于增量处理 productiondb.logs
表中的行:
config { type: "incremental" }
SELECT timestamp, message FROM ${ref("productiondb", "logs")}
${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
以下代码示例展示了用于创建 productiondb.customers
表快照的增量表:
config { type: "incremental" }
SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}
${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }
合并增量表中的行
如需确保增量表仅包含与所选的列组合对应的一行,请将所选列设置为 uniqueKey
,以合并具有相同 uniqueKey
的行。更新表时,Dataform 会将行与 uniqueKey
合并,而不是附加行。
如需在增量表格中配置合并,请按以下步骤操作:
- 转到您的开发工作区。
- 在 Files 窗格中,展开
definitions/
。 - 选择增量表定义 SQLX 文件
- 在
config
代码块中,将所选列设置为uniqueKey
,格式如下:
uniqueKey: ["COLUMN_NAME"]
将 COLUMN_NAME 替换为所选列名称。
以下代码示例展示了一个增量表,其中 transaction_id
列设置为 uniqueKey
,以确保它始终包含一行:
config {
type: "incremental",
uniqueKey: ["transaction_id"]
}
SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
过滤增量表中的行
在增量分区表中,为了避免 Dataform 扫描整个表以查找匹配的行,请将 updatePartitionFilter
设置为仅考虑一部分记录。
以下代码示例展示了通过设置 uniqueKey
和 updatePartitionFilter
属性来配置合并的增量分区表:
config {
type: "incremental",
uniqueKey: ["transaction_id"],
bigquery: {
partitionBy: "DATE(timestamp)",
updatePartitionFilter:
"timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
}
}
SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
从分区表中提取数据时,请避免全表扫描
在创建引用分区表的增量表时,我们建议您构建表查询,以避免每次增量更新时对分区表进行全表扫描。
您可以在表查询中使用常量表达式,限制 BigQuery 扫描以更新增量表的分区数量。如需将分区表中的值转换为常量表达式,请使用 BigQuery 脚本在 pre_operations
代码块中将该值声明为变量。然后,将该变量用作 SELECT
查询的 WHERE
子句中的常量表达式。
借助此配置,Dataform 会根据所引用分区表的最新分区来更新增量表,而无需扫描整个表。
如需配置引用分区表并避免全表扫描的增量表,请按以下步骤操作:
- 转到您的开发工作区。
- 在 Files 窗格中,展开
definitions/
。 - 选择增量表定义 SQLX 文件
- 在
pre_operations
代码块中,使用 BigQuery 脚本声明变量。 - 使用定义所声明变量的
WHERE
子句过滤定义表的SELECT
语句。
以下代码示例展示了增量表,其中引用的 raw_events
表按 event_timestamp
分区:
config {
type: "incremental",
}
pre_operations {
DECLARE event_timestamp_checkpoint DEFAULT (
${when(incremental(),
`SELECT max(event_timestamp) FROM ${self()}`,
`SELECT timestamp("2000-01-01")`)}
)
}
SELECT
*
FROM
${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint
在上述代码示例中,event_timestamp_checkpoint
变量在 pre_operations
代码块中定义。然后,event_timestamp_checkpoint
变量会用作 WHERE
子句中的常量表达式。
使用完全刷新,从头开始重新构建增量表
您可以在触发工作流执行时,使用带有 --full-refresh
选项的命令行界面或完全刷新时运行选项强制从头开始重新构建增量表。
当您选择完整刷新选项时,在开发工作区中或使用 Dataform CLI 时,Dataform 在执行期间会忽略 ${when(incremental(), ... }
参数,并使用 CREATE OR REPLACE
语句重新创建表。
保护增量表不完全刷新
为防止增量表从头开始重新构建并避免数据丢失,您可以将增量表设置为 protected
。如果您的数据源是临时的,您可能需要阻止重新构建增量表。
如需将增量表标记为 protected
,请按以下步骤操作:
- 转到您的开发工作区。
- 在 Files 窗格中,展开
definitions/
。 - 选择增量表定义 SQLX 文件。
- 在
config
代码块中,输入protected: true
。
以下代码示例显示了标记为 protected
的增量表:
config {
type: "incremental",
protected: true
}
SELECT ...
后续步骤
- 如需了解如何定义表,请参阅创建表。
- 如需了解如何使用 Dataform 命令行界面,请参阅使用 Dataform CLI。
- 如需了解如何手动触发执行,请参阅触发执行。