Dataform 核心示例脚本

本文档展示了 Dataform 核心和 JavaScript 脚本示例, 可用于在 Dataform 中创建 SQL 工作流。

创建表

创建包含 Dataform Core 的视图

以下代码示例展示了名为 new_view 的视图的定义 在 definitions/new_view.sqlx 文件中:

config { type: "view" }

SELECT * FROM source_data

使用 Dataform 核心创建具体化视图

以下代码示例展示了名为 definitions/new_materialized_view.sqlx 文件中的 new_materialized_view

config {
  type: "view",
  materialized: true
}

SELECT * FROM source_data

创建包含 Dataform 核心的表

以下代码示例展示了名为 new_table 的表的定义 在 definitions/new_table.sqlx 文件中:

config { type: "table" }

SELECT * FROM source_data

使用 Dataform 核心创建增量表

以下代码示例展示了一个增量表 以增量方式处理 productiondb.logs 表中的行:

config { type: "incremental" }

SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

使用 ref 函数通过 Dataform 核心引用表

以下代码示例显示了用于引用ref definitions/new_table_with_ref.sqlx 中的 source_data 个表 表定义文件:

config { type: "table" }

SELECT * FROM ${ref("source_data")}

使用 Dataform Core 向表、视图或声明添加文档

以下代码示例显示了表和列的说明 在 definitions/documented_table.sqlx 表定义文件中:

config { type: "table",
         description: "This table is an example",
         columns:{
             user_name: "Name of the user",
             user_id: "ID of the user"
      }
  }

SELECT user_name, user_id FROM ${ref("source_data")}

配置增量表

使用 Dataform 核心为源数据中的新日期添加新的表格行

以下代码示例展示了增量表的配置 位于 definitions/incremental_table.sqlx 文件中。在此配置中 Dataform 会在每个新日期的 incremental_table 中附加一个新行:

config { type: "incremental" }

SELECT date(timestamp) AS date, action
FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (select max(date) FROM ${self()})`)

使用 Dataform 核心定期截取表的快照

以下代码示例展示了 definitions/snapshots_table.sqlx 文件。在此配置中 Dataform 使用以下快照创建 snapshots_table: 在指定日期productiondb.customers

config { type: "incremental" }

SELECT current_date() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

${ when(incremental(), `WHERE snapshot_date > (SELECT max(snapshot_date) FROM ${self()})`) }

构建一个使用 Dataform 核心以增量更新的 30 天滚动表

以下代码示例展示了 definitions/incremental_example.sqlx 文件。在此配置中 Dataform 会创建一个用于更新的临时 incremental_example 并在创建 30 天后删除表:

config {type: "incremental"}

post_operations {
  delete FROM ${self()} WHERE date < (date_add(Day, -30, CURRENT_DATE))
}

SELECT
 date(timestamp) AS date,
 order_id,
FROM source_table
  ${ when(incremental(), `WHERE timestamp > (SELECT max(date) FROM ${self()})`) }

创建自定义 SQL 操作

使用 Dataform Core 在 SQLX 文件中运行多项 SQL 操作

以下代码示例展示了用于分隔多个 SQL 操作的 ;definitions/operations.sqlx 中定义:

config { type: "operations" }

DELETE FROM datatable where country = 'GB';
DELETE FROM datatable where country = 'FR';

在使用 Dataform 核心创建表之前运行自定义 SQL

以下代码示例展示了 definitions/table_with_preops.sqlxpre_operations 块 表定义文件:

config {type: "table"}

SELECT * FROM ...

pre_operations {
  INSERT INTO table ...
}

使用 Dataform 核心创建表后运行自定义 SQL

以下代码示例展示了 definitions/table_with_postops.sqlxpost_operations 块 表定义文件:

config {type: "table"}

SELECT * FROM ...

post_operations {
  GRANT `roles/bigquery.dataViewer`
  ON
  TABLE ${self()}
  TO "group:allusers@example.com", "user:otheruser@example.com"
}

验证表

使用 Dataform 核心向表、视图或声明添加断言

以下代码示例显示了 uniqueKeynonNullrowConditions 断言添加到了 definitions/tested_table.sqlx 表定义文件:

config {
  type: "table",
  assertions: {
    uniqueKey: ["user_id"],
    nonNull: ["user_id", "customer_id"],
    rowConditions: [
      'signup_date is null or signup_date > "2022-01-01"',
      'email like "%@%.%"'
    ]
  }
}
SELECT ...

使用 Dataform 核心添加自定义断言

以下代码示例展示了表定义文件中的自定义断言 验证 source_data 中的 a 列、b 列或 c 列是否为 null

config { type: "assertion" }

SELECT
  *
FROM
  ${ref("source_data")}
WHERE
  a is null
  or b is null
  or c is null

使用 JavaScript 进行开发

在 JavaScript 中使用内联变量和函数

以下代码示例显示了 js 块中定义的 foo 变量 然后在 SQLX 文件中以内嵌方式使用:

js {
 const foo = 1;
 function bar(number){
     return number+1;
 }
}

SELECT
 ${foo} AS one,
 ${bar(foo)} AS two

使用 JavaScript 为每个国家/地区生成一个表格

以下代码示例展示了如何使用 forEach 函数生成 表countries definitions/one_table_per_country.js 文件:

const countries = ["GB", "US", "FR", "TH", "NG"];

countries.forEach(country => {
  publish("reporting_" + country)
    .dependencies(["source_table"])
    .query(
      ctx => `
      SELECT '${country}' AS country
      `
    );
});

使用 JavaScript 在一个文件中声明多个来源

以下代码示例显示了在 definitions/external_dependencies.js 文件:

declare({
  schema: "stripe",
  name: "charges"
});

declare({
  schema: "shopify",
  name: "orders"
});

declare({
  schema: "salesforce",
  name: "accounts"
});

使用 forEach 在一个文件中声明多个来源

以下代码示例显示了使用 definitions/external_dependencies.js 文件中的 forEach 函数:

["charges", "subscriptions", "line_items", "invoices"]
  .forEach(source => declare({
      schema: "stripe",
      name: source
    })
  );

使用 JavaScript 删除所有包含个人身份信息的表中的敏感信息

以下代码示例显示了 definitions/delete_pii.js 中的一个函数 该文件用于删除所有表中的选定信息,其中包含 个人身份信息 (PII):

const pii_tables = ["users", "customers", "leads"];
pii_tables.forEach(table =>
  operate(`gdpr_cleanup: ${table}`,
    ctx => `
      DELETE FROM raw_data.${table}
      WHERE user_id in (SELECT * FROM users_who_requested_deletion)`)
      .tags(["gdpr_deletion"]))
);

使用 JavaScript 添加 preOpspostOps

以下代码示例显示了用于创建查询的 publish 函数 preOpspostOpsdefinitions/pre_and_post_ops_example.js 表:

publish("example")
  .preOps(ctx => `GRANT \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
  .query(ctx => `SELECT * FROM ${ctx.ref("other_table")}`)
  .postOps(ctx => `REVOKE \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)

使用 JavaScript 创建增量表

以下代码示例显示了用于创建publish definitions/incremental_example.js 文件中的增量表:

publish("incremental_example", {
  type: "incremental"
}).query(ctx => `
  SELECT * FROM ${ctx.ref("other_table")}
  ${ctx.when(ctx.incremental(),`WHERE timestamp > (SELECT MAX(date) FROM ${ctx.self()}`)}
`)

使用 JavaScript 回填每日表

以下代码示例展示了如何回填在 definitions/backfill_daily_data.js 文件:

var getDateArray = function(start, end) {
  var startDate = new Date(start); //YYYY-MM-DD
  var endDate = new Date(end); //YYYY-MM-DD

  var arr = new Array();
  var dt = new Date(startDate);
  while (dt <= endDate) {
    arr.push(new Date(dt).toISOString().split("T")[0]);
    dt.setDate(dt.getDate() + 1);
  }
  return arr;
};

var dateArr = getDateArray("2020-03-01", "2020-04-01");

// step 1: create table
operate(`create table`, 'create table if not exists backfill_table (`fields`) `);
// step 2: insert into the table

