使用 Data Pipelines

概览

您可以使用 Dataflow Data Pipelines 创建周期性的作业时间表,了解资源在多个作业执行中的消耗位置,定义和管理数据新鲜度目标,并深入了解各个流水线阶段以修复和优化流水线。

Data Pipeline 特性

  • 创建周期性的批量流水线,以按时间表运行批量作业。
  • 创建周期性的增量式批量流水线,以针对最新版本的输入数据运行批量作业。
  • 使用流水线信息摘要图表来查看流水线的总体容量用量和资源消耗量。
  • 查看流式流水线的数据新鲜度。此指标会随时间变化,可以与提醒相关联,当新鲜度低于指定目标时发出通知。
  • 使用流水线指标图表比较批量流水线作业并查找异常值。

数据流水线使用限制

  • 区域可用性:由于 Dataflow 数据流水线使用 App Engine 应用 Cloud Scheduler,因此可用的 App Engine 区域中提供数据流水线。

  • 配额限制:

    • 每个项目的流水线数量上限:500
    • 每个组织的流水线数量上限:2500

API 参考文档

如需了解 API 文档,请参阅 Data Pipelines 参考文档

数据流水线类型

Dataflow 数据流水线有两种类型:流式和批量。这两种流水线都运行在 Dataflow 模板中定义的作业。

流式数据流水线
流式数据流水线在创建后立即运行 Dataflow 流式作业。
批量数据流水线
批量数据流水线按照用户定义的时间表运行 Dataflow 批量作业。批量流水线输入文件名可以参数化,以允许进行增量式批量流水线处理

增量式批量流水线

您可以使用日期时间占位符来指定批量流水线的增量输入文件格式。

  • 可以使用年、月、日期、小时、分和秒的占位符,并且必须遵循 strftime() 格式。占位符带有百分比符号 (%) 前缀。
  • 创建流水线期间不会验证参数格式。
    • 示例:如果指定“gs://bucket/Y”作为参数化输入文件路径,则会被计算为“gs://bucket/Y”,因为没有“%”前缀的“Y”不会映射为 strftime() 格式。

在每个安排的批量流水线执行时,输入文件路径的占位符部分会计算为当前(或时间偏移)的日期时间(日期值使用计划作业所在时区的当前日期计算)。如果计算出的文件路径与输入文件的路径匹配,则批量流水线将在计划的时间提取该文件进行处理。

  • 示例:批量流水线计划在太平洋标准时间 (PST) 每小时开始时重复执行。如果您将输入文件路径参数化为 gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv,则在太平洋标准时间 2021 年 4 月 15 日下午 6 点,输入文件路径将计算为 gs://bucket-name/2021-04-15/prefix-18_00.csv

使用时移参数

您可以使用括在大括号中的 + 或 - 分钟或小时时移参数,格式为“{[+|-][0-9]+[m|h]}”,以支持将输入文件路径与计算出的日期时间进行匹配,该日期时间在流水线时间表的当前日期时间之前或之后发生偏移。批量流水线将继续按计划的时间重复,但输入文件路径将采用指定的时间偏移值进行计算。

  • 示例:批量流水线计划在太平洋标准时间 (PST) 每小时开始时重复执行。如果您将输入文件路径参数化为 gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h},则在太平洋标准时间 2021 年 4 月 15 日下午 6 点,输入文件路径将计算为 gs://bucket-name/2021-04-15/prefix-16_00.csv

数据流水线角色

为使数据流水线操作成功,必须授予用户必要的 IAM 角色,如下所示:

  1. 用户必须具有相应的角色才能执行操作:

  2. 用户必须被授予 Cloud Scheduler 和 Dataflow 使用的服务帐号的 roles/iam.serviceAccountUser 角色,以使用该服务帐号作为其身份进行操作。如果用户没有为 Cloud Scheduler 和 Dataflow 选择服务帐号,则使用默认 Compute Engine 服务帐号

创建数据流水线

您可以通过两种方式创建数据流水线:

  1. 导入作业,或者
  2. 创建数据流水线

数据流水线设置页面:首次访问 Cloud Console 中的 Dataflow 流水线功能时,系统会打开一个设置页面。

  1. 启用列出的 API
  2. 选择 Cloud Scheduler 将用于安排流水线的 App Engine 应用的区域。

导入作业

您可以导入基于经典或灵活模板的 Dataflow 批量或流式作业,并将其设置为数据流水线。

  1. 转到 Cloud Console 中的 Dataflow 作业页面,选择一个已完成的作业,然后在作业详情页面上选择“+作为流水线导入”。

  2. 基于模板创建流水线页面上,“数据流水线”流水线选项已被选中。其他参数将填充导入的作业的选项。

    1. 对于批量作业,请在“模板参数”下的“安排流水线”部分中提供重复时间表。可以选择提供用于安排批量运行的 Cloud Scheduler 账号电子邮件地址。如果未指定,则使用默认 Compute Engine 服务帐号。注意:用户必须被授予 Cloud Scheduler 使用的服务帐号的 roles/iam.serviceAccountUser 角色,无论是用户指定的服务账号还是默认的 Compute Engine 角色服务帐号(请参阅数据流水线角色)。

创建数据流水线

  1. 转到 Cloud Console 中的 Dataflow 流水线页面,然后选择“+创建数据流水线”。

  2. 基于模板创建流水线页面中的“作业管理”下,选择“数据流水线”,提供流水线名称,并填写其他选择模板和参数字段。

    1. 对于批量作业,请在“模板参数”下的“安排流水线”部分中提供重复时间表。可以选择提供用于安排批量运行的 Cloud Scheduler 账号电子邮件地址。如果未指定,则使用默认 Compute Engine 服务帐号。注意:用户必须被授予 Cloud Scheduler 使用的服务帐号的 roles/iam.serviceAccountUser 角色,无论是用户指定的服务账号还是默认的 Compute Engine 角色服务帐号(请参阅数据流水线角色)。

