Dataform 核心示例脚本

本文档展示了一些 Dataform 核心和 JavaScript 脚本示例,您可以使用这些脚本在 Dataform 中创建 SQL 工作流。

创建表

使用 Dataform 核心创建视图

以下代码示例展示了 definitions/new_view.sqlx 文件中名为 new_view 的视图的定义:

config { type: "view" }

SELECT * FROM source_data

使用 Dataform 核心创建具体化视图

以下代码示例展示了 definitions/new_materialized_view.sqlx 文件中名为 new_materialized_view 的具体化视图的定义:

config {
  type: "view",
  materialized: true
}

SELECT * FROM source_data

使用 Dataform 核心创建表

以下代码示例展示了 definitions/new_table.sqlx 文件中名为 new_table 的表的定义:

config { type: "table" }

SELECT * FROM source_data

使用 Dataform 核心创建增量表

以下代码示例展示了一个增量表,该表会增量处理 productiondb.logs 表的行:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

使用 ref 函数通过 Dataform 核心引用表

以下代码示例展示了用于引用 definitions/new_table_with_ref.sqlx 表定义文件中的 source_data 表的 ref 函数:

config { type: "table" }

SELECT * FROM ${ref("source_data")}

使用 Dataform 核心向表、视图或声明添加文档

以下代码示例展示了 definitions/documented_table.sqlx 表定义文件中的表和列说明:

config { type: "table",
         description: "This table is an example",
         columns:{
             user_name: "Name of the user",
             user_id: "ID of the user"
      }
  }

SELECT user_name, user_id FROM ${ref("source_data")}

配置增量表

使用 Dataform 核心为源数据中的新日期添加新表行

以下代码示例展示了 definitions/incremental_table.sqlx 文件中增量表的配置。在此配置中,Dataform 会针对每个新日期将新行附加到 incremental_table

config { type: "incremental" }

SELECT date(timestamp) AS date, action
FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (select max(date) FROM ${self()})`)

使用 Dataform 核心定期为表创建快照

以下代码示例展示了 definitions/snapshots_table.sqlx 文件中增量表的配置。在此配置中,Dataform 会创建 snapshots_table,其中包含指定日期的 productiondb.customers 快照:

config { type: "incremental" }

SELECT current_date() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

${ when(incremental(), `WHERE snapshot_date > (SELECT max(snapshot_date) FROM ${self()})`) }

使用 Dataform 核心构建一个会使用增量更新的 30 天滚动表

以下代码示例展示了 definitions/incremental_example.sqlx 文件中增量表的配置。在此配置中,Dataform 会创建一个会增量更新的临时 incremental_example,并在表创建 30 天后将其删除:

config {type: "incremental"}

post_operations {
  delete FROM ${self()} WHERE date < (date_add(Day, -30, CURRENT_DATE))
}

SELECT
 date(timestamp) AS date,
 order_id,
FROM source_table
  ${ when(incremental(), `WHERE timestamp > (SELECT max(date) FROM ${self()})`) }

创建自定义 SQL 操作

使用 Dataform 核心在 SQLX 文件中运行多个 SQL 操作

以下代码示例展示了 ; 用于分隔 definitions/operations.sqlx 中定义的多个 SQL 操作:

config { type: "operations" }

DELETE FROM datatable where country = 'GB';
DELETE FROM datatable where country = 'FR';

在使用 Dataform 核心创建表之前运行自定义 SQL

以下代码示例展示了在 definitions/table_with_preops.sqlx 表定义文件的 pre_operations 块中定义的自定义 SQL 操作:

config {type: "table"}

SELECT * FROM ...

pre_operations {
  INSERT INTO table ...
}

使用 Dataform 核心创建表后运行自定义 SQL

以下代码示例展示了在 definitions/table_with_postops.sqlx 表定义文件的 post_operations 块中定义的自定义 SQL 操作:

config {type: "table"}

SELECT * FROM ...

post_operations {
  GRANT `roles/bigquery.dataViewer`
  ON
  TABLE ${self()}
  TO "group:allusers@example.com", "user:otheruser@example.com"
}

验证表

使用 Dataform 核心向表、视图或声明添加断言

以下代码示例展示了添加到 definitions/tested_table.sqlx 表定义文件中的 uniqueKeynonNullrowConditions 断言:

config {
  type: "table",
  assertions: {
    uniqueKey: ["user_id"],
    nonNull: ["user_id", "customer_id"],
    rowConditions: [
      'signup_date is null or signup_date > "2022-01-01"',
      'email like "%@%.%"'
    ]
  }
}
SELECT ...

使用 Dataform 核心添加自定义断言

以下代码示例展示了表定义文件中的自定义断言,用于验证 source_data 中的 abc 列是否为 null

config { type: "assertion" }

SELECT
  *
FROM
  ${ref("source_data")}
WHERE
  a is null
  or b is null
  or c is null

使用 JavaScript 进行开发

在 JavaScript 中使用内嵌变量和函数

以下代码示例展示了在 js 块中定义的 foo 变量,以及在 SQLX 文件中内嵌使用该变量:

js {
 const foo = 1;
 function bar(number){
     return number+1;
 }
}

SELECT
 ${foo} AS one,
 ${bar(foo)} AS two

使用 JavaScript 为每个国家/地区生成一个表格

以下代码示例展示了如何使用 forEach 函数,针对 definitions/one_table_per_country.js 文件中 countries 中定义的每个国家/地区生成一个表格:

const countries = ["GB", "US", "FR", "TH", "NG"];

countries.forEach(country => {
  publish("reporting_" + country)
    .dependencies(["source_table"])
    .query(
      ctx => `
      SELECT '${country}' AS country
      `
    );
});

使用 JavaScript 在一个文件中声明多个来源

以下代码示例展示了如何在 definitions/external_dependencies.js 文件中声明多个数据源:

declare({
  schema: "stripe",
  name: "charges"
});

declare({
  schema: "shopify",
  name: "orders"
});

declare({
  schema: "salesforce",
  name: "accounts"
});

使用 forEach 在一个文件中声明多个来源

以下代码示例展示了如何在 definitions/external_dependencies.js 文件中使用 forEach 函数声明多个数据源:

["charges", "subscriptions", "line_items", "invoices"]
  .forEach(source => declare({
      schema: "stripe",
      name: source
    })
  );

使用 JavaScript 删除包含个人身份信息的所有表中的敏感信息

以下代码示例展示了 definitions/delete_pii.js 文件中的一个函数,该函数会删除包含个人身份信息 (PII) 的所有表中的所选信息:

const pii_tables = ["users", "customers", "leads"];
pii_tables.forEach(table =>
  operate(`gdpr_cleanup: ${table}`,
    ctx => `
      DELETE FROM raw_data.${table}
      WHERE user_id in (SELECT * FROM users_who_requested_deletion)`)
      .tags(["gdpr_deletion"]))
);

使用 JavaScript 添加 preOpspostOps

以下代码示例展示了用于在 definitions/pre_and_post_ops_example.js 表中创建包含 preOpspostOps 的查询的 publish 函数:

publish("example")
  .preOps(ctx => `GRANT \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
  .query(ctx => `SELECT * FROM ${ctx.ref("other_table")}`)
  .postOps(ctx => `REVOKE \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)

使用 JavaScript 创建增量表

以下代码示例展示了用于在 definitions/incremental_example.js 文件中创建增量表的 publish 函数:

publish("incremental_example", {
  type: "incremental"
}).query(ctx => `
  SELECT * FROM ${ctx.ref("other_table")}
  ${ctx.when(ctx.incremental(),`WHERE timestamp > (SELECT MAX(date) FROM ${ctx.self()}`)}
`)

使用 JavaScript 回填每日表

以下代码示例展示了如何在 definitions/backfill_daily_data.js 文件中回填每天更新的表:

var getDateArray = function(start, end) {
  var startDate = new Date(start); //YYYY-MM-DD
  var endDate = new Date(end); //YYYY-MM-DD

  var arr = new Array();
  var dt = new Date(startDate);
  while (dt <= endDate) {
    arr.push(new Date(dt).toISOString().split("T")[0]);
    dt.setDate(dt.getDate() + 1);
  }
  return arr;
};

var dateArr = getDateArray("2020-03-01", "2020-04-01");

// step 1: create table
operate(`create table`, 'create table if not exists backfill_table (`fields`) `);
// step 2: insert into the table

