创建表

在 Dataform 中,表是构成工作流的对象类型之一。您可以创建表,以引用为工作流声明的数据源中的数据或工作流中的其他表中的数据。Dataform 会将表定义实时编译为 SQL。当您触发执行时,Dataform 会执行 SQL 代码,并在 BigQuery 中创建您定义的表。

您可以在 type: "table" SQLX 文件中创建以下表类型:

  • table:常规表格。
  • incremental:增量表。
  • view:表格视图。如需了解详情,请参阅视图简介

您还可以定义表分区和集群

如需记录表的用途或其与工作流中其他表的关系,您可以为表或其所选列添加文档

如需针对特定条件测试表中的数据,您可以创建数据质量测试查询(称为断言)。每次更新工作流时,Dataform 都会运行断言,如果任何断言都失败,系统会向您发出提醒。

如需替换默认表设置(例如 databaseschema)并停用表创建,或者在表创建之前或之后执行 SQL 语句,您可以配置其他表设置

您可以配置其他表格设置,以执行以下操作:

  • 替换默认表格设置,例如 databaseschema
  • 停用表创建功能。
  • 在创建表之前或之后执行 SQL 语句。

如需在 BigQuery 中执行表后对其进行整理,您可以添加 BigQuery 标签。如需了解详情,请参阅标签简介

如需在表列级限制数据访问权限,您可以添加 BigQuery 政策标记。如需了解详情,请参阅列级访问权限控制简介

除了在 type: "table" SQLX 文件中定义表之外,您还可以通过在 type: "operations" SQLX 文件中定义自定义 SQL 查询来创建空表。您可能需要创建一个空表,以便其他服务可以向其中填充数据。

准备工作

  1. 在 Google Cloud 控制台中,进入 Dataform 页面。

    前往 Dataform

  2. 在代码库中创建并初始化开发工作区

  3. 可选:声明数据源

所需的角色

