WordCount 示例管道

继续学习之前,请先执行快速入门中的步骤(如果您尚未这样做)。

WordCount 示例演示了如何设置处理管道来读取文本、将文本行标记化为单个词以及对其中的每个词执行词频计数。Dataflow SDK 包含以下四个 WordCount 示例(随后将逐一详细说明),这一系列示例是基于彼此构建而成的。所有示例的输入文本均采用莎士比亚的作品集。

Dataflow SDK 中各 WordCount 示例引入的概念各不相同。

  • Minimal WordCount 演示了构建 Dataflow 管道所涉及的基本准则。
  • WordCount 引入了创建可重用且可维护管道方面的一些较为常见的最佳做法。
  • Debugging WordCount 引入了日志记录和调试做法。
  • Windowed WordCount 演示了如何使用 Dataflow 的编程模型来处理有界限和无界限的数据集。

首先了解 Minimal WordCount,这是其中最简单的一个示例。如果您认为自己可以自如地运用构建流水线的基本准则,请继续学习 WordCount 中编写 Dataflow 程序的最佳做法。然后,请仔细阅读 Debugging WordCount,了解如何运用常见做法进行日志记录和调试。最后,学习 Windowed WordCount 示例,了解如何使用同一计算模式同时处理有界限数据集和无界限数据集。

MinimalWordCount

Minimal WordCount 演示了一个简单管道,该管道可以从 Google Cloud Storage 文件中读取文本块,应用转换来对各个词进行标记化和数量统计,然后将数据写入 Cloud Storage 存储分区的一个输出文件。此示例对其输入和输出文件的位置进行了硬编码,不会执行任何错误检查;它只会显示创建 Dataflow 管道的“大概过程”。在后面的示例中,我们会将管道的输入和输出源参数化,并呈现其他最佳做法。

Java

