创建可重复使用的流水线

本教程介绍如何构建可重复使用的流水线,以从 Cloud Storage 读取数据、执行数据质量检查以及将数据写入 Cloud Storage。

可重复使用的流水线具有常规流水线结构,但您可以根据 HTTP 服务器提供的配置更改每个流水线节点的配置。例如,静态流水线可能会从 Cloud Storage 读取数据、应用转换,并向 BigQuery 输出表写入数据。如果您希望转换和 BigQuery 输出表根据流水线读取的 Cloud Storage 文件发生更改,则需要创建可重复使用的流水线。

目标

  • 使用 Argument Setter 插件让流水线在每次运行时读取不同的输入。
  • 使用 Argument Setter 插件让流水线在每次运行时执行不同的质量检查。
  • 将每次运行产生的输出数据写入 Cloud Storage。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Cloud Data Fusion
  • Cloud Storage

请使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Data Fusion, Cloud Storage, and Cloud Dataproc API。

    启用 API

  5. 创建 Cloud Data Fusion 实例

使用 Cloud Data Fusion 时,您将同时使用 Cloud Console 和单独的 Cloud Data Fusion 界面。在 Cloud Console 中,您可以创建 Cloud Console 项目,以及创建和删除 Cloud Data Fusion 实例。在 Cloud Data Fusion 界面中,您可以通过各种页面(例如流水线 StudioWrangler)来使用 Cloud Data Fusion 功能。

  1. 在 Cloud Console 中,打开实例页面。

    打开“实例”页面

  2. 在实例的操作列中,点击查看实例链接。Cloud Data Fusion 界面将在新的浏览器标签页中打开。

部署 Argument Setter 插件

  1. 在 Cloud Data Fusion 网页界面的右上角,点击 Hub

  2. 点击 Argument Setter 操作插件,然后点击部署

  3. 在打开的部署窗口中,点击完成

  4. 点击创建流水线。这时会打开流水线 Studio 页面。

从 Cloud Storage 读取数据

  1. 流水线 Studio 页面的左侧面板中,从来源下拉菜单 中选择 Google Cloud Storage
  2. 将鼠标指针放在 Cloud Storage 源卡上,然后点击显示的属性按钮。
  3. 参考名称字段中,输入名称。
  4. 路径字段中,输入 ${input.path}。此宏用于控制将在不同流水线运行中使用的 Cloud Storage 输入路径。
  5. 在右侧的“输出架构”面板中,通过点击偏移字段行中的垃圾桶图标,从输出架构中移除偏移字段。
  6. 点击验证以确保没有任何错误。
  7. 点击 X 按钮以退出“属性”对话框。

转换数据

  1. 流水线 Studio 页面的左侧面板中,从转换下拉菜单 中选择 Wrangler
  2. 在“流水线 Studio”画布中,将箭头从 Cloud Storage 卡拖动到 Wrangler 卡。
  3. 将鼠标指针放在 Wrangler 卡上,然后点击显示的属性按钮。
  4. 输入字段名称中,输入 body
  5. 配方字段中,输入 ${directives}。此宏用于控制将在不同流水线运行中使用的转换逻辑。
  6. 点击验证以确保没有任何错误。
  7. 点击 X 按钮以退出“属性”对话框。

写入 Cloud Storage

  1. 流水线 Studio 页面的左侧面板中,从接收器下拉菜单 中选择“Cloud Storage”。
  2. 在“流水线 Studio”画布中,将箭头从 Wrangler 卡拖动到您刚刚添加的 Cloud Storage 卡。
  3. 将鼠标指针放在 Cloud Storage 接收器卡上,然后点击显示的属性按钮。
  4. 参考名称字段中,输入名称。
  5. 路径字段中,输入您项目中的 Cloud Storage 存储分区的路径,流水线可在其中写入输出文件。如果您没有 Cloud Storage 存储分区,请创建一个 Cloud Storage 存储分区
  6. 点击验证以确保没有任何错误。
  7. 点击 X 按钮退出“属性”菜单。

设置宏参数

  1. 流水线 Studio 页面的左侧面板中,从条件和操作下拉菜单 中选择 Argument Setter 插件。
  2. 在“流水线 Studio”画布中,将箭头从 Argument Setter 卡拖动到 Cloud Storage 源卡。
  3. 将鼠标指针放在 Argument Setter 卡上,然后点击显示的属性按钮。
  4. 网址字段中,输入以下内容。

    https://storage.googleapis.com/reusable-pipeline-tutorial/args.json
    

    该网址对应于 Cloud Storage 中可公开访问的对象,对象包含以下内容:

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }
    

    两个参数中的第一个是 input.path 的值。路径 gs://reusable-pipeline-tutorial/user-emails.txt 是 Cloud Storage 中可公开访问的对象,对象包含以下测试数据:

    alice@example.com
    bob@example.com
    craig@invalid@example.com
    

    第二个参数是 directives 的值。值 send-to-error !dq:isEmail(body) 会配置 Wrangler,以滤除不是有效电子邮件地址的所有行。例如,craig@invalid@example.com 已被滤除。

  5. 点击验证以确保没有任何错误。

  6. 点击 X 按钮退出“属性”菜单。

部署并运行流水线

  1. 流水线 Studio 页面的顶部栏中,点击为流水线命名。 为流水线命名,然后点击保存
  2. 点击部署
  3. 点击运行旁边的下拉菜单 以打开运行时参数,并查看宏(运行时)input.pathdirectives 参数。请将值字段留空,以通知 Cloud Data Fusion 流水线中的 Argument Setter 节点将在运行时期间设置这些参数的值。
  4. 点击运行

清除数据

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

学完本教程后,请清理在 Google Cloud 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 Cloud Data Fusion 实例

按照说明删除 Cloud Data Fusion 实例

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