如需获得完成本文档中任务所需的权限,请让您的管理员为您授予工作区的 Dataform Editor (roles/dataform.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建表

本部分介绍了如何在 Dataform 中使用 Dataform 核心创建表。

表定义简介

如需定义表,您需要定义表类型并在 type: "table" SQLX 文件中编写 SELECT 语句。然后,Dataform 会将您的 Dataform 核心代码编译为 SQL,执行 SQL 代码,并在 BigQuery 中创建您定义的表。

在 Dataform 核心 SELECT 语句中,您可以定义表结构并引用工作流的其他对象。

除了在 type: "table" SLQX 文件中定义表之外,您还可以通过在 type: "operations" SQLX 文件中定义自定义 SQL 查询来创建空表。如需了解详情,请参阅创建空表

使用 ref 引用依赖项

如需在 SELECT 语句中引用工作流对象并自动将其添加为依赖项,请使用 ref 函数。Dataform 会先执行依赖项,然后再执行依赖于这些依赖项的表,以确保流水线顺序正确无误。

ref 函数是 Dataform 内置的核心函数,对 Dataform 中的依赖项管理至关重要。借助 ref 函数,您可以引用并自动依赖于 Dataform 工作流中定义的以下对象,而不是对架构和表名称进行硬编码:

Dataform 使用 ref 函数构建要创建或更新的所有表的依赖项树。

编译后,Dataform 会向 SQL 语句添加样板语句,例如 CREATEREPLACEINSERTMERGE

以下代码示例展示了使用 ref 函数的表格定义:

config { type: "table" }

SELECT
  order_date AS date,
  order_id AS order_id,
  order_status AS order_status,
  SUM(item_count) AS item_count,
  SUM(amount) AS revenue

FROM ${ref("store_clean")}

GROUP BY 1, 2

ref 函数中,您需要提供要依赖的表或数据源声明的名称。这通常是定义该表或数据源声明的 SQLX 文件的文件名。

如果表名称被替换,请在 ref 函数中使用被替换的名称。例如,将 config { name: "overridden_name" } 引用为 ref("overridden_name")。如需详细了解如何替换表名称,请参阅配置其他表设置

如果您在不同架构中拥有多个同名表,则可以通过向 ref 函数提供两个实参(架构名称和表名称)来引用特定表。

以下代码示例展示了包含两个实参的 ref 函数,用于指定特定架构中的表:

config { type: "table" }
SELECT * FROM ${ref("schema", "store_clean")}

对于 SELECT 语句中的 ref 函数中未引用的表、断言、数据源声明或自定义 SQL 操作,您还可以手动向 config 块添加表依赖项。Dataform 会在依赖表之前执行这些依赖项。

以下代码示例展示了 config 块中的表依赖项:

config { dependencies: [ "unreferenced_table" ] }
SELECT * FROM ...

如需详细了解工作流中的依赖项管理,请参阅声明依赖项

使用 resolve 引用其他表

借助 resolve 函数,您可以在 SELECT 语句中引用表或数据源声明(就像使用 ref 函数一样),但它不会将引用添加为依赖项。这意味着,使用 resolve 函数引用的对象不会影响使用 resolve 函数的表的执行。

如需详细了解内置 Dataform 核心函数,请参阅 Dataform 核心参考文档

为表定义创建 SQLX 文件

将表定义 SQLX 文件存储在 definitions/ 目录中。如需在 definitions/ 目录中创建新的 SQLX 文件,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,进入 Dataform 页面。

    前往 Dataform

  2. 如需打开代码库,请点击代码库名称。

  3. 如需打开开发工作区,请点击相应工作区名称。

  4. 文件窗格中,点击 definitions/ 旁边的更多

  5. 点击创建文件

  6. Add a file path 字段中,在 definitions/ 后面输入文件名称,后跟 .sqlx。例如 definitions/my-table.sqlx

    文件名只能包含数字、字母、连字符和下划线。

  7. 点击创建文件

定义表类型

如需创建新的表类型定义,请按以下步骤操作:

  1. 在开发工作区中的 Files 窗格中,展开 definitions/ 目录。
  2. 选择要修改的表定义 SQLX 文件。
  3. 在文件中输入以下代码段:

    config { type: "TABLE_TYPE" }
    

    TABLE_TYPE 替换为以下表格类型之一:

    • table
    • incremental
    • view
  4. 可选:如需定义具体化视图,请按以下格式在 type: "view" 下输入 materialized 属性:

    config {
      type: "view",
      materialized: true
    }
    

    如需了解详情,请参阅 ITableConfig

  5. 可选:点击格式

定义表结构和依赖项

如需编写表定义 SELECT 语句并定义表结构和依赖项,请按以下步骤操作:

  1. 在开发工作区中的 Files 窗格中,展开 definitions/ 目录。
  2. 选择要修改的表定义 SQLX 文件。
  3. config 代码块下方,编写 SELECT 语句。
  4. 可选:点击格式

以下代码示例展示了一个包含 SELECT 语句和 ref 函数的表定义:

config { type: "table" }
SELECT
  customers.id AS id,
  customers.first_name AS first_name,
  customers.last_name AS last_name,
  customers.email AS email,
  customers.country AS country,
  COUNT(orders.id) AS order_count,
  SUM(orders.amount) AS total_spent
FROM
  dataform-samples.dataform_sample.crm_customers AS customers
  LEFT JOIN ${ref('order_stats')} orders
    ON customers.id = orders.customer_id

WHERE
  customers.id IS NOT NULL
  AND customers.first_name <> 'Internal account'
  AND country IN ('UK', 'US', 'FR', 'ES', 'NG', 'JP')

GROUP BY 1, 2, 3, 4, 5

添加手动表依赖项

如需添加 SELECT 语句中未引用但需要在当前表之前执行的表依赖项,请按以下步骤操作:

  1. 在开发工作区中的 Files 窗格中,展开 definitions/ 目录。
  2. 选择要修改的表定义 SQLX 文件。
  3. 在表格的 config 代码块中,输入以下代码段:

    dependencies: [ "DEPENDENCY_TABLE", ]
    

    DEPENDENCY_TABLE 替换为您要添加为依赖项的表的文件名。您可以输入多个文件名。

  4. 可选:点击格式

以下代码示例展示了将两个表添加为表定义文件的 config 块的手动表依赖项:

config { dependencies: [ "some_table", "some_other_table" ] }

创建表分区和集群

本部分介绍了如何使用 Dataform 核心创建表分区和集群。BigQuery 支持分区表和表聚簇。如需了解详情,请参阅分区表简介创建和使用聚簇表

创建表分区

如需创建表分区,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 打开表定义 SQLX 文件。
  4. config 代码块中,在表格类型声明下方添加 bigquery 代码块,格式如下:

    config {
      type: "table",
      bigquery: {
      }
    }
    
  5. bigquery 代码块中,输入以下代码段:

        partitionBy: "PARTITION_EXPRESSION"
    

    PARTITION_EXPRESSION 替换为用于对表进行分区的表达式

  6. 可选:点击格式

以下代码示例展示了如何在表定义 SQLX 文件中按小时对表进行分区:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATETIME_TRUNC(<timestamp_column>, HOUR)"
  }
}

