问题排查和调试

本页面介绍了问题排查技巧和调试策略,如果您在构建或运行 Dataflow 流水线时遇到问题,这些技巧和策略可能会派上用场。本文的内容可以帮助您检测流水线故障,确定流水线运行失败的原因,并推荐了一些可用来纠正问题的操作流程。

Dataflow 会提供与您的作业有关的实时反馈和一组基本步骤,可供您用来检查错误消息、日志和作业进度停滞等情况。

如需获取有关运行 Dataflow 作业时可能遇到的常见错误的指南,请参阅常见错误指南页面。

临时文件和暂存文件指南

以下是 Java 和 Python 流水线的最佳做法。

Java

  • 对于批量作业,我们建议您为临时位置设置存留时间 (TTL)

  • 在设置 TTL 之前,最好确保将暂存位置和临时位置设置为不同的位置

  • 不要删除暂存位置中的对象,因为这些对象将被重复使用。

  • 如果作业完成或停止并且临时对象没有被清理,请从用作临时位置的 Cloud Storage 存储桶中手动移除这些文件。

Python

临时位置和暂存位置的前缀为 <job_name>.<time>

  • 确保您将暂存位置和临时位置设置为不同的位置

  • 如果需要,您可以在作业完成或停止后删除暂存位置中的对象。此外,暂存对象不会在 Python 流水线中重复使用。

  • 如果作业结束并且临时对象没有被清理,请从用作临时位置的 Cloud Storage 存储桶中手动移除这些文件。

  • 对于批处理作业,我们建议您为临时和暂存位置设置存留时间 (TTL)

检查流水线状态

您可以使用 Dataflow 监控界面检测流水线运行中的任何错误。

  1. 转到 Google Cloud Console
  2. 从项目列表中选择您的 Google Cloud 项目。
  3. 点击左上角的菜单。
  4. 转到大数据部分,然后点击 Dataflow。 右侧窗格中将显示正在运行的作业的列表。
  5. 选择要查看的流水线作业。您可以在状态字段中快速查看作业状态:“正在运行”、“成功”或“失败”。
图 1:Developers Console 中的 Dataflow 作业列表,其中包含处于正在运行、成功和失败状态的作业。

基本问题排查工作流程

如果其中一个流水线作业失败,您可以选择作业以查看有关错误和运行结果的详细信息。选择作业后,您可以查看流水线的关键图表、执行图、作业信息面板以及具有作业日志工作器日志诊断推荐标签页的日志面板。

查看作业错误消息

如需查看流水线代码和 Dataflow 服务生成的作业日志,请点击日志面板底部的

您可以通过点击信息 过滤来过滤作业日志中显示的消息。如需仅显示错误消息,请点击信息 ,然后选择错误

如需展开错误消息,请点击可展开部分

突出显示作业日志、诊断、日志级别过滤条件和展开的错误消息的日志面板。

或者,您也可以点击诊断标签页。此标签页会显示沿所选时间轴发生的错误、所有已记录错误的数量以及关于流水线的可能建议。

包含两个已报告的错误的诊断标签页。

查看作业的步骤日志

在流水线图中选择步骤时,日志面板将不再显示 Dataflow 服务生成的作业日志,而是改为显示运行流水线步骤的 Compute Engine 实例中的日志。

突出显示了步骤工作器日志的所选流水线步骤。

Cloud Logging 会将从项目的 Compute Engine 实例中收集的所有日志合并到一个位置。如需详细了解如何使用 Dataflow 的各种日志记录功能,请参阅日志记录流水线消息

处理自动拒绝流水线操作

在某些情况下,Dataflow 服务会识别出您的流水线可能触发已知的 SDK 问题。为了防止提交可能遇到问题的流水线,Dataflow 将自动拒绝您的流水线并显示以下消息:

The workflow was automatically rejected by the service because it may trigger an
identified bug in the SDK (details below). If you think this identification is
in error, and would like to override this automated rejection, please re-submit
this workflow with the following override flag: [OVERRIDE FLAG].
Bug details: [BUG DETAILS].
Contact Google Cloud Support for further help.
Please use this identifier in your communication: [BUG ID].

在阅读链接的错误详情中的警告后,如果您仍然想尝试运行流水线,则可以替换自动拒绝操作。添加 --experiments=<override-flag> 标志,然后重新提交您的流水线。

