Dataflow 作业图

当您选择特定 Dataflow 作业时,监控界面会提供作业的图形表示,即作业图。 控制台中的作业图页面还提供了作业摘要、作业日志以及流水线中每个步骤的相关信息。

在流水线的作业图中,每个方框表示流水线中的一个转换。每个方框都包含转换名称和一些有关作业状态的信息,其中包括以下内容:

  • 正在运行:该步骤正在运行
  • 已加入队列FlexRS 作业中的步骤已加入队列
  • 成功:该步骤已成功完成
  • 已停止:由于作业停止,该步骤已停止
  • 未知:该步骤未能报告状态
  • 失败:该步骤未能完成

默认情况下,作业图页面会显示图表视图。如需以表形式查看作业图,请在作业步骤视图中选择表视图。表视图以不同的格式包含相同的信息。表视图在以下场景中非常有用:

  • 您的作业有许多阶段,使作业图难以浏览。
  • 您想按特定属性对作业步骤进行排序。例如,您可以按实际用时对表进行排序,以识别速度缓慢的步骤。

基本作业图

流水线代码:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
作业图

Dataflow 监控界面中显示的 WordCount 流水线执行图。

图 1:显示的 WordCount 流水线的代码及 Dataflow 监控界面中生成的执行图。

复合转换

在作业图中,您可以展开复合转换(即包含多个嵌套子转换的转换)。 展开式复合转换在执行图中以箭头标示。如需展开转换并查看子转换,请点击箭头。

流水线代码:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
作业图

WordCount 流水线作业图,其中展开了 CountWords 转换以显示其组件转换。

图 2:CountWords 转换的各子步骤的流水线代码。显示了展开的整个流水线作业图。

在流水线代码中,您可以使用以下代码调用复合转换:

result = transform.apply(input);

以这种方式调用的复合转换会忽略预期的嵌套,因此可能会在 Dataflow 监控界面中显示为展开状态。您的流水线在执行时,也可能会产生与固定的唯一名称有关的警告或错误。

为避免这些问题,请使用建议的格式调用转换:

result = input.apply(transform);

转换名称

Dataflow 可通过几种不同的方式来获取监控作业图中显示的转换名称。转换名称用于公开可见的位置,包括 Dataflow 监控界面、日志文件和调试工具。请勿使用包含个人身份信息(例如用户名或组织名称)的转换名称。

Java

  • Dataflow 可以使用您在应用转换时分配的名称。您为 apply 方法提供的第一个参数将作为转换名称。
  • Dataflow 可以推断转换名称:根据类名称(如果您构建自定义转换)或 DoFn 函数对象的名称(如果您使用的是 ParDo 等核心转换)进行推断。

Python

  • Dataflow 可以使用您在应用转换时分配的名称。您可以通过指定转换的 label 参数设置转换名称。
  • Dataflow 可以推断转换名称:根据类名称(如果您构建自定义转换)或 DoFn 函数对象的名称(如果您使用的是 ParDo 等核心转换)进行推断。

Go

  • Dataflow 可以使用您在应用转换时分配的名称。您可以通过指定 Scope 来设置转换名称。
  • Dataflow 可以推断转换名称:根据结构体名称(如果您使用结构体 DoFn)或函数名称(如果您使用函数 DoFn)进行推断。

了解指标

本部分详细介绍了与作业图关联的指标。

实际用时

点击某个步骤时,实际用时指标会显示在步骤信息面板中。实际用时提供了在所有工作器的所有线程中完成以下操作所花费的大致总时长:

  • 初始化步骤
  • 处理数据
  • 重排数据
  • 结束步骤

对于复合步骤,实际用时会告知您在各个分步骤中所花费时间的总和。此估算值可帮助您确定执行缓慢的步骤,以及诊断流水线中的哪个部分超出了预期执行时间。

您可以查看某个步骤在流水线中运行所花费的时间。
图 3:实际用时指标可帮助您确保流水线高效运行。

侧边输入指标