以下代码示例展示了如何在表定义 SQLX 文件中按整数值对表进行分区:

config {
  type: "table",
  bigquery: {
    partitionBy: "RANGE_BUCKET(<integer_column>, GENERATE_ARRAY(0, 1000000, 1000))"
  }
}

设置分区过滤条件

如需设置分区过滤条件,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 打开分区表定义 SQLX 文件。
  4. bigquery 代码块中,输入以下代码段:

    requirePartitionFilter : true
    
  5. 可选:点击格式

以下代码示例展示了在分区表 SQLX 文件的 bigquery 块中设置的分区过滤器:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    requirePartitionFilter : true
  }
}
SELECT CURRENT_TIMESTAMP() AS ts

如需详细了解 BigQuery 中的分区过滤条件,请参阅对分区表设置 require partition filter 属性

为分区设置保留期限

如需控制分区表中所有分区的保留期限,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 打开分区表定义 SQLX 文件。
  4. bigquery 代码块中,输入以下代码段:

    partitionExpirationDays: NUMBER_OF_DAYS
    

    NUMBER_OF_DAYS 替换为您要保留分区的天数。

  5. 可选:点击格式

以下代码示例显示了分区表 SQLX 文件的 bigquery 块中分区的保留期限设置为 14 天:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    partitionExpirationDays: 14,
  }
}
SELECT CURRENT_TIMESTAMP() AS ts

创建表集群

如需创建表集群,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 打开表定义 SQLX 文件。
  4. bigquery 代码块中,输入以下代码段:

        clusterBy: ["CLUSTER_COLUMN"]
    

    CLUSTER_COLUMN 替换为您要按其对表格进行分簇的列的名称。如需了解详情,请参阅 clustering_column_list

  5. 可选:点击格式

以下代码示例展示了一个按 namerevenue 列聚簇的分区表:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["name", "revenue"]
  }
}
SELECT CURRENT_TIMESTAMP() as ts, name, revenue

配置增量表

本部分介绍如何使用 Dataform 核心配置增量表。

增量表简介

Dataform 会根据表类型以不同的方式更新表。在每次执行表或视图时,Dataform 都会从头开始重新构建整个表或视图。

定义增量表时,Dataform 仅在首次构建增量表时从头开始构建。在后续执行期间,Dataform 仅根据您配置的条件将新行插入或合并到增量表中。

Dataform 仅将新行插入增量表中已存在的列。如果您更改了增量表定义查询(例如,添加了新列),则必须从头开始重新构建表。为此,请在下次触发表的执行时,选择运行并完全刷新选项。

以下是增量表的一些常见用例:

性能优化
对于某些类型的数据(例如网站日志或分析数据),您可能只想处理新记录,而不是重新处理整个表。
缩短延迟时间
您可以使用增量表快速但频繁地执行工作流,从而缩短输出表的下游延迟时间。
每日快照
您可以配置增量表来创建表数据的每日快照,例如,对存储在生产数据库中的用户设置进行纵向分析。

处理增量表中的部分行

如需确定 Dataform 在每次执行期间要处理的行子集,请向增量表 SQLX 定义文件添加条件 WHERE 子句。在 WHERE 子句中,您可以指定增量条件和非增量条件。在表执行期间,如果不进行完整刷新,Dataform 会应用增量条件;如果进行完整刷新,则会应用非增量条件。

如需配置增量表,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 打开增量表定义 SQLX 文件。
  4. 按以下格式输入 WHERE 子句:

    config { type: "incremental" }
    
    SELECT_STATEMENT
    
    ${when(incremental(), `WHERE INCREMENTAL_CONDITION`, `WHERE NON_INCREMENTAL_CONDITION`) }
    

    替换以下内容:

    • SELECT_STATEMENT:用于定义表的 SELECT 语句。
    • INCREMENTAL_CONDITION:您在 WHERE 子句中指定的条件,用于在表格执行期间选择要由 Dataform 处理的行,而无需完全刷新。
    • NON_INCREMENTAL_CONDITION:您在 WHERE 子句中指定的条件,用于在表执行期间通过完全刷新选择要由 Dataform 处理的行。
  5. 可选:点击格式

