本文档展示了 Dataform 核心和 JavaScript 脚本示例, 可用于在 Dataform 中创建 SQL 工作流。
创建表
创建包含 Dataform Core 的视图
以下代码示例展示了名为 new_view
的视图的定义
在 definitions/new_view.sqlx
文件中:
config { type: "view" }
SELECT * FROM source_data
使用 Dataform 核心创建具体化视图
以下代码示例展示了名为
definitions/new_materialized_view.sqlx
文件中的 new_materialized_view
:
config {
type: "view",
materialized: true
}
SELECT * FROM source_data
创建包含 Dataform 核心的表
以下代码示例展示了名为 new_table
的表的定义
在 definitions/new_table.sqlx
文件中:
config { type: "table" }
SELECT * FROM source_data
使用 Dataform 核心创建增量表
以下代码示例展示了一个增量表
以增量方式处理 productiondb.logs
表中的行:
config { type: "incremental" }
SELECT timestamp, message FROM ${ref("productiondb", "logs")}
${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
使用 ref
函数通过 Dataform 核心引用表
以下代码示例显示了用于引用ref
definitions/new_table_with_ref.sqlx
中的 source_data
个表
表定义文件:
config { type: "table" }
SELECT * FROM ${ref("source_data")}
使用 Dataform Core 向表、视图或声明添加文档
以下代码示例显示了表和列的说明
在 definitions/documented_table.sqlx
表定义文件中:
config { type: "table",
description: "This table is an example",
columns:{
user_name: "Name of the user",
user_id: "ID of the user"
}
}
SELECT user_name, user_id FROM ${ref("source_data")}
配置增量表
使用 Dataform 核心为源数据中的新日期添加新的表格行
以下代码示例展示了增量表的配置
位于 definitions/incremental_table.sqlx
文件中。在此配置中
Dataform 会在每个新日期的 incremental_table
中附加一个新行:
config { type: "incremental" }
SELECT date(timestamp) AS date, action
FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (select max(date) FROM ${self()})`)
使用 Dataform 核心定期截取表的快照
以下代码示例展示了
definitions/snapshots_table.sqlx
文件。在此配置中
Dataform 使用以下快照创建 snapshots_table
:
在指定日期productiondb.customers
:
config { type: "incremental" }
SELECT current_date() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers
${ when(incremental(), `WHERE snapshot_date > (SELECT max(snapshot_date) FROM ${self()})`) }
构建一个使用 Dataform 核心以增量更新的 30 天滚动表
以下代码示例展示了
definitions/incremental_example.sqlx
文件。在此配置中
Dataform 会创建一个用于更新的临时 incremental_example
并在创建 30 天后删除表:
config {type: "incremental"}
post_operations {
delete FROM ${self()} WHERE date < (date_add(Day, -30, CURRENT_DATE))
}
SELECT
date(timestamp) AS date,
order_id,
FROM source_table
${ when(incremental(), `WHERE timestamp > (SELECT max(date) FROM ${self()})`) }
创建自定义 SQL 操作
使用 Dataform Core 在 SQLX 文件中运行多项 SQL 操作
以下代码示例展示了用于分隔多个 SQL 操作的 ;
在 definitions/operations.sqlx
中定义:
config { type: "operations" }
DELETE FROM datatable where country = 'GB';
DELETE FROM datatable where country = 'FR';
在使用 Dataform 核心创建表之前运行自定义 SQL
以下代码示例展示了
definitions/table_with_preops.sqlx
的 pre_operations
块
表定义文件:
config {type: "table"}
SELECT * FROM ...
pre_operations {
INSERT INTO table ...
}
使用 Dataform 核心创建表后运行自定义 SQL
以下代码示例展示了
definitions/table_with_postops.sqlx
的 post_operations
块
表定义文件:
config {type: "table"}
SELECT * FROM ...
post_operations {
GRANT `roles/bigquery.dataViewer`
ON
TABLE ${self()}
TO "group:allusers@example.com", "user:otheruser@example.com"
}
验证表
使用 Dataform 核心向表、视图或声明添加断言
以下代码示例显示了 uniqueKey
、nonNull
和 rowConditions
断言添加到了 definitions/tested_table.sqlx
表定义文件:
config {
type: "table",
assertions: {
uniqueKey: ["user_id"],
nonNull: ["user_id", "customer_id"],
rowConditions: [
'signup_date is null or signup_date > "2022-01-01"',
'email like "%@%.%"'
]
}
}
SELECT ...
使用 Dataform 核心添加自定义断言
以下代码示例展示了表定义文件中的自定义断言
验证 source_data
中的 a
列、b
列或 c
列是否为 null
:
config { type: "assertion" }
SELECT
*
FROM
${ref("source_data")}
WHERE
a is null
or b is null
or c is null
使用 JavaScript 进行开发
在 JavaScript 中使用内联变量和函数
以下代码示例显示了 js
块中定义的 foo
变量
然后在 SQLX 文件中以内嵌方式使用:
js {
const foo = 1;
function bar(number){
return number+1;
}
}
SELECT
${foo} AS one,
${bar(foo)} AS two
使用 JavaScript 为每个国家/地区生成一个表格
以下代码示例展示了如何使用 forEach
函数生成
表countries
definitions/one_table_per_country.js
文件:
const countries = ["GB", "US", "FR", "TH", "NG"];
countries.forEach(country => {
publish("reporting_" + country)
.dependencies(["source_table"])
.query(
ctx => `
SELECT '${country}' AS country
`
);
});
使用 JavaScript 在一个文件中声明多个来源
以下代码示例显示了在
definitions/external_dependencies.js
文件:
declare({
schema: "stripe",
name: "charges"
});
declare({
schema: "shopify",
name: "orders"
});
declare({
schema: "salesforce",
name: "accounts"
});
使用 forEach
在一个文件中声明多个来源
以下代码示例显示了使用
definitions/external_dependencies.js
文件中的 forEach
函数:
["charges", "subscriptions", "line_items", "invoices"]
.forEach(source => declare({
schema: "stripe",
name: source
})
);
使用 JavaScript 删除所有包含个人身份信息的表中的敏感信息
以下代码示例显示了 definitions/delete_pii.js
中的一个函数
该文件用于删除所有表中的选定信息,其中包含
个人身份信息 (PII):
const pii_tables = ["users", "customers", "leads"];
pii_tables.forEach(table =>
operate(`gdpr_cleanup: ${table}`,
ctx => `
DELETE FROM raw_data.${table}
WHERE user_id in (SELECT * FROM users_who_requested_deletion)`)
.tags(["gdpr_deletion"]))
);
使用 JavaScript 添加 preOps
和 postOps
以下代码示例显示了用于创建查询的 publish
函数
preOps
和postOps
在
definitions/pre_and_post_ops_example.js
表:
publish("example")
.preOps(ctx => `GRANT \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
.query(ctx => `SELECT * FROM ${ctx.ref("other_table")}`)
.postOps(ctx => `REVOKE \`roles/bigquery.dataViewer\` ON TABLE ${ctx.ref("other_table")} TO "group:automation@example.com"`)
使用 JavaScript 创建增量表
以下代码示例显示了用于创建publish
definitions/incremental_example.js
文件中的增量表:
publish("incremental_example", {
type: "incremental"
}).query(ctx => `
SELECT * FROM ${ctx.ref("other_table")}
${ctx.when(ctx.incremental(),`WHERE timestamp > (SELECT MAX(date) FROM ${ctx.self()}`)}
`)
使用 JavaScript 回填每日表
以下代码示例展示了如何回填在
definitions/backfill_daily_data.js
文件:
var getDateArray = function(start, end) {
var startDate = new Date(start); //YYYY-MM-DD
var endDate = new Date(end); //YYYY-MM-DD
var arr = new Array();
var dt = new Date(startDate);
while (dt <= endDate) {
arr.push(new Date(dt).toISOString().split("T")[0]);
dt.setDate(dt.getDate() + 1);
}
return arr;
};
var dateArr = getDateArray("2020-03-01", "2020-04-01");
// step 1: create table
operate(`create table`, 'create table if not exists backfill_table (`fields`) `);
// step 2: insert into the table
dateArr.forEach((day, i) =>
operate(`backfill ${day}`
`insert into backfill_table select fields where day = '${day}'`)
);
通过 include 重复使用代码
在 JavaScript 中使用全局变量
以下代码示例展示了 project_id
和 first_date
的定义
includes/constants.js
中的常量:
const project_id = "project_id";
const first_date = "'1970-01-01'";
module.exports = {
project_id,
first_date
};
以下代码示例显示了first_date
definitions/new_table.sqlx
文件:
config {type: "table"}
SELECT * FROM source_table WHERE date > ${constants.first_date}
使用 JavaScript 创建国家/地区映射
以下代码示例显示了 country_group
自定义函数(在
includes/mapping.js
文件:
function country_group(country){
return `
case
when ${country} in ('US', 'CA') then 'NA'
when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
when ${country} in ('AU') then ${country}
else 'Other'
end`;
}
module.exports = {
country_group
};
以下代码示例展示了一个使用
definitions/new_table.sqlx
中的 country_group
函数
表定义文件:
config { type: "table"}
SELECT
country AS country,
${mapping.country_group("country")} AS country_group,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM ${ref("source_table")}
GROUP BY 1, 2, 3
以下代码示例显示了在
编译为 SQL 的 definitions/new_table.sqlx
:
SELECT
country AS country,
case
when country in ('US', 'CA') then 'NA'
when country in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
when country in ('AU') then country
else 'Other'
end AS country_group,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM "dataform"."source_table"
GROUP BY 1, 2, 3
使用自定义 JavaScript 函数生成 SQL 脚本
以下代码示例显示了 render_script
自定义函数
在 includes/script_builder.js
中定义:
function render_script(table, dimensions, metrics) {
return `
SELECT
${dimensions.map(field => `${field} AS ${field}`).join(",")},
${metrics.map(field => `sum(${field}) AS ${field}`).join(",\n")}
FROM ${table}
GROUP BY ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
`;
}
module.exports = { render_script };
以下代码示例展示了一个使用
definitions/new_table.sqlx
中的 render_script
函数
表定义文件:
config {
type: "table",
tags: ["advanced", "hourly"],
disabled: true
}
${script_builder.render_script(ref("source_table"),
["country", "device_type"],
["revenue", "pageviews", "sessions"]
)}
以下代码示例显示了在
编译为 SQL 的 definitions/new_table.sqlx
:
SELECT
country AS country,
device_type AS device_type,
sum(revenue) AS revenue,
sum(pageviews) AS pageviews,
sum(sessions) AS sessions
FROM "dataform"."source_table"
GROUP BY 1, 2
操作配置
加载包含操作配置的 SQL 文件
操作配置有助于加载纯 SQL 文件。你可以定义操作配置
位于 definitions
文件夹的 actions.yaml
文件中。
详细了解可用的操作类型和有效的操作配置 请参阅 Dataform 配置 参考文档。
以下代码示例显示了名为 new_view
的视图在
definitions/actions.yaml
文件:
actions:
- view:
filename: new_view.sql
上述代码引用的 definitions/new_view.sql
SQL 文件
示例,包含纯 SQL:
SELECT * FROM source_data