在 Dataform 中使用缓慢变化的维度

本文档介绍了如何使用 Dataform 中的开源“缓慢变化维度”软件包作为使用开源软件包的示例。

缓慢变化维度软件包中包含常用的数据模型,用于根据 Dataform 中的可变数据源创建第 2 类缓慢变化的维度表。

变化缓慢的维度表属于增量表,其中包含的数据无法预测,且并非定期更改(例如客户或产品)。在类型 2 缓慢变化的维度表中,新数据会附加到新行中,而不会覆盖现有的表行。表历史记录会保留在正在缓慢变化的维度键中某个给定键的多个记录中。每条记录都有一个唯一的键。

“缓慢变化的维度”软件包会在 BigQuery 中为给定的 NAME 创建以下关系:

  • NAME - 包含 scd_valid_fromscd_valid_to 字段的视图
  • NAME_updates - 用于存储源表更改历史记录的增量表

每当 Dataform 执行缓慢变化的维度增量表时,它都会更新缓慢变化的维度。您可能希望将缓慢变化的维度表安排为每天或每小时运行一次,具体取决于您要捕获的更改的粒度。

如需了解如何安排 Dataform 执行,请参阅使用 Cloud Composer 安排执行使用 Workflows 和 Cloud Scheduler 安排执行

准备工作

  1. “缓慢变化的维度”版本页面上,复制最新版本的 .tar.gz 网址。
  2. 创建 Dataform 代码库
  3. 在代码库中创建并初始化工作区
  4. 在 Dataform 代码库中安装“缓慢变化的维度”软件包

所需的角色

如需获取配置软件包所需的权限,请让管理员向您授予工作区的 Dataform Editor (roles/dataform.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

使用“缓慢变化维度”软件包创建缓慢变化的维度表

如需使用 Dataform 中的缓慢变化维度软件包软件包创建缓慢变化的维度表,请按以下步骤操作:

  1. Files 窗格的 definitions/ 旁边,点击 More 菜单。

  2. 点击创建文件

  3. 创建新文件窗格中,执行以下操作:

    1. 添加文件路径字段中的 definitions/ 后,输入文件名,后跟 .js。例如 definitions/definitions.js

    2. 点击创建文件

  4. Files 窗格中,选择新创建的 .js。它们。

  5. 按以下格式将软件包导入该文件:

     const CONSTANT-NAME = require("dataform-scd");
    

    CONSTANT-NAME 替换为常量的名称,例如 scd

  6. 您可以采用以下格式创建缓慢变化的维度表格:

    scd("source_data_scd", {
      uniqueKey: "UNIQUE_ID",
      timestamp: "UPDATED_AT", // A field that stores a timestamp or date of when the row was last changed.
      source: {
        schema: "SOURCE_SCHEMA",     // The source table to build slowly changing dimensions from.
        name: "SOURCE_SCHEMA_NAME",
      },
      incrementalConfig: {        // Any configuration parameters to apply to the incremental table that will be created.
        bigquery: {
          partitionBy: "UPDATED_AT",
        },
      },
    });
    

    替换以下内容:

    • UNIQUE_ID:表中行的唯一标识符
    • UPDATED_AT:用于存储上次更改行的时间戳或日期的字段的名称,例如 updated_at
    • SOURCE_SCHEMA:源表的架构,例如 dataform_scd_example
    • SOURCE_SCHEMA_NAME:源表的名称,例如 source_data
  7. 可选:点击格式

以下代码示例展示了使用“缓慢变化的维度”软件包创建的一个缓慢变化的维度表定义:

const scd = require("dataform-scd");

/**
 * Create an SCD table on top of the table defined in source_data.sqlx.
 */
const { updates, view } = scd("source_data_scd", {
  // A unique identifier for rows in the table.
  uniqueKey: "user_id",
  // A field that stores a timestamp or date of when the row was last changed.
  timestamp: "updated_at",
  // The source table to build slowly changing dimensions from.
  source: {
    schema: "dataform_scd_example",
    name: "source_data",
  },
  // Any tags that will be added to actions.
  tags: ["slowly-changing-dimensions"],
  // Documentation of table columns
  columns: {user_id: "User ID", some_field: "Data Field", updated_at: "Timestamp for updates"},
  // Configuration parameters to apply to the incremental table that will be created.
  incrementalConfig: {
    bigquery: {
      partitionBy: "updated_at",
    },
  },
});

// Additional customization of the created models can be done by using the returned actions objects.
updates.config({
  description: "Updates table for SCD",
});

后续步骤