确定流水线失败的原因

通常,失败的 Apache Beam 流水线运行可归因于以下其中一个原因:

  • 图表或流水线构造错误。如果 Dataflow 按照 Apache Beam 流水线的描述,在构建用于组成流水线的步骤图时遇到问题,就会发生这些错误。
  • 作业验证错误。Dataflow 服务会验证您启动的任何流水线作业。验证过程中发现的错误可能会阻止您成功创建或执行作业。验证错误可能包括 Google Cloud 项目的 Cloud Storage 存储分区问题,也可能包括项目的权限问题。
  • 工作器代码异常。如果用户提供的代码中存在错误,且 Dataflow 将该代码分发给并行工作器(例如 ParDo 转换的 DoFn 实例),就会发生这些错误。
  • 流水线运行缓慢或缺少输出。如果流水线运行缓慢或运行很长一段时间后仍然没有报告结果,您可能需要检查流处理数据源和接收器(如 Pub/Sub)的配额。此外,您可能需要使用某些更适合用于高容量流处理流水线的转换。
  • 其他 Google Cloud 服务中的暂时性故障导致的错误。如果 Dataflow 所依赖的 Google Cloud 服务(例如 Compute Engine 或 Cloud Storage)出现临时中断或其他问题,您的流水线可能会因此而失败。

检测图表或流水线构造错误

当 Dataflow 从您的 Dataflow 程序中的代码构建流水线的执行图表时,可能会发生图表构造错误。在构建图表期间,Dataflow 会检查非法操作。

请注意,如果 Dataflow 在图表构造期间检测到错误,系统将不会在 Dataflow 服务上创建任何作业。因此,您不会在 Dataflow 监控界面中看到任何反馈。 实际上,您会在 Console 或运行 Apache Beam 流水线的终端窗口中看到类似于以下内容的错误消息:

Java:SDK 2.x

例如,如果您的流水线尝试对一个已设定全局窗口、非触发式且无界限的 PCollection 对象执行 GroupByKey 等汇总操作,您将会看到类似于以下内容的错误消息:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

例如,如果您的流水线使用类型提示并且其中一个转换中的参数类型不符合预期,您将看到类似于以下内容的错误消息:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Java:SDK 1.x

如果您遇到此类错误,请检查流水线代码,以确保您的流水线操作是正确的。

检测 Dataflow 作业验证中的错误

一旦 Dataflow 服务收到您的流水线图表,就会尝试验证您的作业。此验证包括以下内容:

  • 确保该服务可以访问作业的相关 Cloud Storage 存储分区,以执行文件暂存和临时输出。
  • 在 Google Cloud 项目中检查所需的权限。
  • 确保该服务可以访问输入和输出来源(如文件)。

如果您的作业未通过验证,您将在 Dataflow 监控界面中看到错误消息,而如果您使用了“阻塞执行”功能,也会在您的 Console 或终端窗口中看到错误消息。错误消息类似于以下内容:

Java:SDK 2.x

INFO: To access the Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Java:SDK 1.x

检测工作器代码异常

当您的作业运行时,您可能会遇到工作器代码错误或异常。这些错误通常表示流水线代码中的 DoFn 产生了未处理的异常,这种异常会导致 Dataflow 作业中的任务失败。

Dataflow 监控界面会报告用户代码(例如 DoFn 实例)中的异常。如果您在运行流水线时开启了“阻塞执行”功能,您还将看到在 Console 或终端窗口中输出的错误消息,如下所示:

Java:SDK 2.x

INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

注意:Dataflow 服务在批处理模式下重试失败任务的次数最多为 4 次,在流处理模式下重试次数则不受限制。在批处理模式下,您的作业将失败;在流处理模式下,您的作业可能会无限期地停滞。

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

注意:Dataflow 服务在批处理模式下重试失败任务的次数最多为 4 次,在流处理模式下重试次数则不受限制。在批处理模式下,您的作业将失败;在流处理模式下,您的作业可能会无限期地停滞。

Java:SDK 1.x

考虑通过添加异常处理程序来防止代码中的错误。例如,如果您因为某些元素未能通过在 ParDo 中执行的一些自定义输入验证而要舍弃它们,请处理 DoFn 中的异常并删除这些元素。

