使用 Dataflow 监控界面

使用 Dataflow 代管式服务运行流水线时,您可以通过 Dataflow 的 Web 监控界面查看该作业和其他任何作业。通过此监控界面,您可以查看相关 Dataflow 作业并与之交互。

您可以使用 Google Cloud Console 访问 Dataflow 监控界面。此监控界面可以显示以下内容:

  • 包含当前正在运行的所有 Dataflow 作业和过去 30 天内运行的作业的列表。
  • 每个流水线的图形表示。
  • 有关作业状态、类型和 SDK 版本的详情。
  • 运行流水线的 Google Cloud 服务(如 Compute EngineCloud Storage)相关信息的链接。
  • 作业执行期间出现的任何错误或警告。
  • 作业的其他诊断信息。

您可以在 Dataflow 监控界面中查看作业监控图表。这些图表显示流水线作业持续时间内的各种指标,并包含以下信息:

  • 步骤级可见性,可帮您确定哪些步骤会导致流水线延迟问题。
  • 可揭示出异常行为的统计信息。
  • 可帮助确定来源和接收器中的瓶颈问题的 I/O 指标。

访问 Dataflow 监控界面

如需访问 Dataflow 监控界面,请按以下步骤操作:

  1. 登录 Cloud Console。
  2. 选择您的 Google Cloud 项目。
  3. 点击左上角的菜单。
  4. 转到大数据部分并点击 Dataflow

此时系统将显示 Dataflow 作业及其状态的列表。如果您没有看到任何作业,则需要运行新作业。如需了解如何运行作业,请参阅 Dataflow 快速入门

Dataflow 作业列表,其中包含处于正在运行、失败和成功状态的作业。
图 1:Cloud Console 中的 Dataflow 作业列表,其中包含处于正在运行失败成功状态的作业。

作业可能具有以下状态:

  • :监控界面尚未从 Dataflow 服务收到状态。
  • 正在运行:作业正在运行。
  • 正在启动...:作业已创建,但系统在启动前需要一些时间进行准备。
  • 已加入队列:一个 FlexRS 作业已加入队列,或者正在启动 Flex 模板作业(这可能需要几分钟时间)。
  • 正在取消…正在取消作业。
  • 已取消:作业已取消。
  • 正在排空…正在排空作业。
  • 已排空:作业已被排空。
  • 正在更新…正在更新作业。
  • 已更新:作业已更新。
  • 成功:作业已成功完成。
  • 失败:作业未能完成。

如需查看一个流水线的详细信息,请点击该作业的名称

访问作业监控图表

如需访问作业的监控图表,请在 Dataflow 监控界面中点击作业名称。此时会显示作业详情页面,其中包含以下信息:

  • 作业图:流水线的直观展示
  • 执行详情:用于优化流水线性能的工具
  • 作业指标:关于作业运行情况的指标
  • 作业信息面板:关于流水线的描述性信息
  • 作业日志:Dataflow 服务在作业级层生成的日志
  • 工作器日志:Dataflow 服务在工作器级层生成的日志
  • 诊断:显示沿所选时间轴发生的错误以及关于流水线的可能建议的表格
  • 时间选择器:可用于调整指标的时间范围的工具

Job 详情页面,您可以使用作业图执行详情作业指标

选择了“作业图”标签页的 Dataflow 监控界面的视图。您可以在此模式下查看流水线图、作业信息、作业日志、工作器日志、诊断和时间选择器工具。

选择了“作业指标”标签页的 Dataflow 监控界面的视图。
您可以在此模式下查看作业指标图表、作业信息、作业日志、工作器日志、诊断和时间选择器工具。

创建 Cloud Monitoring 提醒

Dataflow 与 Cloud Monitoring 完全集成,可让您在作业超出用户定义的阈值时创建提醒。如需从指标图表创建 Cloud Monitoring 提醒,请点击创建提醒政策

**创建提醒政策**链接允许您从指标图表创建提醒。

如需查看创建这些提醒的说明,请参阅使用 Cloud Monitoring for Dataflow 流水线页面。如果您看不到监控图或无法创建提醒,则可能需要其他 Monitoring 权限

全屏模式

要以全屏模式查看指标图表,请点击

使用时间选择器工具

您可以使用时间选择器工具来调整指标的时间范围。您可以选择预定义的持续时间或自定义时间间隔来分析作业。

借助时间选择器工具,您可以选择以小时数和天数为增量的时间范围,也可以选择自定义范围。

对于流处理作业或运行中的批处理作业,默认显示的图表会显示相应作业前六个小时的指标。对于已停止或已完成的流处理作业,默认显示的图表会显示相应作业持续时间内的完整运行时间。