侧边输入指标反映了侧边输入访问模式和算法对流水线性能的影响。如果您的流水线使用侧边输入,Dataflow 会将集合写入永久性层(如磁盘),而您的转换操作会从该永久性集合中读取内容。这些读取和写入操作会影响作业的运行时间。

如果您选择的转换会创建或使用侧边输入集合,Dataflow 监控界面便会显示侧边输入指标。您可以在步骤信息面板的侧边输入指标部分中查看相关指标。

创建侧边输入的转换

如果所选转换创建了侧边输入集合,侧边输入指标部分便会显示集合名称以及如下指标:

  • 写入操作用时:写入侧边输入集合所花的时间。
  • 写入的字节数:已写入侧边输入集合的总字节数。
  • 从侧边输入读取用时和字节数:一个包含更多指标的表,这些指标用于使用侧边输入集合的所有转换(称为“侧边输入使用者”)。

从侧边输入读取用时和字节数表包含各侧边输入使用者的以下信息:

  • 侧边输入使用者:侧边输入使用者的转换名称。
  • 读取操作用时:此使用者在读取侧边输入集合上所花的时间。
  • 读取的字节数:此使用者从侧边输入集合中读取的字节数。

如果您的流水线包含一个可创建侧边输入的复合转换,请展开此复合转换,直到您看到创建该侧边输入的特定子转换。然后,选择该子转换以查看侧边输入指标部分。

图 4 显示了创建侧边输入集合的转换的侧边输入指标。

您可以选择子转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 4::作业图包含一个展开的复合转换 (MakeMapView)。用于创建辅助输入的子转换 (CreateDataflowView) 处于选中状态,并且辅助输入指标显示在步骤信息侧边栏中。

使用一个或多个侧边输入的转换

如果选择的转换使用一个或多个侧边输入,侧边输入指标部分便会显示从侧边输入读取用时和字节数表。此表会显示每个侧边输入集合的以下信息:

  • 侧边输入集合:侧边输入集合的名称。
  • 读取操作用时:转换在读取此侧边输入集合上所花的时间。
  • 读取的字节数:转换从此侧边输入集合中读取的字节数。

如果您的流水线包含一个可读取侧边输入的复合转换,请展开此复合转换,直到您看到读取该侧边输入的特定子转换。然后,选择该子转换以查看侧边输入指标部分。

图 5 显示读取侧边输入集合的转换的侧边输入指标。

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 5:JoinBothCollections 转换用于读取侧边输入集合。在此作业图中,JoinBothCollections 处于选中状态,并且侧边输入指标显示在步骤信息侧边栏中。

识别侧边输入性能问题

重复是常见的侧边输入性能问题。如果您的侧边输入 PCollection 过大,工作器将无法在内存中缓存整个集合。因此,工作器必须反复读取永久性侧边输入集合。

在图 6 中,侧边输入指标表明,从侧边输入集合读取的总字节数远远大于集合的实际大小(写入的总字节数)。

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 6:有关重复的一个例子。侧边输入集合为 563 MB,而使用转换读取的总字节数几乎达到 12 GB。

如需提升此流水线的性能,请重新设计算法,以避免循环访问或重新获取侧边输入数据。在此示例中,流水线生成两个集合的笛卡尔积。算法遍历整个侧边输入集合以获取主集合的各元素。您可以通过将主集合的多个元素一起进行批处理来改善管道的访问模式。这项更改可减少工作器必须重新读取侧边输入集合的次数。

如果您的流水线通过应用带有一个或多个大型侧边输入的 ParDo 来执行联接操作,则可能会出现另一个常见的性能问题。在这种情况下,工作器会将大部分处理时间用于从侧边输入集合中读取联接操作。

图 7 显示了此问题的侧边输入指标示例:

您可以选择转换,使其侧边输入指标显示在“步骤信息”侧边栏中。
图 7:JoinBothCollections 转换的总处理时间为 18 分 31 秒。工作器将大部分处理时间(10 分 3 秒)用于读取 10 GB 的侧边输入集合。

如需提升此流水线的性能,请使用 CoGroupByKey 代替侧边输入。