您还可以通过几种不同的方式跟踪失败的元素:

  • 您可以使用 Cloud Logging 记录失败的元素并检查输出。
  • 您可以按照查看日志中的说明检查 Dataflow 工作器和工作器启动日志中的警告或错误。
  • 您可以让 ParDo 将失败的元素写入其他输出中,以供后续检查使用。

如需跟踪正在执行的流水线的属性,您可以使用 Metrics 类,如以下示例所示:

Java:SDK 2.x

final Counter counter = Metrics.counter("stats", "even-items");
PCollection<Integer> input = pipeline.apply(...);
...
input.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element() % 2 == 0) {
      counter.inc();
    }
});

Python

class FilterTextFn(beam.DoFn):
      """A DoFn that filters for a specific key based on a regex."""

      def __init__(self, pattern):
        self.pattern = pattern
        # A custom metric can track values in your pipeline as it runs. Create
        # custom metrics to count unmatched words, and know the distribution of
        # word lengths in the input PCollection.
        self.word_len_dist = Metrics.distribution(self.__class__,
                                                  'word_len_dist')
        self.unmatched_words = Metrics.counter(self.__class__,
                                               'unmatched_words')

      def process(self, element):
        word = element
        self.word_len_dist.update(len(word))
        if re.match(self.pattern, word):
          yield element
        else:
          self.unmatched_words.inc()

    filtered_words = (
        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))

Java:SDK 1.x

对流水线运行缓慢或缺少输出等进行问题排查

Java:SDK 2.x

如果您的高容量流处理流水线运行缓慢或者停滞,可以检查以下几个方面:

Pub/Sub 配额

如果您的流水线从 Pub/Sub 读取输入,则您的 Google Cloud 项目可能没有足够的 Pub/Sub 配额。配额不足的一个指标是,您的作业会产生大量 429 (Rate Limit Exceeded) 错误。请尝试按照以下步骤检查此类错误:

  1. 转到 Google Cloud Console
  2. 在左侧导航窗格中,点击 API 和服务
  3. 搜索框中,搜索 Pub/Sub
  4. 点击用量标签页。
  5. 检查响应代码并查找 (4xx) 客户端错误代码。

在 Combine 转换中使用 .withFanout

如果您的流水线需要处理大容量无界限 PCollection,我们建议您采取以下做法:

  • 使用 Combine.Globally.withFanout,而不要使用 Combine.Globally
  • 使用 Combine.PerKey.withHotKeyFanout,而不要使用 Count.PerKey

Python

如果您的高容量流处理流水线运行缓慢或者停滞,可以检查以下几个方面:

Pub/Sub 配额

如果您的流水线从 Pub/Sub 读取输入,则您的 Google Cloud 项目可能没有足够的 Pub/Sub 配额。配额不足的一个指标是,您的作业会产生大量 429 (Rate Limit Exceeded) 错误。请尝试按照以下步骤检查此类错误:

  1. 转到 Google Cloud Console
  2. 在左侧导航窗格中,点击 API 和服务
  3. 搜索框中,搜索 Pub/Sub
  4. 点击用量标签页。
  5. 检查响应代码并查找 (4xx) 客户端错误代码。

Java:SDK 1.x

使用执行详细信息

如果作业运行缓慢或卡住,请使用执行详细信息标签页。

此功能允许您检查批处理作业的执行情况。您可以借助它来确定导致瓶颈的阶段或工作器。如需了解详情,请参阅执行详细信息

识别缓慢或卡住的阶段

如需识别缓慢或卡住的阶段,请使用阶段进度视图。较长的柱形表示阶段需要较多时间,因此此视图可让您快速识别流水线中最慢的阶段。

找到瓶颈阶段后,您可以执行以下操作:

  • 识别该阶段中的滞后工作器
  • 如果没有滞后工作器,请确定哪个步骤对阶段运行时的影响最大。如需确定最慢的步骤,请使用侧边信息面板。然后,您可以确定用户代码优化的候选对象。

识别滞后的工作器

如需确定特定阶段的滞后工作器,请使用工作器进度视图。

您可以查看所有工作器是否会一直处理工作,直到阶段结束或者单个工作器卡在延迟任务上。找到此工作器后,您可以执行以下操作:

常见错误和操作流程

如需获取有关运行 Dataflow 作业时可能遇到的常见错误的指南,请参阅常见错误指南页面。