本教程介绍如何构建可重复使用的流水线,以从 Cloud Storage 读取数据、执行数据质量检查以及将数据写入 Cloud Storage。
可重复使用的流水线具有常规流水线结构,但您可以根据 HTTP 服务器提供的配置更改每个流水线节点的配置。例如,静态流水线可能会从 Cloud Storage 读取数据、应用转换,并向 BigQuery 输出表写入数据。如果您希望 转换和 BigQuery 输出表,具体取决于 Cloud Storage 文件,那么您可以创建一个可重复使用的 流水线。
目标
- 使用 Cloud Storage Argument Setter 插件让流水线在每次运行时读取不同的输入。
- 使用 Cloud Storage Argument Setter 插件以允许流水线执行 每次运行执行不同的质量检查。
- 将每次运行产生的输出数据写入 Cloud Storage。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Cloud Data Fusion
- Cloud Storage
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Data Fusion, Cloud Storage, BigQuery, and Dataproc APIs.
- 创建 Cloud Data Fusion 实例。
导航到 Cloud Data Fusion 网页界面
使用 Cloud Data Fusion 时,您会同时使用 Google Cloud 控制台和 以及单独的 Cloud Data Fusion 网页界面。在 Google Cloud 控制台中 创建 Google Cloud 控制台项目、创建和删除 Cloud Data Fusion 实例。在 Cloud Data Fusion 网页界面中,您可以使用 Pipeline Studio 或 Wrangler 等各种页面,供系统使用 Cloud Data Fusion 特性。
在 Google Cloud 控制台中,打开实例页面。
在实例的操作列中,点击查看实例链接。Cloud Data Fusion 网页界面会在新的浏览器标签页中打开。
部署 Cloud Storage Argument Setter 插件
在 Cloud Data Fusion 网页界面中,转到 Studio 页面。
在操作菜单中,点击 GCS 参数设置者。
从 Cloud Storage 读取数据
- 在 Cloud Data Fusion 网页界面中,转到 Studio 页面。
- 点击 arrow_drop_down 来源,然后选择 Cloud Storage。Cloud Storage 来源的节点显示在流水线中。
在 Cloud Storage 节点上,点击属性。
在参考名称字段中,输入名称。
在路径字段中,输入
${input.path}
。此宏用于控制将在不同流水线运行中使用的 Cloud Storage 输入路径。在右侧的“输出架构”面板中,从输出中移除 offset 字段 请点击偏移字段行中的回收站图标。
点击验证并解决所有错误。
点击
退出属性对话框。
转换数据
- 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
- 在转换下拉菜单 arrow_drop_down 中, 选择 Wrangler。
- 在“流水线 Studio”画布中,将箭头从 Cloud Storage 节点拖动到 Wrangler 节点。
- 前往流水线中的 Wrangler 节点,然后点击属性。
- 在输入字段名称中,输入
body
。 - 在配方字段中,输入
${directives}
。此宏用于控制将在不同流水线运行中使用的转换逻辑。 - 点击验证,并解决所有错误。
- 点击 以退出 Properties 对话框。
写入 Cloud Storage
- 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
- 在接收器下拉菜单 arrow_drop_down 中, 选择 Cloud Storage
- 在 Pipeline Studio 画布中,将箭头从 Wrangler 节点拖动到 Cloud Storage 节点。
- 转到流水线中的 Cloud Storage 接收器节点,然后点击属性。
- 在参考名称字段中,输入名称。
- 在路径字段中,输入您项目中的 Cloud Storage 存储分区的路径,流水线可在其中写入输出文件。如果您没有 Cloud Storage 存储分区,请创建一个 Cloud Storage 存储分区。
- 点击验证,并解决所有错误。
- 点击 以退出 Properties 对话框。
设置宏参数
- 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
- 在arrow_drop_down中 条件和操作下拉菜单中,点击 GCS 参数设置者。
- 在 Pipeline Studio 画布中,将箭头从 Cloud Storage Argument Setter 节点拖动到 Cloud Storage 来源节点。
- 转到流水线中的 Cloud Storage Argument Setter 节点,然后点击属性。
在网址字段中,输入以下网址:
gs://reusable-pipeline-tutorial/args.json
该网址对应于 Cloud Storage 中可公开访问的对象,对象包含以下内容:
{ "arguments" : [ { "name": "input.path", "value": "gs://reusable-pipeline-tutorial/user-emails.txt" }, { "name": "directives", "value": "send-to-error !dq:isEmail(body)" } ] }
两个参数中的第一个是
input.path
的值。路径gs://reusable-pipeline-tutorial/user-emails.txt
是 Cloud Storage 中可公开访问的对象,对象包含以下测试数据:alice@example.com bob@example.com craig@invalid@example.com
第二个参数是
directives
的值。值send-to-error !dq:isEmail(body)
会配置 Wrangler,以滤除不是有效电子邮件地址的所有行。例如,craig@invalid@example.com
已被滤除。点击验证以确保没有任何错误。
点击
以退出 Properties 对话框。
部署并运行流水线
在流水线 Studio 页面的顶部栏中,点击为流水线命名。 为流水线命名,然后点击保存。
点击部署。
打开 Runtime 参数并查看宏(运行时)
input.path
和directives
参数,请点击arrow_drop_down 下拉菜单(位于运行旁边)。将值字段留空,以通知 Cloud Data Fusion 流水线中的 Cloud Storage 参数 Setter 节点将设置这些参数的值 错误。
点击运行。
清除数据
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
学完本教程后,请清理在 Google Cloud 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。
删除 Cloud Data Fusion 实例
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的项目。
如需删除项目,请执行以下操作:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.