步骤和工作器指标

您可以查看如下指标的图表:

  • 数据新鲜度(仅限流处理流水线)
  • 系统延迟时间(仅限流处理流水线)
  • 自动扩缩
  • 吞吐量
  • CPU 利用率
  • 工作器错误日志计数
  • 输入和输出指标

如需访问这些图表中的其他信息,请点击 Legend Toggle 以切换至“展开图表图例”。

Legend Toggle 按钮位于“创建提醒政策”按钮附近。

数据新鲜度(仅限流处理流水线)

数据新鲜度是指实时与输出水印之间的时间长度。 流水线的每个步骤都有一个输出数据水印。输出数据水印为 T 表示所有事件发生时间早于 T 的元素都经过计算。输出数据水印的上限为所有上游计算操作的最早输入数据水印。如果尚未处理某些输入数据,则输出水印可能会受到影响,进而影响数据新鲜度。

直观呈现数据的图表,显示流处理流水线中的数据新鲜度。

系统延迟时间(仅限流处理流水线)

系统延迟时间是指当前某数据项已处理或等待处理的最长时长(以秒为单位)。此指标表示元素在流水线中任意一个来源内等待的时间。处理完毕后,系统会调整最长时长。以下是其他注意事项:

  • 对于多个来源和接收器,系统延迟时间是指元素在写入到所有接收器之前在来源内等待的最长时长。
  • 有时,来源不会为元素在来源内等待的时间段提供值。此外,该元素可能没有用于定义其事件时间的元数据。 在这种情况下,系统延迟时间从流水线最初收到该元素的时间开始计算。

直观呈现数据的图表,显示了流处理流水线中的系统延迟时间。

自动扩缩

Dataflow 服务会自动选择运行您的自动扩缩作业所需的工作器实例数量。一段时间后,工作器实例数可能会根据作业要求而发生变化。

直观呈现数据的图表,显示了一个流水线中的工作器数量。

要查看自动扩缩更改历史记录,请点击更多历史记录按钮。此时会显示一个表格,其中包含流水线的工作器历史记录信息。

显示流水线的工作器历史记录的表格。

吞吐量

吞吐量是指在任何时间点处理的数据量。该指标针对每个步骤计量,显示为每秒处理的元素数量。如需以“每秒字节数”为单位查看此指标,请点击吞吐量(元素数/秒) > 吞吐量(字节数/秒)

直观呈现数据的图表,显示了流水线中四个步骤的吞吐量。

工作器错误日志计数

工作器错误日志计数显示任何时间点在所有工作器中观察到的错误率。

关于记录的每个错误及其发生次数的摘要。

CPU 占用率

CPU 利用率是用 CPU 的使用量除以可用于处理的 CPU 数量得到的值。该指标针对每个工作器计量,显示为百分比。

直观呈现数据的图表,显示了一个 Dataflow 工作器中的 CPU 利用率。

输入和输出指标

如果您的流处理 Dataflow 作业使用 Pub/Sub 读取或写入记录,则系统会显示输入指标和输出指标。

默认情况下,所有输入指标组合在一起,所有输出指标也会组合在一起。要更改显示的指标,每个部分都提供了过滤条件下拉菜单。 下图显示了所有可用的过滤条件。

可用于 Dataflow 作业的输入指标的过滤条件下拉列表。 可用于 Dataflow 作业的输出指标的过滤条件下拉列表。

输入指标输出指标部分都将显示以下两个图表。

一系列图表,显示 Dataflow 作业的输入和输出指标。

每秒请求数

每秒请求数是指来源或接收器在一段时间内读取或写入数据的 API 请求速率。 如果此速率下降到零,或者相对于预期行为而言,在很长一段时间内大幅降低,则流水线可能无法执行某些操作。此外,也可能没有任何可读取的数据。在这种情况下,请检查具有较大系统水印的作业步骤。此外,您还应该检查工作器日志,看看是否存在错误或表明处理速度缓慢的迹象。

显示来源或接收器在一段时间内读取或写入数据的 API 请求数量的图表。

每秒响应错误数(按错误类型划分)

每秒响应错误数(按错误类型划分)是指来源或接收器在一段时间内读取或写入数据失败的 API 请求速率。如果此类错误频繁出现,则这些 API 请求可能会拖慢处理速度。您必须调查此类失败的 API 请求。为帮助排查这些问题,请查看常规 I/O 错误代码文档以及来源或接收器使用的任何具体错误代码文档,例如 Pub/Sub 错误代码

显示来源或接收器在一段时间内读取或写入数据失败的 API 请求速率的图表。

使用 Metrics Explorer

