设计和创建可重复使用的流水线


本教程介绍如何构建可重复使用的流水线,以从 Cloud Storage 读取数据、执行数据质量检查以及将数据写入 Cloud Storage。

可重复使用的流水线具有常规流水线结构,但您可以根据 HTTP 服务器提供的配置更改每个流水线节点的配置。例如,静态流水线可能会从 Cloud Storage 读取数据、应用转换,并向 BigQuery 输出表写入数据。如果您希望转换和 BigQuery 输出表根据流水线读取的 Cloud Storage 文件而发生变化,则需要创建可重复使用的流水线。

目标

  • 使用 Cloud Storage 参数 Setter 插件可允许流水线在每次运行时读取不同的输入。
  • 使用 Cloud Storage Argument Setter 插件可允许流水线在每次运行时执行不同的质量检查。
  • 将每次运行产生的输出数据写入 Cloud Storage。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Cloud Data Fusion
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 启用 Cloud Data Fusion, Cloud Storage, BigQuery, and Dataproc API。

    启用 API

  7. 创建 Cloud Data Fusion 实例

使用 Cloud Data Fusion 时,您可以同时使用 Google Cloud 控制台和单独的 Cloud Data Fusion 网页界面。在 Google Cloud 控制台中,您可以创建 Google Cloud 控制台项目,以及创建和删除 Cloud Data Fusion 实例。在 Cloud Data Fusion 网页界面中,您可以使用各种页面(例如 Pipeline StudioWrangler)来使用 Cloud Data Fusion 功能。

  1. 在 Google Cloud 控制台中,打开实例页面。

    打开“实例”页面

  2. 在实例的操作列中,点击查看实例链接。Cloud Data Fusion 网页界面会在新的浏览器标签页中打开。

部署 Cloud Storage 参数 Setter 插件

  1. 在 Cloud Data Fusion 网页界面中,转到 Studio 页面。

  2. 操作菜单中,点击 GCS 参数设置器

从 Cloud Storage 读取数据

  1. 在 Cloud Data Fusion 网页界面中,转到 Studio 页面。
  2. 点击 来源,然后选择 Cloud Storage。Cloud Storage 来源的节点显示在流水线中。
  3. 在 Cloud Storage 节点上,点击属性

  4. 参考名称字段中,输入名称。

  5. 路径字段中,输入 ${input.path}。此宏用于控制将在不同流水线运行中使用的 Cloud Storage 输入路径。

  6. 在右侧的“输出架构”面板中,点击偏移字段行中的回收站图标,从输出架构中移除 offset 字段。

  7. 点击验证并解决所有错误。

  8. 点击 以退出 Properties 对话框。

转换数据

  1. 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
  2. 转换下拉菜单 中,选择 Wrangler
  3. 在 Pipeline Studio 画布中,将箭头从 Cloud Storage 节点拖动到 Wrangler 节点。
  4. 转到流水线中的 Wrangler 节点,然后点击属性
  5. 输入字段名称中,输入 body
  6. 配方字段中,输入 ${directives}。此宏用于控制将在不同流水线运行中使用的转换逻辑。
  7. 点击验证并解决所有错误。
  8. 点击 以退出 Properties 对话框。

写入 Cloud Storage

  1. 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
  2. 接收器下拉菜单 中,选择 Cloud Storage。
  3. 在 Pipeline Studio 画布中,将箭头从 Wrangler 节点拖动到您刚添加的 Cloud Storage 节点。
  4. 转到流水线中的 Cloud Storage 接收器节点,然后点击属性
  5. 参考名称字段中,输入名称。
  6. 路径字段中,输入您项目中的 Cloud Storage 存储桶的路径,流水线可在其中写入输出文件。如果您没有 Cloud Storage 存储桶,请创建一个 Cloud Storage 存储桶
  7. 点击验证并解决所有错误。
  8. 点击 以退出 Properties 对话框。

设置宏参数

  1. 在 Cloud Data Fusion 网页界面中,转到 Studio 页面上的数据流水线。
  2. 条件和操作下拉菜单中,点击 GCS 参数设置器
  3. 在 Pipeline Studio 画布中,将箭头从 Cloud Storage Argument Setter 节点拖动到 Cloud Storage 节点。
  4. 转到流水线中的 Cloud Storage 参数 Setter 节点,然后点击属性
  5. 网址 字段中,输入以下网址:

    gs://reusable-pipeline-tutorial/args.json
    

    该网址对应于 Cloud Storage 中可公开访问的对象,对象包含以下内容:

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }
    

    两个参数中的第一个是 input.path 的值。路径 gs://reusable-pipeline-tutorial/user-emails.txt 是 Cloud Storage 中可公开访问的对象,对象包含以下测试数据:

    alice@example.com
    bob@example.com
    craig@invalid@example.com
    

    第二个参数是 directives 的值。值 send-to-error !dq:isEmail(body) 会配置 Wrangler,以滤除不是有效电子邮件地址的所有行。例如,craig@invalid@example.com 已被滤除。

  6. 点击验证以确保没有任何错误。

  7. 点击 以退出 Properties 对话框。

部署并运行流水线

  1. 流水线 Studio 页面的顶部栏中,点击为流水线命名。 为流水线命名,然后点击保存

  2. 点击部署

  3. 如需打开运行时参数并查看宏(运行时)input.pathdirectives 参数,请点击运行旁边的 下拉菜单。

    将值字段留空,以通知 Cloud Data Fusion 流水线中的 Cloud Storage 参数 setter 节点在运行时设置这些参数的值。

  4. 点击运行

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

学完本教程后,请清理在 Google Cloud 上创建的资源,以避免这些资源占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除 Cloud Data Fusion 实例

按照说明删除 Cloud Data Fusion 实例

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