Troubleshooting and debugging

This page provides troubleshooting tips and debugging strategies that you might find helpful if you're having trouble building or running your Dataflow pipeline. This information can help you detect a pipeline failure, determine the reason behind a failed pipeline run, and suggest some courses of action to correct the problem.

Dataflow provides real-time feedback on your job, and there is a basic set of steps you can use to check the error messages, logs, and for conditions such as your job's progress having stalled.

For guidance on common errors you might encounter when running your Dataflow job, see the Common error guidance page.

Guidelines for temporary and staged files

The following are the best practices for Java and Python pipelines.

Java

  • For batch jobs, we recommend that you set a time to live (TTL) for the temporary location.

  • Before setting up TTL and as a general best practice, ensure that you set both the staging location and the temporary location to different locations.

  • Do not delete the objects in the staging location as these objects are reused.

  • If a job completes or is stopped and the temporary objects are not cleaned up, manually remove these files from the Cloud Storage bucket that is used as a temporary location.

Python

Both the temporary and staging locations have a prefix of <job_name>.<time>.

  • Ensure that you set both the staging location and the temporary location to different locations.

  • If required, delete the objects in the staging location after a job completes or stops. Also, staged objects are not reused in Python pipelines.

  • If a job ends and the temporary objects are not cleaned up, manually remove these files from the Cloud Storage bucket that is used as a temporary location.

  • For batch jobs, we recommend that you set a time to live (TTL) for both the temporary and the staging locations.

Checking your pipeline's status

You can detect any errors in your pipeline runs by using the Dataflow monitoring interface.

  1. Go to the Google Cloud Console.
  2. Select your Google Cloud project from the project list.
  3. Click the menu in the upper left corner.
  4. Navigate to the Big Data section and click Dataflow. A list of running jobs appears in the right-hand pane.
  5. Select the pipeline job you want to view. You can see the jobs' status at a glance in the Status field: "Running," "Succeeded," or "Failed."
Figure 1: A list of Dataflow jobs in the Developers Console with jobs in the running, succeeded, and failed states.

Basic troubleshooting workflow

If one of your pipeline jobs fails, you can select the job to view more detailed information on errors and run results. When you select a job, you can view key charts for your pipeline, the execution graph, the Job info panel, and the Logs panel with Job logs, Worker logs, and Diagnostics tabs.

Checking job error messages

To view the Job Logs generated by your pipeline code and the Dataflow service, click on the bottom Logs panel.

You can filter the messages that appear in Job logs by clicking Info and Filter. To only display error messages, click Info and select Error.

To expand an error message, click the expandable section .

The logs panel with Job Logs, Diagnostics, log level filter, and error
message expansion highlighted.

Alternatively, you can click the Diagnostics tab. This tab shows where errors occurred along the chosen timeline, a count of all logged errors, and possible recommendations for your pipeline.

Diagnostics tab with two reported
errors.

Viewing step logs for your job

When you select a step in your pipeline graph, the logs panel toggles from displaying Job Logs generated by the Dataflow service to showing logs from the Compute Engine instances running your pipeline step.

A selected pipeline step with the step worker logs highlighted.

Cloud Logging combines all of the collected logs from your project's Compute Engine instances in one location. See Logging pipeline messages for more information on using Dataflow's various logging capabilities.

Handling automated pipeline rejection

In some cases, the Dataflow service identifies that your pipeline might trigger known SDK issues. To prevent pipelines that will likely encounter issues, from being submitted, Dataflow will automatically reject your pipeline and display the following message:

The workflow was automatically rejected by the service because it may trigger an
identified bug in the SDK (details below). If you think this identification is
in error, and would like to override this automated rejection, please re-submit
this workflow with the following override flag: [OVERRIDE FLAG].
Bug details: [BUG DETAILS].
Contact Google Cloud Support for further help.
Please use this identifier in your communication: [BUG ID].

After reading the caveats in the linked bug details, if you want to try to run your pipeline anyway, you can override the automated rejection. Add the flag --experiments=<override-flag> and resubmit your pipeline.

Determining the cause of a pipeline failure

Typically, a failed Apache Beam pipeline run can be attributed to one of the following causes:

  • Graph or pipeline construction errors. These errors occur when Dataflow runs into a problem building the graph of steps that compose your pipeline, as described by your Apache Beam pipeline.
  • Errors in job validation. The Dataflow service validates any pipeline job you launch. Errors in the validation process can prevent your job from being successfully created or executed. Validation errors can include problems with your Google Cloud project's Cloud Storage bucket, or with your project's permissions.
  • Exceptions in worker code. These errors occur when there are errors or bugs in the user-provided code that Dataflow distributes to parallel workers, such as the DoFn instances of a ParDo transform.
  • Slow-running pipelines or lack of output. If your pipeline runs slowly or runs for a long period of time without reporting results, you might check your quotas for streaming data sources and sinks such as Pub/Sub. There are also certain transforms that are better-suited to high-volume streaming pipelines than others.
  • Errors caused by transient failures in other Google Cloud services. Your pipeline may fail because of a temporary outage or other problem in the Google Cloud services upon which Dataflow depends, such as Compute Engine or Cloud Storage.

Detecting graph or pipeline construction errors

A graph construction error can occur when Dataflow is building the execution graph for your pipeline from the code in your Dataflow program. During graph construction time, Dataflow checks for illegal operations.

If Dataflow detects an error in graph construction, keep in mind that no job will be created on the Dataflow service. Thus, you won't see any feedback in the Dataflow monitoring interface. Instead, you'll see an error message similar to the following in the console or terminal window where you ran your Apache Beam pipeline:

Java: SDK 2.x