以下代码示例展示了一个增量表,该表会增量处理 productiondb.logs 表的行:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(),
   `WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,
   `WHERE country = "UK"`)}

以下代码示例展示了一个增量表,该表会创建 productiondb.customers 表的快照:

config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM ${ref("productiondb", "customers")}

${when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }

合并增量表中的行

为确保增量表仅包含与所选列组合对应的一行,请将所选列设置为 uniqueKey,以合并具有相同 uniqueKey 值的行。更新表时,Dataform 会合并具有相同 uniqueKey 值的行,而不是将其附加到表中。

如需在增量表中配置合并,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. config 代码块中,将所选列设置为 uniqueKey,格式如下:

    uniqueKey: ["COLUMN_NAME"]
    

    COLUMN_NAME 替换为所选列的名称。

  5. 可选:点击格式

以下代码示例展示了一个增量表,其中 transaction_id 列设置为 uniqueKey,以确保该列始终包含一行:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

过滤增量表中的行

在增量分区表中,为避免 Dataform 扫描整个表格以查找匹配的行,请将 updatePartitionFilter 设置为仅考虑部分记录。

以下代码示例展示了一个增量分区表,其中通过设置 uniqueKeyupdatePartitionFilter 属性配置了合并:

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

从分区表中提取数据时,避免全表扫描

创建引用分区表的增量表时,我们建议您构建表查询,以避免在每次增量更新期间对分区表执行全表扫描。

您可以在表查询中使用常量表达式,限制 BigQuery 扫描的分区数量,以更新增量表。如需将分区表中的值转换为常量表达式,请使用 BigQuery 脚本在 pre_operations 块中将该值声明为变量。然后,在 SELECT 查询的 WHERE 子句中将该变量用作常量表达式。

采用此配置后,Dataform 会根据引用的分区表的最新分区更新增量表,而无需扫描整个表。

如需配置引用分区表并避免全表扫描的增量表,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件
  4. pre_operations 块中,使用 BigQuery 脚本声明变量
  5. 使用引用声明的变量的 WHERE 子句过滤用于定义表的 SELECT 语句。
  6. 可选:点击格式

以下代码示例展示了一个增量表,其中引用的 raw_events 表按 event_timestamp 分区:

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(event_timestamp) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint

在前面的代码示例中,event_timestamp_checkpoint 变量是在 pre_operations 块中定义的。然后,event_timestamp_checkpoint 变量会用作 WHERE 子句中的常量表达式。

通过完全刷新从头重建增量表

您可以使用命令行界面(并使用 --full-refresh 选项)触发工作流执行时使用的 Run with full refresh 选项,强制从头重新构建增量表。

在开发工作区中或通过 Dataform CLI 选择“完全刷新”选项时,Dataform 会在执行期间忽略 ${when(incremental(), ... } 参数,并使用 CREATE OR REPLACE 语句重新创建表。

保护增量表免遭全面刷新

为防止增量表从头重建并可能丢失数据,您可以将增量表设置为 protected。如果您的数据源是临时的,您可能需要阻止重新构建增量表。

如需将增量表标记为 protected,请按以下步骤操作:

  1. 前往您的开发工作区。
  2. 文件窗格中,展开 definitions/
  3. 选择增量表定义 SQLX 文件。
  4. config 代码块中,输入 protected: true
  5. 可选:点击格式

以下代码示例展示了一个标记为 protected 的增量表:

config {
  type: "incremental",
  protected: true
}
SELECT ...

添加表格文档

本部分介绍了如何向 Dataform 核心 SQLX 文件添加表及其列和记录的说明。

您可以向 Dataform 中的所有表类型(表、增量表和视图)添加表、列和记录说明。

您可能需要记录以下内容:

  • 表的用途。
  • 表中列或记录的内容或角色。
  • 表与工作流的其他操作之间的关系,例如依赖于当前表的表或视图。
  • 应用于表的断言。
  • 应用于表的前置操作或后置操作。
  • 表的所有者,即创建表的用户。如果有多个团队成员处理工作流,这些信息可能会很有用。

添加表说明

如需向 SQLX 文件中的表添加说明,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,进入 Dataform 页面。

    前往 Dataform

  2. 选择一个代码库。

  3. 选择一个开发工作区。

  4. 文件窗格中,点击要修改的表定义 SQLX 文件。

  5. 在文件的 config 块中,使用以下格式输入表格说明:

    description: "Description of the table",
    
  6. 可选:点击格式

以下代码示例展示了添加到 SQLX 表定义文件的 config 块中的表说明:

config {
  type: "table",
  description: "Description of the table",
 }

添加列和记录说明

如需向 SQLX 文件添加各个列和记录的说明,请按以下步骤操作:

  1. 在表定义文件的 config 块中,输入 columns: {}
  2. columns: {} 中,按以下格式输入列说明:

    column_name: "Description of the column",
    
  3. columns: {} 中,按以下格式输入记录说明:

      record_name: {
          description: "Description of the record",
          columns: {
            record_column_name: "Description of the record column"
          }
    }
    
  4. 可选:点击格式

以下代码示例显示了 SQLX 表定义文件的 config 块中的表、列和记录说明:

config {
  type: "table",
  description: "Description of the table.",
  columns: {
    column1_name: "Description of the first column",
    column2_name: "Description of the second column",
    column3_name: "Description of the third column",
    record_name: {
      description: "Description of the record.",
      columns: {
       record_column1_name: "Description of the first record column",
       record_column2_name: "Description of the second record column",
      }
    }
  }
}
SELECT
  "first_column_value" AS column_1_name,
  "second_column_value" AS column_2_name,
  "third_column_value" AS column_3_name,
  STRUCT("first" AS record_column1_name,
    "second" AS record_column2_name) AS record_name

使用“包含”重复使用列文档

您可以使用 JavaScript 包含功能在 SQL 工作流中重复使用 Dataform 中列的说明。如果您的 SQL 工作流中有多个名称和说明相同的列,您可能需要重复使用列文档。

您可以定义包含单个列说明的常量,也可以定义包含集或列说明的常量,以重复使用表中所有列的说明。如需详细了解如何在 Dataform 中创建和使用包含文件,请参阅使用包含文件在单个代码库中重复使用代码

以下代码示例展示了多个常量,其中包含 includes/docs.js JavaScript 文件中定义的各个列的说明:


// filename is includes/docs.js

const user_id = `A unique identifier for a user`;
const age = `The age of a user`;
const creation_date = `The date this user signed up`;
const user_tenure = `The number of years since the user's creation date`;
const badge_count = `The all-time number of badges the user has received`;
const questions_and_answer_count = `The all-time number of questions and answers the user has created`;
const question_count = `The all-time number of questions the user has created`;
const answer_count = `The all-time number of answers the user has created`;
const last_badge_received_at = `The time the user received their most recent badge`;
const last_posted_at = `The time the user last posted a question or answer`;
const last_question_posted_at = `The time the user last posted an answer`;
const last_answer_posted_at = `The time the user last posted a question`;