您可以在 Metrics Explorer 中查看以下 Dataflow I/O 指标:

  • job/pubsub/write_count:来自 Dataflow 作业中 PubsubIO.Write 的 Pub/Sub 发布请求。
  • job/pubsub/read_count:来自 Dataflow 作业中 PubsubIO.Read 的 Pub/Sub 拉取请求。
  • job/bigquery/write_count:来自 Dataflow 作业中 BigQueryIO.Write 的 BigQuery 发布请求。job/bigquery/write_count 指标在使用 WriteToBigQuery 转换 并在 Apache Beam 2.28.0 版或更高版本上启用了 method='STREAMING_INSERTS' 的 Python 流水线中提供。

如需查看 Dataflow 指标的完整列表,请参阅 Google Cloud 指标文档

查看流水线

当您选择特定 Dataflow 作业时,监控界面会显示该作业中流水线的相关信息。此信息包括流水线在 Dataflow 服务上运行时的图形表示、作业摘要、作业日志以及流水线中每个步骤的相关信息。

Dataflow 监控界面可提供流水线的图形表示:即执行图。在流水线的执行图中,每个方框表示流水线中的一个转换。每个方框都包含转换名称和一些有关作业状态的信息,其中包括以下内容:

  • 正在运行:该步骤正在运行。
  • 已加入队列FlexRS 作业中的步骤已加入队列。
  • 成功:该步骤已成功完成。
  • 已停止:由于作业停止,该步骤已停止。
  • 未知:该步骤未能报告状态。
  • 失败:该步骤未能完成。

基本执行图

流水线代码:

Java:SDK 2.x


  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python