For example, if your pipeline attempts to perform an aggregation like GroupByKey on a globally windowed, non-triggered, unbounded PCollection, you'll see an error message similar to the following:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

For example, if your pipeline uses type hints and the argument type in one of the transforms is not as expected, you'll see an error message similar to the following:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Java: SDK 1.x

Should you encounter such an error, check your pipeline code to ensure that your pipeline's operations are legal.

Detecting errors in Dataflow job validation

Once the Dataflow service has received your pipeline's graph, the service will attempt to validate your job. This validation includes the following:

  • Making sure the service can access your job's associated Cloud Storage buckets for file staging and temporary output.
  • Checking for the required permissions in your Google Cloud project.
  • Making sure the service can access input and output sources, such as files.

If your job fails the validation process, you'll see an error message in the Dataflow monitoring interface, as well as in your console or terminal window if you are using blocking execution. The error message will look similar to the following:

Java: SDK 2.x

INFO: To access the Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Java: SDK 1.x

Detecting an exception in worker code

While your job is running, you may encounter errors or exceptions in your worker code. These errors generally mean that the DoFns in your pipeline code have generated unhandled exceptions, which result in failed tasks in your Dataflow job.

Exceptions in user code (for example, your DoFn instances) are reported in the Dataflow monitoring interface. If you run your pipeline with blocking execution, you'll also see error messages printed in your console or terminal window, such as the following:

Java: SDK 2.x

INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

Note: The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming, it may stall indefinitely.

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

Note: The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming, it may stall indefinitely.

Java: SDK 1.x

Consider guarding against errors in your code by adding exception handlers. For example, if you'd like to drop elements that fail some custom input validation done in a ParDo, handle the exception within your DoFn and drop the element.

You can also track failing elements in a few different ways:

  • You can log the failing elements and check the output using Cloud Logging.
  • You can check the Dataflow worker and worker startup logs for warnings or errors by following the instructions in Viewing logs.
  • You can have your ParDo write the failing elements to an additional output for later inspection.

To track the properties of an executing pipeline, you can use the Metrics class, as shown in the following example:

Java: SDK 2.x

final Counter counter = Metrics.counter("stats", "even-items");
PCollection<Integer> input = pipeline.apply(...);
...
input.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element() % 2 == 0) {
      counter.inc();
    }
});

Python

class FilterTextFn(beam.DoFn):
      """A DoFn that filters for a specific key based on a regex."""

      def __init__(self, pattern):
        self.pattern = pattern
        # A custom metric can track values in your pipeline as it runs. Create
        # custom metrics to count unmatched words, and know the distribution of
        # word lengths in the input PCollection.
        self.word_len_dist = Metrics.distribution(self.__class__,
                                                  'word_len_dist')
        self.unmatched_words = Metrics.counter(self.__class__,
                                               'unmatched_words')

      def process(self, element):
        word = element
        self.word_len_dist.update(len(word))
        if re.match(self.pattern, word):
          yield element
        else:
          self.unmatched_words.inc()

    filtered_words = (
        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))

Java: SDK 1.x

Troubleshooting slow-running pipelines or lack of output

Java: SDK 2.x

If you have a high-volume streaming pipeline that is running slowly or stalled, there are a few things you can check:

Pub/Sub Quota

If your pipeline reads input from Pub/Sub, your Google Cloud project may have insufficient Pub/Sub quota. One indication of an insufficient quota is if your job is generating a high number of 429 (Rate Limit Exceeded) errors. Try the following steps to check for such errors:

  1. Go to the Google Cloud Console.
  2. In the left-hand navigation pane, click APIs & services.
  3. In the Search Box, search for Pub/Sub.
  4. Click the Usage tab.
  5. Check Response Codes and look for (4xx) client error codes.

Use .withFanout In Your Combine Transforms

If your pipeline processes high-volume unbounded PCollections, we recommend:

  • Use Combine.Globally.withFanout instead of Combine.Globally.
  • Use Combine.PerKey.withHotKeyFanout instead of Count.PerKey.

Python

If you have a high-volume streaming pipeline that is running slowly or stalled, there are a few things you can check:

Pub/Sub Quota

If your pipeline reads input from Pub/Sub, your Google Cloud project may have insufficient Pub/Sub quota. One indication of an insufficient quota is if your job is generating a high number of 429 (Rate Limit Exceeded) errors. Try the following steps to check for such errors:

  1. Go to the Google Cloud Console.
  2. In the left-hand navigation pane, click APIs & services.
  3. In the Search Box, search for Pub/Sub.
  4. Click the Usage tab.
  5. Check Response Codes and look for (4xx) client error codes.

Java: SDK 1.x

Using execution details

If your job is slow or stuck, use the Execution details tab.

This feature lets you inspect the execution of your batch jobs. You can use it to identify the stage or worker that's causing a bottleneck. For more information, read Execution details.

Identifying slow or stuck stages

To identify slow or stuck stages, use the Stage workflow view. Longer bars indicate that the stage has taken more time, so this view lets you quickly identify the slowest stages in your pipeline.

Once you find the bottleneck stage, you can take the following actions:

  • Identify the lagging worker within that stage.
  • If there are no lagging workers, identify which step is contributing the most to the runtime of the stage. To determine the slowest steps, use the Side info panel. You can then identify candidates for user code optimization.

Identifying a lagging worker

To identify a lagging worker for a specific stage, use the Worker progress view.

You can see if all workers are processing work until the end of the stage or if a single worker is stuck on a lagging task. Once this worker is found, you can take the following actions:

Common errors and courses of action

For guidance on common errors you might encounter when running your Dataflow job, see the Common error guidance page.