dateArr.forEach((day, i) =>
  operate(`backfill ${day}`
   `insert into backfill_table select fields where day = '${day}'`)
);

通过 include 重复使用代码

在 JavaScript 中使用全局变量

以下代码示例展示了 project_idfirst_date 的定义 includes/constants.js 中的常量:

const project_id = "project_id";
const first_date = "'1970-01-01'";
module.exports = {
  project_id,
  first_date
};

以下代码示例显示了first_date definitions/new_table.sqlx 文件:

config {type: "table"}

SELECT * FROM source_table WHERE date > ${constants.first_date}

使用 JavaScript 创建国家/地区映射

以下代码示例显示了 country_group 自定义函数(在 includes/mapping.js 文件:

function country_group(country){
  return `
  case
    when ${country} in ('US', 'CA') then 'NA'
    when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when ${country} in ('AU') then ${country}
    else 'Other'
  end`;
}

module.exports = {
   country_group
};

以下代码示例展示了一个使用 definitions/new_table.sqlx 中的 country_group 函数 表定义文件:

config { type: "table"}

SELECT
  country AS country,
  ${mapping.country_group("country")} AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM ${ref("source_table")}

GROUP BY 1, 2, 3

以下代码示例显示了在 编译为 SQL 的 definitions/new_table.sqlx

SELECT
  country AS country,
  case
    when country in ('US', 'CA') then 'NA'
    when country in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
    when country in ('AU') then country
    else 'Other'
  end AS country_group,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2, 3

使用自定义 JavaScript 函数生成 SQL 脚本

以下代码示例显示了 render_script 自定义函数 在 includes/script_builder.js 中定义:

function render_script(table, dimensions, metrics) {
  return `
      SELECT
      ${dimensions.map(field => `${field} AS ${field}`).join(",")},
      ${metrics.map(field => `sum(${field}) AS ${field}`).join(",\n")}
      FROM ${table}
      GROUP BY ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
    `;
}

module.exports = { render_script };

以下代码示例展示了一个使用 definitions/new_table.sqlx 中的 render_script 函数 表定义文件:

config {
    type: "table",
    tags: ["advanced", "hourly"],
    disabled: true
}

${script_builder.render_script(ref("source_table"),
                               ["country", "device_type"],
                               ["revenue", "pageviews", "sessions"]
                               )}

以下代码示例显示了在 编译为 SQL 的 definitions/new_table.sqlx

SELECT
  country AS country,
  device_type AS device_type,
  sum(revenue) AS revenue,
  sum(pageviews) AS pageviews,
  sum(sessions) AS sessions

FROM "dataform"."source_table"

GROUP BY 1, 2

操作配置

加载包含操作配置的 SQL 文件

操作配置有助于加载纯 SQL 文件。你可以定义操作配置 位于 definitions 文件夹的 actions.yaml 文件中。

详细了解可用的操作类型和有效的操作配置 请参阅 Dataform 配置 参考文档

以下代码示例显示了名为 new_view 的视图在 definitions/actions.yaml 文件:

actions:
  - view:
    filename: new_view.sql

上述代码引用的 definitions/new_view.sql SQL 文件 示例,包含纯 SQL:

SELECT * FROM source_data