(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Java:SDK 1.x

执行图:

Dataflow 监控界面中显示的 WordCount 流水线执行图。

图 2:显示的 WordCount 流水线的代码及 Dataflow 监控界面中生成的执行图。

复合转换

在执行图中,您可以展开复合转换(即包含多个嵌套子转换的转换)。 展开式复合转换在执行图中以箭头标示。点击此箭头即可展开相关转换并查看其中所含的子转换。

流水线代码:

Java:SDK 2.x


  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

注意:右侧图片中的 FormatCounts 与此 SDK 无关。

Python


# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Java:SDK 1.x

执行图:

WordCount 流水线执行图,其中展开了 CountWords 转换以显示其组件转换。

图 3:显示的 CountWords 转换的各子步骤的流水线代码及展开的整个流水线执行图。

转换名称

Dataflow 可通过几种不同的方式来获取监控执行图中显示的转换名称:

Java:SDK 2.x

  • Dataflow 可以使用您在应用转换时分配的名称。您为 apply 方法提供的第一个参数将作为转换名称。
  • Dataflow 可以推断转换名称:根据类名称(如果您已构建自定义转换)或 DoFn 函数对象的名称(如果您使用的是 ParDo 等核心转换)进行推断。

Python

  • Dataflow 可以使用您在应用转换时分配的名称。您可以通过指定转换的 label 参数设置转换名称。
  • Dataflow 可以推断转换名称:根据类名称(如果您已构建自定义转换)或 DoFn 函数对象的名称(如果您使用的是 ParDo 等核心转换)进行推断。

Java:SDK 1.x

了解指标

实际用时

点击某个步骤时,实际用时指标会显示在步骤信息面板中。实际用时提供了在所有工作器的所有线程中完成以下操作所花费的大致总时长:

  • 初始化步骤
  • 处理数据
  • 重排数据
  • 结束步骤

对于复合步骤,实际用时会告知您在各个分步骤中所花费时间的总和。此估算值可帮助您确定执行缓慢的步骤,以及诊断流水线中的哪个部分超出了预期执行时间。

您可以查看某个步骤在流水线中运行所花费的时间。
图 4:实际用时指标可帮助您确保流水线高效运行。

侧边输入指标

侧边输入指标反映了侧边输入访问模式和算法对流水线性能的影响。如果您的流水线使用侧边输入,Dataflow 会将集合写入永久性层(如磁盘),而您的转换操作会从该永久性集合中读取内容。这些读取和写入操作会影响作业的运行时间。

如果您选择的转换会创建或使用侧边输入集合,Dataflow 监控界面便会显示侧边输入指标。您可以在步骤信息面板的侧边输入指标部分中查看相关指标。

创建侧边输入的转换

如果所选转换创建了侧边输入集合,侧边输入指标部分便会显示集合名称以及如下指标:

  • 写入操作用时:写入侧边输入集合所花的时间。
  • 写入的字节数:已写入侧边输入集合的总字节数。
  • 从侧边输入读取用时和字节数:一个包含更多指标的表,这些指标用于使用侧边输入集合的所有转换(称为“侧边输入使用者”)。

从侧边输入读取用时和字节数表包含各侧边输入使用者的以下信息:

  • 侧边输入使用者:侧边输入使用者的转换名称。
  • 读取操作用时:此使用者在读取侧边输入集合上所花的时间。
  • 读取的字节数:此使用者从侧边输入集合中读取的字节数。

如果您的流水线包含一个可创建侧边输入的复合转换,请展开此复合转换,直到您看到创建该侧边输入的特定子转换。然后,选择该子转换以查看侧边输入指标部分。

图 5 显示了创建侧边输入集合的转换的侧边输入指标。

您可以选择子转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 5:此执行图包含一个展开式复合转换 (MakeMapView)。用于创建侧边输入的子转换 (CreateDataflowView) 处于选中状态,并且侧边输入指标显示在步骤侧边栏中。

使用一个或多个侧边输入的转换

如果选择的转换使用一个或多个侧边输入,侧边输入指标部分便会显示从侧边输入读取用时和字节数表。此表会显示每个侧边输入集合的以下信息:

  • 侧边输入集合:侧边输入集合的名称。
  • 读取操作用时:转换在读取此侧边输入集合上所花的时间。
  • 读取的字节数:转换从此侧边输入集合中读取的字节数。

如果您的流水线包含一个可读取侧边输入的复合转换,请展开此复合转换,直到您看到读取该侧边输入的特定子转换。然后,选择该子转换以查看侧边输入指标部分。

图 6 显示读取侧边输入集合的转换的侧边输入指标。

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 6:JoinBothCollections 转换用于读取侧边输入集合。在此执行图中,JoinBothCollections 处于选中状态,并且侧边输入指标显示在步骤信息侧边栏中。

识别侧边输入性能问题

重复是常见的侧边输入性能问题。如果您的侧边输入 PCollection 过大,工作器将无法在内存中缓存整个集合。因此,工作器必须反复读取永久性侧边输入集合。

在图 7 中,侧边输入指标表明,从侧边输入集合读取的总字节数远远大于集合的实际大小(写入的总字节数)。

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 7:有关重复的一个例子。侧边输入集合为 563 MB,而使用转换读取的总字节数几乎达到 12 GB。

如需提升此流水线的性能,请重新设计算法,以避免循环访问或重新获取侧边输入数据。在此示例中,流水线生成两个集合的笛卡尔积。算法遍历整个侧边输入集合以获取主集合的各元素。您可以通过将主集合的多个元素一起进行批处理来改善管道的访问模式。这项更改可减少工作器必须重新读取侧边输入集合的次数。

如果您的流水线通过应用带有一个或多个大型侧边输入的 ParDo 来执行联接操作,则可能会出现另一个常见的性能问题。在这种情况下,工作器会将大部分处理时间用于从侧边输入集合中读取联接操作。

图 8 显示了此问题的侧边输入指标示例:

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 8:JoinBothCollections 转换的总处理时间为 18 分 31 秒。工作器将大部分处理时间(10 分 3 秒)用于读取 10 GB 的侧边输入集合。

要提升此流水线的性能,请使用 CoGroupByKey 代替侧边输入。

建议和诊断

诊断

日志下的诊断标签页会收集并显示流水线中生成的某些日志条目。其中包括指示流水线可能存在问题的消息以及包含堆栈轨迹的错误消息。系统会对收集的日志条目进行重复信息删除,然后合并到错误组中。

具有“服务错误”错误组的 Dataflow 作业的“诊断”标签页。

错误报告包含以下信息:

  • 带有错误消息的错误列表。
  • 每个错误发生的次数。
  • 直方图,指示每个错误发生的时间。
  • 最近发生该错误的时间。
  • 错误首次发生的时间。
  • 错误的状态。

如需查看特定错误的错误报告,请点击错误列下的说明。随即将显示错误报告页面。如果错误是服务错误,则会显示包含文档(包括进一步步骤)的附加链接(“问题排查指南”)。

Dataflow 服务错误的错误组详情页面。

如需详细了解该页面,请参阅查看错误

建议

建议标签页显示 Dataflow 提供的与流水线相关的数据分析。这些数据分析的目标是识别有可能在费用和性能方面进行改善的情况。

包含示例建议的 Dataflow 作业的“建议”标签页。

启用建议

您可以通过为流水线设置 --experiments=enable_recommendations 标志来启用建议。

后续步骤

  • 了解如何使用执行详情来优化 Dataflow 作业

  • 探索 Cloud Monitoring 以创建提醒并查看 Dataflow 指标(包括自定义指标)