dateArr.forEach((day, i) =>
  operate(`backfill ${day}`
   `insert into backfill_table select fields where day = '${day}'`)
);

使用包含文件重复使用代码

将全局变量与 JavaScript 搭配使用

以下代码示例展示了 includes/constants.js 中的 project_idfirst_date 常量的定义:

const project_id = "project_id";
const first_date = "'1970-01-01'";
module.exports = {
  project_id,
  first_date
};

以下代码示例展示了 definitions/new_table.sqlx 文件中引用的 first_date 常量:

config {type: "table"}

SELECT * FROM source_table WHERE date > ${constants.first_date}

使用 JavaScript 创建国家/地区映射

以下代码示例展示了 includes/mapping.js 文件中定义的 country_group 自定义函数:

function country_group(country){
  return `
  case
    when ${country} in ('US', 'CA') then 'NA'
    when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when ${country} in ('AU') then ${country}
    else 'Other'
  end`;
}

module.exports = {
   country_group
};

以下代码示例展示了在 definitions/new_table.sqlx 表定义文件中使用 country_group 函数的表定义:

config { type: "table"}

SELECT
  country AS country,
  ${mapping.country_group("country")} AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM ${ref("source_table")}

GROUP BY 1, 2, 3

以下代码示例展示了编译为 SQL 的 definitions/new_table.sqlx 中定义的查询:

SELECT
  country AS country,
  case
    when country in ('US', 'CA') then 'NA'
    when country in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when country in ('AU') then country
    else 'Other'
  end AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2, 3

使用自定义 JavaScript 函数生成 SQL 脚本

以下代码示例展示了 includes/script_builder.js 中定义的 render_script 自定义函数:

function render_script(table, dimensions, metrics) {
  return `
      SELECT
      ${dimensions.map(field => `${field} AS ${field}`).join(",")},
      ${metrics.map(field => `sum(${field}) AS ${field}`).join(",\n")}
      FROM ${table}
      GROUP BY ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
    `;
}

module.exports = { render_script };

以下代码示例展示了在 definitions/new_table.sqlx 表定义文件中使用 render_script 函数的表定义:

config {
    type: "table",
    tags: ["advanced", "hourly"],
    disabled: true
}

${script_builder.render_script(ref("source_table"),
                               ["country", "device_type"],
                               ["revenue", "pageviews", "sessions"]
                               )}

以下代码示例展示了编译为 SQL 的 definitions/new_table.sqlx 中定义的查询:

SELECT
  country AS country,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2

操作配置

加载包含操作配置的 SQL 文件

操作配置有助于加载纯 SQL 文件。您可以在 definitions 文件夹中的 actions.yaml 文件中定义操作配置。

如需详细了解可用的操作类型和有效的操作配置选项,请参阅 Dataform 配置参考

以下代码示例展示了 definitions/actions.yaml 文件中名为 new_view 的视图的定义:

actions:
  - view:
    filename: new_view.sql

上例代码中引用的 definitions/new_view.sql SQL 文件包含纯 SQL:

SELECT * FROM source_data