Frequently Asked Questions

The following section contains answers to some frequently asked questions about Google Cloud Dataflow.

General Questions

Where can I find additional support?

You can visit Google Cloud Platform Support to obtain a support package for Google Cloud Platform, including Cloud Dataflow.

You can use Stack Overflow to research your question or to submit a new question. When submitting, please tag your question with google-cloud-dataflow. This tag is monitored by members of Google's engineering staff, who are happy to answer your questions.

You can also submit questions, feature requests, bug or defect reports, and other feedback on the UserVoice forum.

Is it possible to share data across pipeline instances?

There is no Dataflow-specific cross-pipeline communication mechanism for sharing data or processing context between pipelines. You can, however, use durable storage such as Cloud Storage or an in-memory cache such as Memcached to share data between pipeline instances.
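
For example, here is a minimal sketch using the Dataflow SDK for Java in which one pipeline instance writes its output to a Cloud Storage path and a second pipeline instance reads it back later. The bucket and paths are hypothetical, and the snippet assumes it runs inside a main method with command-line args available:

  // Hypothetical sketch: pipeline A writes its output to a shared Cloud Storage location.
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

  Pipeline writer = Pipeline.create(options);
  writer
      .apply(TextIO.Read.from("gs://my-shared-bucket/input/*"))
      .apply(TextIO.Write.to("gs://my-shared-bucket/results/part"));
  writer.run();

  // Pipeline B, a separate pipeline instance, later reads the shared results.
  Pipeline reader = Pipeline.create(options);
  reader.apply(TextIO.Read.from("gs://my-shared-bucket/results/part*"));
  // ...apply further transforms to the shared data...
  reader.run();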

Is there a built-in scheduling mechanism to execute pipelines at given time or interval?

You can automate pipeline execution by using Google App Engine (Flexible Environment only) or Cloud Functions. You can also use custom (cron) job processes on Google Compute Engine.

How can I tell what version of the Dataflow SDK is installed/running in my environment?

Installation details depend on your development environment. If you're using Maven, you can have multiple versions of the Dataflow SDK "installed" in one or more local Maven repositories.

Java

To find out which version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowPipelineRunner or BlockingDataflowPipelineRunner. The console will contain a message like the one shown below, which includes the Dataflow SDK version information.

Python

To find out which version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowRunner. The console will contain a message like the following, which includes the Dataflow SDK version information:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interacting with Your Dataflow Job

Is it possible to access my job's worker machines (Compute Engine VMs) while my pipeline is running?

You can view the VM instances for a given pipeline by using the Google Cloud Platform Console. From there, you can use SSH to access each instance. However, once your job either completes or fails, the Dataflow service will automatically shut down and clean up the VM instances.

In the Dataflow Monitoring Interface, why don't I see Reserved CPU Time for my streaming job?

The Dataflow service reports Reserved CPU Time after jobs complete. For unbounded (streaming) jobs, this means that Reserved CPU Time is reported only after a job has been cancelled or has failed.

In the Dataflow Monitoring Interface, why are the job state and watermark information unavailable for recently updated streaming jobs?

The Update operation makes several changes that take a few minutes to propagate to the Dataflow Monitoring Interface. Try refreshing the monitoring interface 5 minutes after updating your job.

Why do my custom composite transforms appear expanded in the Dataflow Monitoring Interface?

In your pipeline code, you might have invoked your composite transform as follows:

  result = transform.apply(input);
  

Composite transforms invoked in this manner omit the expected nesting and may thus appear expanded in the Dataflow Monitoring Interface. Your pipeline may also generate warnings or errors about stable unique names at pipeline execution time.

To avoid these issues, make sure you invoke your transforms using the recommended format:

  result = input.apply(transform);
  

Why can't I see my ongoing job's information anymore in the Dataflow Monitoring Interface, even though it appeared previously?

There is a known issue that currently can affect some Dataflow jobs that have been running for one month or longer. Such jobs might fail to load in the Dataflow Monitoring Interface, or they might show outdated information, even if the job was previously visible.

You can still obtain your job's status in the job list when using the Dataflow Monitoring or Dataflow Command-line Interfaces. However, if this issue is present, you won't be able to view details about your job.

Programming with the Dataflow SDK for Java

Can I pass additional (out-of-band) data into an existing ParDo operation?

Yes. There are several patterns to follow, depending on your use case:

  • You can serialize information as fields in your DoFn subclass.
  • Any variables referenced by the methods in an anonymous DoFn will be automatically serialized.
  • You can compute data inside DoFn.startBundle().
  • You can pass in data via ParDo.withSideInputs.

For more information, see the ParDo documentation, specifically the sections on Creating a DoFn and Side Inputs, as well as the API for Java reference documentation for ParDo.
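
As an illustration of the side-input approach, the following sketch computes a singleton side input and reads it inside an anonymous DoFn. The PCollections words and wordLengths, along with the variable names, are hypothetical:

  // Compute a single maximum word length to use as a side input.
  final PCollectionView<Integer> maxWordLengthView =
      wordLengths.apply(Max.integersGlobally().asSingletonView());

  // Pass the side input to the ParDo and read it in processElement.
  PCollection<String> filteredWords = words.apply(
      ParDo.withSideInputs(maxWordLengthView)
          .of(new DoFn<String, String>() {
            @Override
            public void processElement(ProcessContext c) {
              int maxWordLength = c.sideInput(maxWordLengthView);
              if (c.element().length() <= maxWordLength) {
                c.output(c.element());
              }
            }
          }));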

How are Java exceptions handled in Dataflow?

Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.

Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles that include a failing item are retried 4 times; the pipeline fails completely when a single bundle has failed 4 times. When running in streaming mode, a bundle that includes a failing item is retried indefinitely, which may cause your pipeline to stall permanently.

Exceptions in user code (for example, your DoFn instances) are reported in the Dataflow Monitoring Interface. If you run your pipeline with BlockingDataflowPipelineRunner, you'll also see error messages printed in your console or terminal window.

Consider guarding against errors in your code by adding exception handlers. For example, if you'd like to drop elements that fail some custom input validation done in a ParDo, use a try/catch block within your ParDo to handle the exception and drop the element. You may also want to use an Aggregator to keep track of error counts.
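
For example, here is a sketch of a DoFn that drops elements it cannot parse and counts them with an Aggregator. The input PCollection, the ParsedRecord type, and the parseRecord helper are hypothetical:

  PCollection<ParsedRecord> parsed = input.apply(
      ParDo.of(new DoFn<String, ParsedRecord>() {
        // Counts dropped elements; the value appears in the Dataflow Monitoring Interface.
        private final Aggregator<Long, Long> parseErrors =
            createAggregator("parseErrors", new Sum.SumLongFn());

        @Override
        public void processElement(ProcessContext c) {
          try {
            c.output(parseRecord(c.element()));  // parseRecord is a hypothetical helper
          } catch (IllegalArgumentException e) {
            parseErrors.addValue(1L);  // count and drop the element instead of failing the bundle
          }
        }
      }));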

Programming with the Dataflow SDK for Python

How do I handle NameErrors?

If you're getting a NameError when you execute your pipeline using the Dataflow service but not when you execute locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Dataflow worker.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Dataflow worker.

Note that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error concerns a module that should be available in the Python distribution, you can solve the problem by importing the module locally, where it is used.

For example, instead of:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.

Pipeline I/O

Do the TextIO source and sink support compressed files, such as gzip?

Yes. The Dataflow SDK for Java can read files compressed with gzip and bzip2. See the TextIO documentation for additional information.
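
For example, a sketch with a hypothetical bucket path that reads gzip-compressed files explicitly, assuming p is an already-constructed Pipeline:

  PCollection<String> lines = p.apply(
      TextIO.Read
          .from("gs://my-bucket/logs/*.gz")
          .withCompressionType(TextIO.CompressionType.GZIP));  // or AUTO to infer from the file extension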

Can I use a regular expression to target specific files with the TextIO source?

Dataflow supports general wildcard patterns; your glob expression can appear anywhere in the file path. However, Dataflow does not support recursive wildcards (**).
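
For example, a sketch with hypothetical paths in which the glob matches a set of daily input files, assuming p is an already-constructed Pipeline:

  p.apply(TextIO.Read.from("gs://my-bucket/logs/2016-*/events.csv"));  // recursive wildcards (**) are not supported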

Does the TextIO input source support JSON?

Yes. However, for the Dataflow service to be able to parallelize input and output, your source data must be delimited with a line feed.
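
For example, newline-delimited JSON input (the records here are hypothetical), where each line contains one complete JSON object:

  {"user": "alice", "score": 10}
  {"user": "bob", "score": 7}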

Why isn't dynamic work rebalancing activating with my custom source?

Dynamic work rebalancing uses the return value of your custom source's getProgress() method to activate. The default implementation for getProgress() returns null. To ensure dynamic work rebalancing activates, make sure your custom source overrides getProgress() to return an appropriate value.

How do I access BigQuery datasets or Pub/Sub topics or subscriptions owned by a different Cloud Platform project (i.e., not the project with which I'm using Dataflow)?

See Dataflow's Security and Permissions guide for information on how to access BigQuery or Pub/Sub data in a different Cloud Platform project than the one with which you're using Dataflow.

Streaming

How do I run my pipeline in streaming mode?

You can set the --streaming flag at the command line when you execute your pipeline. You can also set streaming mode programmatically when you construct your pipeline. When setting your pipeline configuration options, invoke the method setStreaming(true) on your DataflowPipelineOptions object.
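
For example, a sketch of setting streaming mode programmatically with the Dataflow SDK for Java:

  DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(DataflowPipelineOptions.class);
  options.setStreaming(true);  // equivalent to passing --streaming on the command line
  Pipeline p = Pipeline.create(options);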

What data sources and sinks are supported in streaming mode?

You can read streaming data from Google Cloud Pub/Sub by using Dataflow's PubsubIO.Read API. You can write streaming data to Pub/Sub or BigQuery by using the PubsubIO.Write and BigQueryIO.Write APIs, respectively.
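
For example, a sketch with hypothetical topic names that reads messages from one Pub/Sub topic and writes them to another, assuming p is an already-constructed Pipeline running in streaming mode:

  PCollection<String> messages =
      p.apply(PubsubIO.Read.topic("projects/my-project/topics/input-topic"));
  messages.apply(PubsubIO.Write.topic("projects/my-project/topics/output-topic"));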

What are the current limitations of streaming mode?

Dataflow's streaming mode has the following limitations:

  • Batch sources are not yet supported in streaming mode.
  • The Dataflow service's Automatic Scaling features are not yet supported in streaming mode.

It looks like my streaming pipeline that reads from Pub/Sub is slowing down. What can I do?

Your project may have insufficient Pub/Sub quota. You can find out if your project has insufficient quota by checking for 429 (Rate limit exceeded) client errors:

  1. Go to the Google Cloud Platform Console.
  2. In the menu on the left, select APIs & services.
  3. In the search box, search for Google Cloud Pub/Sub.
  4. Click the Usage tab.
  5. Check Response Codes and look for (4xx) client error codes.

Why isn't my streaming job upscaling properly when I Update my pipeline with a larger pool of workers?

When you update a Dataflow job and specify a larger number of workers in the new job, you can specify a number of workers at most equal to the --maxNumWorkers value that you specified for your original job. You cannot scale beyond the original number of workers (and thus Persistent Disk resources) allocated at the start of your original job.

This is a known issue with the Dataflow Service and is actively being investigated.

Streaming Autoscaling

What should I do if I want a fixed number of workers?

To enable streaming autoscaling, you need to opt in; it's not on by default. The semantics of the current options are not changing, so to keep using a fixed number of workers, you don't need to do anything.

I’m worried autoscaling will increase my bill. How can I limit it?

By specifying --maxNumWorkers, you limit the scaling range that will be used to process your job.

What is the scaling range for streaming autoscaling pipelines?

The number of workers used for a streaming autoscaling pipeline ranges between N/15 and N workers where N is the value of --maxNumWorkers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --maxNumWorkers=15 and the pipeline will automatically scale between 1 and 15 workers.

--maxNumWorkers can be 1000 at most.

What’s the maximum number of workers autoscaling might use?

Dataflow operates within the limits of your project's Google Compute Engine instance count quota or maxNumWorkers, whichever is lower.

Can I turn off autoscaling on my streaming pipeline?

Yes. Set --autoscalingAlgorithm=NONE. Update the pipeline with fixed cluster specifications (as described in the manual scaling documentation) where numWorkers is within the scaling range.

Can I change the scaling range on my streaming pipeline?

Yes, but you cannot do this with Update. You’ll need to stop your pipeline by using Cancel or Drain and redeploy your pipeline with the new desired maxNumWorkers.

Setting Up Your Cloud Platform Project to Use Dataflow

How do I determine whether the project I'm using with Dataflow owns a Cloud Storage bucket that I want to read from or write to?

To determine whether your Cloud Platform project owns a particular Cloud Storage bucket, you can use the following console command:

gsutil acl get gs://<your-bucket>

The command outputs a JSON string similar to the following:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

The relevant entries are the ones for which the "role" is "OWNER". The associated projectNumber tells you which project owns that bucket. If the project number doesn't match your project's number, you will need to either create a new bucket owned by your project or make the existing bucket readable or writable for your project, as described in the following two questions.

How do I create a new bucket owned by my Cloud Dataflow project?

To create a new bucket in the Cloud Platform project in which you're using Cloud Dataflow, you can use the following console command:

gsutil mb -p <project-to-own-the-bucket> gs://<bucket-name>
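
For example, with a hypothetical project ID and bucket name:

gsutil mb -p my-project-id gs://my-dataflow-bucket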

How do I make a bucket owned by a different project readable or writable for the Cloud Platform project I'm using with Cloud Dataflow?

See Dataflow's Security and Permissions guide for information on how your Dataflow pipeline can access Cloud Platform resources owned by a different Cloud Platform project.

When I try to run my Dataflow job, I see an error that says "Some Cloud APIs need to be enabled for your project in order for Dataflow to run this job." What should I do?

To run a Dataflow job, you must enable the following Cloud Platform APIs in your project:

  • Compute Engine API (Google Compute Engine)
  • Google Cloud Logging API
  • Google Cloud Storage
  • Google Cloud Storage JSON API
  • BigQuery API
  • Google Cloud Pub/Sub
  • Google Cloud Datastore API

See the Getting Started section on enabling Cloud Platform APIs for detailed instructions.