主要概念
  1. 创建流水线
  2. 对管道应用转换
    • 读取输入(在此示例中:读取文本文件)
    • 应用 ParDo 转换
    • 应用 SDK 提供的转换(在此示例中为 Count
    • 写入输出(在此示例中:将输出写入 Google Cloud Storage 中)
  3. 运行管道

以下部分详细介绍了这些概念,并提供了摘自 Minimal WordCount 管道的相关代码片段。

创建管道

要创建 Cloud Dataflow 管道,首先需要创建“管道选项”对象。通过此对象,我们可以设置管道的各种选项,例如,用于执行管道的管道运行程序、项目 ID 以及管道用来存储其文件的暂存位置(让您的 jar 可在云端访问)。在此示例中,我们以编程方式设置这些选项,但较为通常的做法是使用命令行参数设置流水线选项

在我们的示例中,为了使用 Google Cloud Dataflow 服务在云端执行流水线,我们将 BlockingDataflowPipelineRunner 指定为 PipelineRunner。您可以设置其他选项以在云端执行您的流水线。您也可以完全省略此选项,此时,默认运行程序将在本地执行您的管道。这些内容将在接下来的两个 WordCount 示例中进行演示;如需了解详情,请参阅指定执行参数

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

接下来,使用我们刚刚构建的选项创建一个 Pipeline 对象。Pipeline 对象构建与该特定流水线关联的要执行的转换图。

Java

Pipeline p = Pipeline.create(options);

如需详细了解此管道对象及其工作原理,请参阅管道

应用管道转换

Minimal WordCount 流水线包含多个转换,用于将数据读入流水线、处理数据或以其他方式转换数据,并写出结果。每个转换表示流水线中的一项操作。

各转换接受某种类型的输入(数据或其他内容),并生成一些输出数据。输入和输出数据由 SDK 类 PCollection 表示。PCollection 是 Dataflow SDK 提供的一个特殊类,您可以使用它来表示几乎任何大小的数据集,包括无限数据集。

图 1 显示流水线的数据流:

此流水线使用 TextIO.Read 转换基于存储在输入数据文件中的数据创建 PCollection;CountWords 转换根据原始文本 PCollection 生成由字数统计组成的 PCollection;TextIO.Write 将设置了格式的字数统计写入输出数据文件。
图 1:流水线数据流。

Minimal WordCount 管道包含五个转换:

  1. 文本文件 Read 转换应用于 Pipeline 对象本身,并生成 PCollection 作为输出。输出 PCollection 中的每个元素表示输入文件中的一行文本。
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. ParDo 转换,它会对每个元素调用 DoFn(以内嵌方式定义为匿名类),以便将文本行标记化为一个个字词。此转换的输入是由前一 TextIO.Read 转换所生成文本行组成的 PCollectionParDo 转换会输出新的 PCollection,其中的每个元素表示文本中的一个词。
  4. Java

    正如我们示例中所演示的那样,您可以使用 .named() 操作为转换指定一个转换名称,该转换名称将显示在 Dataflow 监控界面中。当 Dataflow 服务执行您的流水线时,监控界面会在每个 ParDo 转换执行时显示提示。

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. SDK 提供的 Count 转换是一种通用转换,它接受任意类型的 PCollection 并返回由键值对组成的 PCollection。每个键表示输入集合中的唯一元素,而每个值表示该键在输入集合中出现的次数。

    在此流水线中,Count 的输入是由前一 ParDo 所生成各个词组成的 PCollection,输出是由键值对组成的 PCollection,其中的每个键表示文本中的唯一词,而关联值表示每个词的出现次数。
  6. Java

      .apply(Count.<String>perElement())
    
  7. 下一个转换可将每个表示唯一词及其出现次数的键值对设置为适合写入输出文件的可打印字符串格式。
  8. Java

    MapElements 是一种较高级别的复合转换,其中封装了一个简单的 ParDo 转换;对于输入 PCollection 中的每个元素,MapElements 会应用仅生成一个输出元素的函数。MapElements 会调用 SimpleFunction(以内嵌方式定义为匿名类)来执行这种格式设置。MapElements 接受由 Count 所生成键值对组成的 PCollection 作为输入,并生成由可打印字符串组成的新 PCollection

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. 文本文件 Write 转换。此转换接受由已设置格式的 String 组成的最终 PCollection 作为输入,并将各元素写入输出文本文件中。输入 PCollection 中的每个元素表示生成的输出文件中的一行文本。
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    请注意,Write 转换生成的 PDone 类型结果值极小(本例中已忽略该值)。

运行管道

要运行流水线,请调用 run 方法来发送流水线,供您在创建流水线时指定的流水线运行程序执行。

Java

p.run();

WordCount 示例

WordCount 示例引入了几种推荐使用的编程做法,它们可让您的管道更易于读取、写入和维护。我们并未明确要求您采用这些做法,但这样做可让您的管道更具灵活性,并有助于测试管道和创建可重用的管道代码。

本部分假定您已经非常了解构建管道的基本概念。如果您认为自己还没有达到这种程度,请阅读上面的 Minimal WordCount 部分。

Java

新概念
  1. 使用显式 DoFn 应用 ParDo
  2. 创建复合转换
  3. 使用可参数化的 PipelineOptions

以下部分详细介绍了这些主要概念,并将流水线代码细分为若干较小的部分。

指定显式 DoFn

使用 ParDo 转换时,您需要指定应用于输入 PCollection 中各元素的处理操作。此处理操作是 SDK 类 DoFn 的一个子类。前一部分 (Minimal WordCount) 中的示例流水线以内嵌方式为每个 ParDo 创建了 DoFn 子类,以作为匿名内部类实例。

但通常情况下,最好在全局级别定义 DoFn,这样更便于单元测试并可提高 ParDo 代码的可读性。

正如前一示例 (Minimal WordCount) 中所述,当您执行流水线时,Dataflow 监控界面会在各 ParDo 转换执行时显示提示。Dataflow 服务会自动根据您传递的 DoFn 名称为 ParDo 转换生成转换名称。例如,应用 FormatAsTextFn()ParDo 在此监控界面中显示为 ParDo(FormatAsText)

Java

在此示例中,DoFn 被定义为静态类:

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

如需详细了解如何为 ParDo 转换创建和指定 DoFn 子类,请参阅使用 ParDo 并行处理

创建复合转换

如果您的处理操作包含多个转换或 ParDo 步骤,则可以将其创建为 PTransform 的子类。通过创建 PTransform 子类,您可以创建复杂的可重用转换,使流水线结构更清晰且更模块化,而且让单元测试更加简单方便。

通过使用 PTransform 子类明确指定流水线的逻辑结构,也可使监控流水线变得更简单轻松。当 Dataflow 服务构建流水线的最终优化结构时,Dataflow 监控界面将使用您构建的转换来更准确地反映流水线的结构。

Java

在此示例中,我们将两个转换封装为 PTransform 子类 CountWordsCountWords 包含 ParDo,用于运行 ExtractWordsFn 和 SDK 提供的 Count 转换。

定义 CountWords 时,我们指定其最终输入和输出;输入为用于提取操作的 PCollection<String>,输出为由计数操作生成的 PCollection<KV<String, Long>>

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

使用可参数化的 PipelineOptions

在前一示例 (Minimal WordCount) 中,我们在创建流水线时设置了各种执行选项。在此示例中,我们通过扩展 PipelineOptions 定义我们自己的自定义配置选项。

您可以添加自己的参数以供命令行解析器处理,并为这些参数指定默认值。您随后可以通过管道代码访问这些选项值。

在 Minimal WordCount 示例中,我们对流水线选项进行了硬编码。但最常见方法是通过命令行参数解析构建 PipelineOptions

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

Debugging WordCount 示例

Debugging WordCount 示例演示了有关检测管道代码的一些最佳做法。您可以使用 Dataflow 监控界面Aggregator 更详细地了解流水线的运行情况。

Java

您还可以使用 SDK 的 DataflowAssert 测试转换在各不同管道阶段的输出。

Java

新概念
  1. 在 Dataflow 监控界面中查看日志
  2. 控制 Dataflow 工作器日志级别
  3. 创建 Aggregators
  4. 通过 DataflowAssert 测试您的流水线

以下部分详细介绍了这些主要概念,并将流水线代码细分为若干较小的部分。

在 Dataflow 监控界面中查看日志

Google Cloud Logging 会将来自 Dataflow 作业的所有工作器的日志汇总到 Google Cloud Console 中的一个位置。您可以使用 Dataflow 监控界面查看所有 Compute Engine 实例的日志,Dataflow 启用这些实例来完成 Dataflow 作业。您可以将日志语句添加到流水线的 DoFn 实例中,当流水线运行时,这些语句会显示在监控界面中。

Java

以下 SLF4J 日志记录器使用 FilterTextFn 的完全限定类名称作为日志记录器名称。由此日志记录器发出的所有日志语句都将通过这一名称进行引用,并将以相应的日志级别设置显示在 Dataflow 监控界面中。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

控制 Dataflow 工作器日志级别

Java

默认情况下,执行用户代码的 Dataflow 工作器被配置为以 INFO 及更高日志级别将日志记录到 Cloud Logging。您可以通过指定以下内容来覆盖特定日志记录命名空间的日志级别:

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

例如,指定以下内容后:

--workerLogLevelOverrides={"com.example":"DEBUG"}

当您使用 Dataflow 服务执行此流水线时,除了默认 INFO 或更高级别的日志以外,监控界面还将包含(且仅包含)com.example 软件包的 DEBUG 或更高级别日志。

此外,您可以通过指定以下内容来覆盖默认 Dataflow 工作器日志记录配置

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

例如,指定以下内容后:

--defaultWorkerLogLevel=DEBUG

使用 Dataflow 服务执行此流水线时,监控界面将包含所有 DEBUG 或更高级别的日志。请注意,如果将默认的工作器日志级别更改为 TRACEDEBUG,日志记录信息量将显著增加。

如需了解详情,请参阅 Cloud Dataflow 日志记录

创建 Aggregator

自定义 Aggregator 可以在管道运行时跟踪管道中的各种值。使用 Dataflow 服务运行流水线时,这些值将显示在 Dataflow 监控界面中。

在系统开始执行创建 Aggregator 的 ParDo 转换和/或 Aggregator 的初始值发生更改之前,Aggregator 可能不会显示。在这种情况下,您可以在监控界面的作业摘要底部查看它们。

以下自定义 Aggregator 跟踪匹配词和不匹配词的数量。

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

批处理管道和流处理管道中的 Aggregator

在批处理流水线中,Aggregator 可确保一致性。对于成功的内容包,Aggregator 只提交一次;而对于失败的内容包,Aggregator 则不会执行提交。

在流处理管道中,Aggregator 可提供较为宽松的语义。对于成功的内容包,贡献就是尽力而为;而对于失败的内容包,贡献则可能体现在最终值上。

通过 DataflowAssert 测试您的流水线

Java

DataflowAssert 是一组采用 Hamcrest 集合匹配器样式的便捷 PTransforms,可用于编写流水线级别测试以验证 PCollections 的内容。DataflowAssert 最适用于采用小型数据集的单元测试,但在此处,我们将它作为一种教学工具来展示。

下面,我们验证过滤后的字词集是否与我们预期的计数相匹配。请注意,DataflowAssert 不会提供任何输出,流水线成功完成就意味着符合预期。详细了解如何测试流水线,并参阅 DebuggingWordCountTest 以获取示例单元测试。

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

与前面的示例一样,本示例 (WindowedWordCount) 也统计文本中的字数,但它引入了多个高级概念。WindowedWordCount 的输入可以是固定数据集(如同前面的示例),也可以是无界限数据流。

通过 Dataflow SDK,您可以非常便捷地创建一个可以同时处理有界限类型输入和无界限类型输入的流水线。如果输入是无界限的,则流水线的所有 PCollections 也将是无界限的。有界限输入也是如此。

在阅读本部分内容之前,请确保您已经熟悉构建管道的基本准则并可自如地运用这些准则。

新概念:
  1. 读取无界限输入和有界限输入
  2. 向数据添加时间戳
  3. 窗口化
  4. 写入无界限输出和有界限输出

以下部分详细介绍了这些主要概念,并将管道代码细分为若干较小的部分。

读取无界限输入和有界限输入

WindowedWordCount 的输入可以是有界限输入,也可以是无界限输入。如果您的输入具有固定数量的元素,该输入则被视为“有界限”数据集。如果您的输入会不断更新,该输入则被视为“无界限”数据集。请参阅有界限 PCollection 和无界限 PCollection,详细了解输入类型。

在此示例中,您可以选择是使用有界限输入还是无界限输入。您应该还记得,所有示例的输入都是莎士比亚的文本集,即有限输入。但为了说明此示例中的新概念,我们在其输入中重放莎士比亚的文本。

在此示例中,如果输入是无界限的,系统将从 Google Cloud Pub/Sub 主题中读取输入;在这种情况下,对流水线应用的 Read 转换是 PubSubIO.Read。否则,系统将从 Google Cloud Storage 中读取输入。

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

向数据添加时间戳

PCollection 中的每个元素都有一个关联的时间戳。各元素的时间戳由创建 PCollection 的来源分配。在此示例中,如果您为流水线选择了无界限输入,则时间戳将来自 Pub/Sub 数据源。如果您选择了有界限输入,则名为 AddTimestampsFnDoFn 方法(由 ParDo 调用)将为 PCollection 中的每个元素设置一个时间戳。

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

下面是 AddTimestampsFn(由 ParDo 调用的一个 DoFn)的代码,它可为给定元素本身设置时间戳的数据元素。例如,如果元素为日志行,则此 ParDo 可以从日志字符串中解析出时间并将其设置为元素的时间戳。莎士比亚作品并没有固有的时间戳,因此我们在本例中编写了随机时间戳,目的只是为了说明此概念。每行输入文本将会随机获得一个关联时间戳,该时间戳对应一个 2 小时期间内的某一时间。

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

如需详细了解时间戳,请参阅 PCollection 元素时间戳

窗口化

Dataflow SDK 使用数据选取的概念,根据各元素的时间戳细分 PCollection。用于汇总多个元素的 Dataflow 转换会将各个 PCollection 作为一系列有限窗口来处理,即使整个集合本身的大小可能并无界限也是如此。

WindowingWordCount 示例应用固定时间窗口,其中每个窗口表示一个固定时间间隔。此示例的固定窗口大小默认为 1 分钟(您可以使用命令行选项更改此值)。此流水线随后应用 CountWords 转换。

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

写入无界限输出和有界限输出

由于我们的输入可能是有界限或无界限的,因此我们的输出 PCollection 也会是如此。我们需要确保选择合适的接收器。一些输出接收器仅支持有界限输出或仅支持无界限输出。例如,文本文件是只能接收有界限数据的接收器。BigQuery 输出源既支持有界限输入,也支持无界限输入。

在此示例中,我们将结果流式传输到 BigQuery 表格。随后针对 BigQuery 表格设置结果的格式,并使用 BigQueryIO.Write 将其写入 BigQuery。

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));