Dataform 核心示例脚本

本文档展示了可用于在 Dataform 中创建 SQL 工作流的 Dataform 核心和 JavaScript 脚本示例。

创建表

创建包含 Dataform Core 的视图

以下代码示例显示了 definitions/new_view.sqlx 文件中名为 new_view 的视图的定义:

config { type: "view" }

SELECT * FROM source_data

使用 Dataform 核心创建具体化视图

以下代码示例显示了 definitions/new_materialized_view.sqlx 文件中名为 new_materialized_view 的具体化视图的定义:

config {
  type: "view",
  materialized: true
}

SELECT * FROM source_data

创建包含 Dataform 核心的表

以下代码示例展示了 definitions/new_table.sqlx 文件中名为 new_table 的表的定义:

config { type: "table" }

SELECT * FROM source_data

使用 Dataform 核心创建增量表

以下代码示例展示了一个增量表,该表以增量方式处理 productiondb.logs 表中的行:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

使用 ref 函数通过 Dataform 核心引用表

以下代码示例展示了用于在 definitions/new_table_with_ref.sqlx 表定义文件中引用 source_data 表的 ref 函数:

config { type: "table" }

SELECT * FROM ${ref("source_data")}

使用 Dataform Core 向表、视图或声明添加文档

以下代码示例显示了 definitions/documented_table.sqlx 表定义文件中的表和列说明:

config { type: "table",
         description: "This table is an example",
         columns:{
             user_name: "Name of the user",
             user_id: "ID of the user"
      }
  }

SELECT user_name, user_id FROM ${ref("source_data")}

配置增量表

使用 Dataform 核心为源数据中的新日期添加新的表格行

以下代码示例展示了 definitions/incremental_table.sqlx 文件中增量表的配置。在此配置中,Dataform 会在每个新日期的 incremental_table 中附加一个新行:

config { type: "incremental" }

SELECT date(timestamp) AS date, action
FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (select max(date) FROM ${self()})`)

使用 Dataform 核心定期截取表的快照

以下代码示例展示了 definitions/snapshots_table.sqlx 文件中增量表的配置。在此配置中,Dataform 会使用指定日期的 productiondb.customers 快照创建 snapshots_table

config { type: "incremental" }

SELECT current_date() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

${ when(incremental(), `WHERE snapshot_date > (SELECT max(snapshot_date) FROM ${self()})`) }

构建一个使用 Dataform 核心以增量更新的 30 天滚动表

以下代码示例展示了 definitions/incremental_example.sqlx 文件中增量表的配置。在此配置中,Dataform 会创建一个临时 incremental_example 来增量更新,并在表创建 30 天后将其删除:

config {type: "incremental"}

post_operations {
  delete FROM ${self()} WHERE date < (date_add(Day, -30, CURRENT_DATE))
}

SELECT
 date(timestamp) AS date,
 order_id,
FROM source_table
  ${ when(incremental(), `WHERE timestamp > (SELECT max(date) FROM ${self()})`) }

创建自定义 SQL 操作

使用 Dataform Core 在 SQLX 文件中运行多项 SQL 操作

以下代码示例展示了用于分隔 definitions/operations.sqlx 中定义的多个 SQL 操作的 ;

config { type: "operations" }

DELETE FROM datatable where country = 'GB';
DELETE FROM datatable where country = 'FR';

在使用 Dataform 核心创建表之前运行自定义 SQL

以下代码示例展示了在 definitions/table_with_preops.sqlx 表定义文件的 pre_operations 块中定义的自定义 SQL 操作:

config {type: "table"}

SELECT * FROM ...

pre_operations {
  INSERT INTO table ...
}

使用 Dataform 核心创建表后运行自定义 SQL

以下代码示例展示了在 definitions/table_with_postops.sqlx 表定义文件的 post_operations 块中定义的自定义 SQL 操作:

config {type: "table"}

SELECT * FROM ...

post_operations {
  GRANT `roles/bigquery.dataViewer`
  ON
  TABLE ${self()}
  TO "group:allusers@example.com", "user:otheruser@example.com"
}

验证表

使用 Dataform 核心向表、视图或声明添加断言

以下代码示例展示了添加到 definitions/tested_table.sqlx 表定义文件的 uniqueKeynonNullrowConditions 断言:

config {
  type: "table",
  assertions: {
    uniqueKey: ["user_id"],
    nonNull: ["user_id", "customer_id"],
    rowConditions: [
      'signup_date is null or signup_date > "2022-01-01"',
      'email like "%@%.%"'
    ]
  }
}
SELECT ...

使用 Dataform 核心添加自定义断言

以下代码示例显示了表定义文件中的一个自定义断言,用于验证 source_data 中的 abc 列是否为 null

config { type: "assertion" }

SELECT
  *
FROM
  ${ref("source_data")}
WHERE
  a is null
  or b is null
  or c is null

使用 JavaScript 进行开发

在 JavaScript 中使用内联变量和函数

以下代码示例展示了在 js 块中定义并在 SQLX 文件中以内嵌方式使用的 foo 变量:

js {
 const foo = 1;
 function bar(number){
     return number+1;
 }
}

SELECT
 ${foo} AS one,
 ${bar(foo)} AS two

使用 JavaScript 为每个国家/地区生成一个表格

以下代码示例展示了如何使用 forEach 函数为每个在 definitions/one_table_per_country.js 文件的 countries 中定义的每个国家/地区生成一个表格:

const countries = ["GB", "US", "FR", "TH", "NG"];

countries.forEach(country => {
  publish("reporting_" + country)
    .dependencies(["source_table"])
    .query(
      ctx => `
      SELECT '${country}' AS country
      `
    );
});

使用 JavaScript 在一个文件中声明多个来源

以下代码示例展示了如何在 definitions/external_dependencies.js 文件中声明多个数据源:

declare({
  schema: "stripe",
  name: "charges"
});

declare({
  schema: "shopify",
  name: "orders"
});

declare({
  schema: "salesforce",
  name: "accounts"
});

使用 forEach 在一个文件中声明多个来源

以下代码示例显示了在 definitions/external_dependencies.js 文件中使用 forEach 函数声明多个数据源:

["charges", "subscriptions", "line_items", "invoices"]
  .forEach(source => declare({
      schema: "stripe",
      name: source
    })
  );

使用 JavaScript 删除所有包含个人身份信息的表中的敏感信息

以下代码示例展示了一个 definitions/delete_pii.js 文件中的函数,该函数用于删除所有包含个人身份信息 (PII) 的表中的选定信息:

const pii_tables = ["users", "customers", "leads"];
pii_tables.forEach(table =>
  operate(`gdpr_cleanup: ${table}`,
    ctx => `
      DELETE FROM raw_data.${table}
      WHERE user_id in (SELECT * FROM users_who_requested_deletion)`)
      .tags(["gdpr_deletion"]))
);

使用 JavaScript 添加 preOpspostOps

以下代码示例展示了用于在 definitions/pre_and_post_ops_example.js 表中创建包含 preOpspostOps 的查询的 publish 函数:

publish("example")
  .preOps(ctx => `GRANT \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
  .query(ctx => `SELECT * FROM ${ctx.ref("other_table")}`)
  .postOps(ctx => `REVOKE \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)

使用 JavaScript 创建增量表

以下代码示例展示了用于在 definitions/incremental_example.js 文件中创建增量表的 publish 函数:

publish("incremental_example", {
  type: "incremental"
}).query(ctx => `
  SELECT * FROM ${ctx.ref("other_table")}
  ${ctx.when(ctx.incremental(),`WHERE timestamp > (SELECT MAX(date) FROM ${ctx.self()}`)}
`)

使用 JavaScript 回填每日表

以下代码示例展示了如何在 definitions/backfill_daily_data.js 文件中回填每日更新的表:

var getDateArray = function(start, end) {
  var startDate = new Date(start); //YYYY-MM-DD
  var endDate = new Date(end); //YYYY-MM-DD

  var arr = new Array();
  var dt = new Date(startDate);
  while (dt <= endDate) {
    arr.push(new Date(dt).toISOString().split("T")[0]);
    dt.setDate(dt.getDate() + 1);
  }
  return arr;
};

var dateArr = getDateArray("2020-03-01", "2020-04-01");

// step 1: create table
operate(`create table`, 'create table if not exists backfill_table (`fields`) `);
// step 2: insert into the table