module.exports = {
   user_id,
   age,
   creation_date,
   user_tenure,
   badge_count,
   questions_and_answer_count,
   question_count,
   answer_count,
   last_badge_received_at,
   last_posted_at,
   last_question_posted_at,
   last_answer_posted_at,
};

以下代码示例展示了在 definitions/my_table.sqlx SQLX 表定义文件中用于为表中的所选列生成文档的 includes/docs.js 中定义的 user_idage 常量:

config {
  type: "table",
  description: "Table description.",
  columns: {
    user_id: docs.user_id,
    column2_name: "Description of the second column",
    column3_name: "Description of the third column",
    age: docs.age,
  }
}

SELECT ...

以下代码示例显示了一个常量,其中包含 includes/docs.js JavaScript 文件中定义的一组列说明:


// filename is includes/docs.js

const columns = {
    user_id = `A unique identifier for a user`,
    age = `The age of a user`,
    creation_date = `The date this user signed up`,
    user_tenure = `The number of years since the user's creation date`,
    badge_count = `The all-time number of badges the user has received`,
    questions_and_answer_count = `The all-time number of questions and answers the user has created`,
    question_count = `The all-time number of questions the user has created`,
    answer_count = `The all-time number of answers the user has created`,
    last_badge_received_at = `The time the user received their most recent badge`,
    last_posted_at = `The time the user last posted a question or answer`,
    last_question_posted_at = `The time the user last posted an answer`,
    last_answer_posted_at = `The time the user last posted a question`,
}


module.exports = {
  columns
};

以下代码示例展示了 columns 常量,该常量在 includes/table_docs.js 中定义,并在 definitions/my_table.sqlx SQLX 表定义文件中用于为表中的所有列生成文档:

config { type: "table",
description: "My table description",
columns: docs.columns
}

SELECT 1 AS one

后续步骤