创建批量数据流水线

要创建此示例批量数据流水线,您必须拥有项目中以下资源的访问权限:

此示例流水线使用 Cloud Storage Text to BigQuery 批量流水线模板,此模板从 Cloud Storage 读取 CSV 格式的文件,运行转换,然后将值插入 your-project-id:your-dataset-name.three_column_table。

  1. 在本地驱动器上创建以下文件:
    1. 一个 bq_three_column_table.json 文件,其中包含目标 BigQuery 表的以下架构。
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
  1. 一个 split_csv_3cols.js JavaScript 文件,用于在插入 BigQuery 之前对输入数据进行简单的转换。
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
  1. 一个 file01.csv CSV 文件,其中包含将插入到 BigQuery 表中的多条记录。
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. 使用 gsutil 将文件复制到项目的 Cloud Storage 存储桶中的文件夹,如下所示:
    1. bq_three_column_table.jsonsplit_csv_3cols.js 复制到 gs://your-bucket/text_to_bigquery/
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
        gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. file01.csv 复制到 gs://your-bucket/inputs/
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. 通过 Cloud Storage 浏览器your-bucket 中创建一个“tmp”文件夹。选择您的文件夹名称以打开存储桶详情页面,然后点击“创建文件夹”,在存储桶中创建“tmp”文件夹。
  4. 转到 Dataflow 流水线页面,然后选择“创建数据流水线”。在基于模板创建流水线页面上,输入或选择以下各项:

    1. 作业管理:
      1. 选择“数据流水线”。
      2. 流水线名称:输入“text_to_bq_batch_data_pipeline”。
      3. 点击“继续”。
    2. 选择模板:
      1. 区域端点:选择一个 Compute Engine 区域
      2. 模板列表:在“Process Data in Bulk (batch)”下,选择“Text File on Cloud Storage to BigQuery”。说明:批量流水线。读取存储在 Cloud Storage 中的文本文件,使用 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果输出到 BigQuery。”注意:请勿在“Process Data Continuously (stream)”下选择具有相同名称的流式流水线。
      3. 点击“继续”。
    3. 模板参数:
      1. 安排流水线:选择时间表,例如“每小时”的“第 25 分钟”,并选择您的时区。您可以在提交流水线后修改时间表,如下文所述。
    4. 必填参数:
      1. Cloud Storage 中的 JavaScript UDF 路径:
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. JSON 路径:
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. JavaScript UDF 名称:“transform”
      4. BigQuery 输出表(完全限定表名称):
        your_project_id:your_dataset.three_column_table
        
      5. Cloud Storage 输入路径:
        gs://your_bucket/inputs/file*.csv
        
      6. BigQuery 临时目录:
        gs://your_bucket/tmp
        
      7. 临时位置:
        gs://your_bucket/tmp
        
    5. 点击“提交”。
  5. 确认流水线和模板信息,并在流水线详情页面中查看当前和过去的历史记录。

您还可以使用 Dataflow Pipelines 控制台中的运行按钮按需运行批处理流水线。

创建示例流式数据流水线

您可以按照示例批量流水线说明来创建示例流式数据流水线,但有以下不同之处:

  • 流水线时间表。您不需要为流式数据流水线指定时间表。Dataflow 流式作业会立即启动。

  • 选择模板:在“Process Data Continuously (stream)”下,选择“Text File on Cloud Storage to BigQuery”。说明:此流式流水线可读取 Cloud Storage 中存储的文本文件,通过用户定义的 JavaScript 函数执行转换并将结果流式传输到 BigQuery。此流水线需要 JavaScript 函数和 BigQuery TableSchema 的 JSON 表示法。

  • 工作器机器类型:流水线将处理与 gs://<your_bucket>/inputs/file*.csv 模式匹配的初始文件集,以及您上传到 inputs/ 文件夹的匹配此模式的任何其他文件。如果 CSV 文件的大小超过数 GB,为避免出现内存不足错误,请选择内存高于默认 n1-standard-4 机器类型的机器类型,例如 n1-highmem-8

调查流水线目标违规

周期性的批量流水线

在 Cloud Console 的“流水线详情”页面上,使用流水线状态面板的“各个作业状态”和“每个步骤的线程时间”图表对流水线的健康状况进行初始分析。

调查示例

  1. 您有一个在每小时的第 3 分钟运行的周期性批量流水线,每个作业通常运行大约 9 分钟,您的目标是所有作业在 10 分钟内完成。

  2. “作业状态”图表显示作业运行时间超过 10 分钟。

  3. 在“更新/执行历史记录”表中,找到在相关时段运行的作业,然后点击进入 Dataflow 作业详情页面。在该页面上,找到运行时间较长的阶段,然后在日志中查找可能的错误,以确定延迟的原因。

流处理流水线

在 Cloud Console 的流水线详情页面上的“流水线信息”标签页下,使用流水线状态面板的数据新鲜度图表对流水线的健康状况进行初始分析。

调查示例

  1. 您有一个流式流水线,该流水线通常生成数据新鲜度为 20 秒的输出。

  2. 您设定的目标是保证 30 秒的数据新鲜度。查看数据新鲜度图表时,您注意到,在上午 9 点到 10 点之间,数据新鲜度提高到约 40 秒。

  3. 您可以切换到“流水线指标”标签页,然后查看吞吐量、CPU 利用率和内存利用率图表,以进行进一步分析。