Frequently asked questions

The following section contains answers to some frequently asked questions about Dataflow.

General questions

Where can I find additional support?

You can visit Google Cloud Support to obtain a support package for Google Cloud, including Dataflow.

You can use Stack Overflow to research your question or to submit a new one. When submitting, tag your question with google-cloud-dataflow. This tag is monitored by members of Google's engineering staff, who are happy to answer your questions.

You can also submit product questions, feature requests, bug or defect reports, and other feedback using Google Issue Tracker.

Is it possible to share data across pipeline instances?

There is no Dataflow-specific cross-pipeline communication mechanism for sharing data or processing context between pipelines. You can use durable storage like Cloud Storage or an in-memory cache like App Engine's Memcache to share data between pipeline instances.

Is there a built-in scheduling mechanism to execute pipelines at a given time or interval?

You can automate pipeline execution by using a scheduling service such as Cloud Scheduler, or by orchestrating the pipeline from a Cloud Composer (Apache Airflow) workflow.

How can I tell what version of the Dataflow SDK is installed/running in my environment?

Installation details depend on your development environment. If you're using Maven, you can have multiple versions of the Dataflow SDK "installed" in one or more local Maven repositories.

Java

To find out which version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowPipelineRunner or BlockingDataflowPipelineRunner. The console output contains a message like the following, which includes the Dataflow SDK version information:

Python

To find out which version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowRunner. The console output contains a message like the following, which includes the Dataflow SDK version information:

Go

To find out which version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowRunner. The console output contains a message like the following, which includes the Dataflow SDK version information:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

How many instances of DoFn should I expect Dataflow to spin up?

See the DoFn memory usage section of the Troubleshoot Dataflow out of memory errors page.

Interacting with your Cloud Dataflow job

Is it possible to access my job's worker machines (Compute Engine VMs) while my pipeline is running?

You can view the VM instances for a given pipeline by using the Google Cloud console. From there, you can use SSH to access each instance. However, once your job either completes or fails, the Dataflow service will automatically shut down and clean up the VM instances.

In the Cloud Dataflow Monitoring Interface, why don't I see Reserved CPU Time for my streaming job?

The Dataflow service reports Reserved CPU Time after jobs are completed. For unbounded jobs, this means Reserved CPU Time is only reported after jobs have been cancelled or have failed.

In the Cloud Dataflow Monitoring Interface, why are the job state and watermark information unavailable for recently updated streaming jobs?

The Update operation makes several changes that take a few minutes to propagate to the Dataflow Monitoring Interface. Try refreshing the monitoring interface 5 minutes after updating your job.

Why do my custom composite transforms appear expanded in the Dataflow Monitoring Interface?

In your pipeline code, you might have invoked your composite transform as follows:

result = transform.apply(input);

Composite transforms invoked in this manner omit the expected nesting and may thus appear expanded in the Dataflow Monitoring Interface. Your pipeline may also generate warnings or errors about stable unique names at pipeline execution time.

To avoid these issues, make sure you invoke your transforms using the recommended format:

result = input.apply(transform);
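
For illustration, here is a minimal sketch of the recommended invocation with a stable, unique step name. It assumes a hypothetical CountWords composite transform and an input PCollection named lines; neither is part of the original example.

// Hypothetical composite transform, shown only to illustrate the invocation style.
class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Apply the inner sub-transforms here and return their output.
    return input;
  }
}

// Recommended: apply the composite to its input and give it a stable, unique
// name so the Monitoring Interface can display it as a single, nested step.
PCollection<String> result = lines.apply("CountWords", new CountWords());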

Why can't I see my ongoing job's information anymore in the Cloud Dataflow Monitoring Interface, even though it appeared previously?

There is a known issue that currently can affect some Dataflow jobs that have been running for one month or longer. Such jobs might fail to load in the Dataflow Monitoring Interface, or they might show outdated information, even if the job was previously visible.

You can still obtain your job's status in the job list when using the Dataflow Monitoring or Dataflow Command-line Interfaces. However, if this issue is present, you won't be able to view details about your job.

Programming with the Apache Beam SDK for Java

Can I pass additional (out-of-band) data into an existing ParDo operation?

Yes. There are several patterns to follow, depending on your use case:

  • You can serialize information as fields in your DoFn subclass.
  • Any variables referenced by the methods in an anonymous DoFn will be automatically serialized.
  • You can compute data inside DoFn.startBundle().
  • You can pass in data via ParDo.withSideInputs.

For more information, see the ParDo documentation, specifically the sections on Creating a DoFn and Side Inputs, as well as the API for Java reference documentation for ParDo.
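
For illustration, here is a minimal sketch of the side-input pattern in the Beam Java SDK. It assumes existing PCollections named words and wordLengths; both names are illustrative only.

// Package the maximum word length as a singleton side input.
PCollectionView<Integer> maxWordLengthView =
    wordLengths.apply(Max.integersGlobally().asSingletonView());

PCollection<String> filtered = words.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Read the out-of-band value from the side input.
        int maxLength = c.sideInput(maxWordLengthView);
        if (c.element().length() <= maxLength) {
          c.output(c.element());
        }
      }
    }).withSideInputs(maxWordLengthView));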

Programming with the Apache Beam SDK for Python

How do I handle NameErrors?

If you get a NameError when you execute your pipeline using the Dataflow service but not when you execute it locally (that is, when using the DirectRunner), your DoFns might be using values in the global namespace that are not available on the Dataflow worker.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Dataflow worker.

Note that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error concerns a module that should be available in the Python distribution, you can solve the problem by importing the module locally, where it is used.

For example, instead of:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.

Pipeline I/O

Do the TextIO source and sink support compressed files, such as gzip?

Yes. Dataflow Java can read files compressed with gzip and bzip2. See the TextIO documentation for additional information.
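
For illustration, a minimal Beam Java sketch; the pipeline object p and the bucket path are illustrative. Compression is normally inferred from the file extension, but it can also be set explicitly as shown here.

// Read gzip-compressed text files. TextIO usually detects compression from
// the ".gz" extension; withCompression() sets it explicitly.
PCollection<String> lines = p.apply(
    TextIO.read()
        .from("gs://my-bucket/logs/*.gz")   // illustrative path
        .withCompression(Compression.GZIP));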

Can I use a regular expression to target specific files with the TextIO source?

Dataflow supports general wildcard patterns; your glob expression can appear anywhere in the file path. However, Dataflow does not support recursive wildcards (**).
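
For illustration, a glob can target a dated subset of files; the pipeline object p and the path below are illustrative.

// A wildcard can appear anywhere in the path, but recursive wildcards (**)
// are not supported.
PCollection<String> lines = p.apply(
    TextIO.read().from("gs://my-bucket/logs/2024-*/events-*.txt"));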

Does the TextIO input source support JSON?

Yes. However, for the Dataflow service to be able to parallelize input and output, your source data must be newline-delimited; that is, each JSON record must be on its own line.

Why isn't dynamic work rebalancing activating with my custom source?

Dynamic work rebalancing uses the return value of your custom source's getProgress() method to activate. The default implementation for getProgress() returns null. To ensure dynamic work rebalancing activates, make sure your custom source overrides getProgress() to return an appropriate value.

How do I access BigQuery datasets or Pub/Sub topics or subscriptions owned by a different Google Cloud Platform project (i.e., not the project with which I'm using Cloud Dataflow)?

See Dataflow's Security and Permissions guide for information on how to access BigQuery or Pub/Sub data in a different Google Cloud project than the one with which you're using Dataflow.

Streaming

How do I run my pipeline in streaming mode?

You can set the --streaming flag at the command line when you execute your pipeline. You can also set the streaming mode programmatically when you construct your pipeline.
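
For illustration, here is a minimal Beam Java sketch of setting streaming mode programmatically; the use of DataflowPipelineOptions is one common choice, not the only one.

// Parse command-line arguments (which may include --streaming), then force
// streaming mode programmatically before constructing the pipeline.
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);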

What data sources and sinks are supported in streaming mode?

You can read streaming data from Pub/Sub, and you can write streaming data to Pub/Sub or BigQuery.
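
For illustration, a minimal Beam Java sketch that reads from a Pub/Sub subscription and writes to a Pub/Sub topic; the pipeline object p and the resource names are illustrative.

// Read an unbounded stream of messages from a Pub/Sub subscription...
PCollection<String> messages = p.apply(
    PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-subscription"));

// ...and write the (possibly transformed) messages to a Pub/Sub topic.
messages.apply(
    PubsubIO.writeStrings()
        .to("projects/my-project/topics/my-output-topic"));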

What are the current limitations of streaming mode?

Batch sources are not yet supported in streaming mode.

Why isn't my streaming job upscaling properly when I Update my pipeline with a larger pool of workers?

Java

For streaming jobs that do not use Streaming Engine, you cannot scale beyond the number of workers and Persistent Disk resources allocated at the start of your original job. When you update a Dataflow job and specify a larger number of workers for the new job, you can specify at most the --maxNumWorkers value that you specified for your original job.

Python

For streaming jobs that do not use Streaming Engine, you cannot scale beyond the number of workers and Persistent Disk resources allocated at the start of your original job. When you update a Dataflow job and specify a larger number of workers for the new job, you can specify at most the --max_num_workers value that you specified for your original job.

Go

For streaming jobs that do not use Streaming Engine, you cannot scale beyond the number of workers and Persistent Disk resources allocated at the start of your original job. When you update a Dataflow job and specify a larger number of workers for the new job, you can specify at most the --max_num_workers value that you specified for your original job.

Streaming autoscaling

See Troubleshoot Dataflow autoscaling.

Setting up your Google Cloud Platform project to use Cloud Dataflow

How do I determine whether the project I'm using with Cloud Dataflow owns a Cloud Storage bucket that I want to read from or write to?

To determine whether your Google Cloud project owns a particular Cloud Storage bucket, you can use the following gsutil command:

gsutil acl get gs://<your-bucket>

The command outputs a JSON string similar to the following:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

The relevant entries are the ones for which the "role" is OWNER. The associated projectNumber tells you which project owns the bucket. If the project number doesn't match your project's number, you need to either:

  • Create a new bucket that is owned by your project.
  • Give the appropriate accounts access to the bucket.

How do I create a new bucket owned by my Cloud Dataflow project?

To create a new bucket in the Google Cloud project in which you're using Dataflow, you can use the following gsutil command:

gsutil mb -p <Project to own the bucket> gs://<bucket-name>

How do I make a bucket owned by a different project readable or writable for the Google Cloud Platform project I'm using with Cloud Dataflow?

See Dataflow's Security and Permissions guide for information on how your Dataflow pipeline can access Google Cloud resources owned by a different Google Cloud project.