dateArr.forEach((day, i) =>
  operate(`backfill ${day}`
   `insert into backfill_table select fields where day = '${day}'`)
);

通过 include 重复使用代码

在 JavaScript 中使用全局变量

以下代码示例展示了 includes/constants.jsproject_idfirst_date 常量的定义:

const project_id = "project_id";
const first_date = "'1970-01-01'";
module.exports = {
  project_id,
  first_date
};

以下代码示例展示了 definitions/new_table.sqlx 文件中引用的 first_date 常量:

config {type: "table"}

SELECT * FROM source_table WHERE date > ${constants.first_date}

使用 JavaScript 创建国家/地区映射

以下代码示例展示了在 includes/mapping.js 文件中定义的 country_group 自定义函数:

function country_group(country){
  return `
  case
    when ${country} in ('US', 'CA') then 'NA'
    when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when ${country} in ('AU') then ${country}
    else 'Other'
  end`;

以下代码示例展示了在 definitions/new_table.sqlx 表定义文件中使用 country_group 函数的表定义:

config { type: "table"}

SELECT
  country AS country,
  ${mapping.country_group("country")} AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM ${ref("source_table")}

GROUP BY 1, 2, 3

以下代码示例显示了在编译为 SQL 的 definitions/new_table.sqlx 中定义的查询:

SELECT
  country AS country,
  case
    when country in ('US', 'CA') then 'NA'
    when country in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when country in ('AU') then country
    else 'Other'
  end AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2, 3

使用自定义 JavaScript 函数生成 SQL 脚本

以下代码示例展示了 includes/script_builder.js 中定义的 render_script 自定义函数:

function render_script(table, dimensions, metrics) {
  return `
      SELECT
      ${dimensions.map(field => `${field} AS ${field}`).join(",")},
      ${metrics.map(field => `sum(${field}) AS ${field}`).join(",\n")}
      FROM ${table}
      GROUP BY ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
    `;
}

module.exports = { render_script };

以下代码示例展示了在 definitions/new_table.sqlx 表定义文件中使用 render_script 函数的表定义:

config {
    type: "table",
    tags: ["advanced", "hourly"],
    disabled: true
}

${script_builder.render_script(ref("source_table"),
                               ["country", "device_type"],
                               ["revenue", "pageviews", "sessions"]
                               )}

以下代码示例显示了在编译为 SQL 的 definitions/new_table.sqlx 中定义的查询:

SELECT
  country AS country,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2

操作配置

加载包含操作配置的 SQL 文件

操作配置有助于加载纯 SQL 文件。您可以在 definitions 文件夹的 actions.yaml 文件中定义操作配置。

如需详细了解可用的操作类型和有效的操作配置选项,请参阅 Dataform 配置参考文档

以下代码示例显示了 definitions/actions.yaml 文件中名为 new_view 的视图的定义:

actions:
  - view:
    filename: new_view.sql

上述代码示例引用的 definitions/new_view.sql SQL 文件包含纯 SQL:

SELECT * FROM source_data