Frequently Asked Questions

The following section contains answers to some frequently asked questions about Cloud Dataflow.

General Questions

Where can I find additional support?

You can visit Google Cloud Platform (GCP) Support to obtain a support package for GCP, including Cloud Dataflow.

You can use Stack Overflow to research your question or to submit a new question. When submitting, please tag your question with google-cloud-dataflow. The tag is monitored by members of Google's engineering staff, who are happy to answer your questions.

You can also submit questions, feature requests, bug reports, and other feedback on the UserVoice forum.

Is it possible to share data across pipeline instances?

There is no Cloud Dataflow-specific mechanism for cross-pipeline communication or for sharing data or processing context between pipelines. You can use durable storage such as Cloud Storage or an in-memory cache such as Memcached to share data between pipeline instances.

Is there a built-in scheduling mechanism to execute pipelines at a given time or interval?

Cloud Dataflow has no built-in scheduling mechanism. To launch pipelines at a given time or interval, you can use an external scheduler such as the App Engine Cron Service or Cloud Functions.

How can I tell what version of the Cloud Dataflow SDK is installed/running in my environment?

Installation details depend on your development environment. If you use Maven, you can have multiple versions of the Cloud Dataflow SDK "installed" in one or more local Maven repositories.

Java

To find out which version of the Cloud Dataflow SDK a given pipeline is running, look at the console output when running with DataflowPipelineRunner or BlockingDataflowPipelineRunner. The console contains a message like the following, which includes the Cloud Dataflow SDK version information:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Python

To find out which version of the Cloud Dataflow SDK a given pipeline is running, look at the console output when running with DataflowRunner. The console contains a message like the following, which includes the Cloud Dataflow SDK version information:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interacting with Your Cloud Dataflow Job

Is it possible to access my job's worker machines (Compute Engine VMs) while my pipeline is running?

You can view the VM instances for a given pipeline by using the Google Cloud Platform Console. From there, you can use SSH to access each instance. However, once your job either completes or fails, the Cloud Dataflow service will automatically shut down and clean up the VM instances.

In the Cloud Dataflow Monitoring Interface, why don't I see Reserved CPU Time for my streaming job?

The Cloud Dataflow service reports Reserved CPU Time after jobs are completed. For unbounded (streaming) jobs, this means Reserved CPU Time is reported only after jobs have been cancelled or have failed.

In the Cloud Dataflow Monitoring Interface, why are the job state and watermark information unavailable for recently updated streaming jobs?

The Update operation makes several changes that take a few minutes to propagate to the Cloud Dataflow Monitoring Interface. Try refreshing the monitoring interface 5 minutes after updating your job.

Why do my custom composite transforms appear expanded in the Cloud Dataflow Monitoring Interface?

In your pipeline code, you might have invoked your composite transform as follows:

  result = transform.apply(input);
  

Composite transforms invoked in this manner omit the expected nesting and may thus appear expanded in the Cloud Dataflow Monitoring Interface. Your pipeline may also generate warnings or errors about stable unique names at pipeline execution time.

To avoid these issues, make sure you invoke your transforms using the recommended format:

  result = input.apply(transform);
  

Why can't I see my ongoing job's information anymore in the Cloud Dataflow Monitoring Interface, even though it appeared previously?

There is a known issue that currently can affect some Cloud Dataflow jobs that have been running for one month or longer. Such jobs might fail to load in the Cloud Dataflow Monitoring Interface, or they might show outdated information, even if the job was previously visible.

You can still obtain your job's status in the job list when using the Cloud Dataflow Monitoring or Cloud Dataflow Command-line Interfaces. However, if this issue is present, you won't be able to view details about your job.

Programming with the Cloud Dataflow SDK for Java

Can I pass additional (out-of-band) data into an existing ParDo operation?

Yes. There are several patterns to follow, depending on your use case:

  • You can serialize information as fields in your DoFn subclass.
  • Any variables referenced by the methods in an anonymous DoFn will be automatically serialized.
  • You can compute data inside DoFn.startBundle().
  • You can pass in data via ParDo.withSideInputs (see the sketch following this list).

For more information, see the ParDo documentation, specifically the sections on Creating a DoFn and Side Inputs, as well as the API for Java reference documentation for ParDo.
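
As a hedged sketch of the side-input pattern (the last item in the list above), assuming the Cloud Dataflow SDK 1.x for Java: the class name, element values, and length threshold below are hypothetical.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Main input (hypothetical values).
    PCollection<String> words = p.apply(Create.of("a", "bb", "ccc"));

    // A single-element collection holding the out-of-band value, exposed as a side input.
    final PCollectionView<Integer> maxLengthView =
        p.apply(Create.of(2)).apply(View.<Integer>asSingleton());

    words.apply(ParDo.withSideInputs(maxLengthView).of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        // Read the out-of-band value from the ProcessContext inside processElement.
        int maxLength = c.sideInput(maxLengthView);
        if (c.element().length() <= maxLength) {
          c.output(c.element());
        }
      }
    }));

    p.run();
  }
}

The PCollectionView is captured by the anonymous DoFn and read with ProcessContext.sideInput() at processing time.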

How are Java exceptions handled in Cloud Dataflow?

Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.

Cloud Dataflow processes elements in arbitrary bundles, and will retry the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.

Exceptions in user code (for example, your DoFn instances) are reported in the Cloud Dataflow Monitoring Interface. If you run your pipeline with BlockingDataflowPipelineRunner, you'll also see error messages printed in your console or terminal window.

Consider guarding against errors in your code by adding exception handlers. For example, if you'd like to drop elements that fail some custom input validation done in a ParDo, use a try/catch block within your ParDo to handle the exception and drop the element. You may also want to use an Aggregator to keep track of error counts.
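
For example, here is a hedged sketch of that approach, assuming the Cloud Dataflow SDK 1.x for Java; the input format, the specific exception, and the aggregator name are hypothetical.

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;

// Sketch: drop elements that fail validation and count them with an Aggregator,
// instead of letting the exception fail (and retry) the whole bundle.
class SafeParseFn extends DoFn<String, Integer> {
  // Aggregator counts are surfaced in the Cloud Dataflow Monitoring Interface.
  private final Aggregator<Long, Long> parseErrors =
      createAggregator("parseErrors", new Sum.SumLongFn());

  @Override
  public void processElement(ProcessContext c) {
    try {
      c.output(Integer.parseInt(c.element().trim()));
    } catch (NumberFormatException e) {
      // Permanent, per-element error: record it and drop the element.
      parseErrors.addValue(1L);
    }
  }
}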

Programming with the Cloud Dataflow SDK for Python

How do I handle NameErrors?

If you're getting a NameError when you execute your pipeline using the Cloud Dataflow service but not when you execute locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Cloud Dataflow worker.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Cloud Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Cloud Dataflow worker.

Note that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error concerns a module that should be available in the Python distribution, you can solve the problem by importing the module locally, where it is used.

For example, instead of:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.

Pipeline I/O

Do the TextIO source and sink support compressed files, such as gzip?

Yes. Cloud Dataflow Java can read files compressed with gzip and bzip2. See the TextIO documentation for additional information.
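
For example, a minimal sketch of an explicit gzip read, assuming the Cloud Dataflow SDK 1.x for Java, an already-created Pipeline named p, and a hypothetical bucket path:

// Sketch: read gzip-compressed text files explicitly with TextIO.
PCollection<String> lines = p.apply(
    TextIO.Read.from("gs://my-bucket/logs/input.gz")
        .withCompressionType(TextIO.CompressionType.GZIP));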

Can I use a regular expression to target specific files with the TextIO source?

Cloud Dataflow supports general wildcard patterns; your glob expression can appear anywhere in the file path. However, Cloud Dataflow does not support recursive wildcards (**).
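
For example, a sketch assuming an already-created Pipeline named p, the Cloud Dataflow SDK 1.x for Java, and a hypothetical bucket path; the glob matches files within a single directory level rather than recursively:

// Sketch: read every shard matching a non-recursive glob.
PCollection<String> lines =
    p.apply(TextIO.Read.from("gs://my-bucket/data/input-*.csv"));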

Does the TextIO input source support JSON?

Yes. However, for the Cloud Dataflow service to be able to parallelize input and output, your source data must be newline-delimited: each JSON record must appear on its own line.

Why isn't dynamic work rebalancing activating with my custom source?

Dynamic work rebalancing uses the return value of your custom source's getProgress() method to activate. The default implementation for getProgress() returns null. To ensure dynamic work rebalancing activates, make sure your custom source overrides getProgress() to return an appropriate value.

How do I access BigQuery datasets or Pub/Sub topics or subscriptions owned by a different Google Cloud Platform project (i.e., not the project with which I'm using Cloud Dataflow)?

See Cloud Dataflow's Security and Permissions guide for information on how to access BigQuery or Cloud Pub/Sub data in a different GCP project than the one with which you're using Cloud Dataflow.

Streaming

How do I run my pipeline in streaming mode?

You can set the --streaming flag at the command line when you execute your pipeline. You can also set the streaming mode programmatically when you construct your pipeline.
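
As a hedged sketch of the programmatic route, assuming the Cloud Dataflow SDK 1.x for Java and that args is the String[] passed to your main method:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class StreamingOptionsSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setStreaming(true);  // same effect as passing --streaming on the command line

    Pipeline p = Pipeline.create(options);
    // ... build the rest of the pipeline on p, then call p.run();
  }
}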

What data sources and sinks are supported in streaming mode?

You can read streaming data from Cloud Pub/Sub, and you can write streaming data to Cloud Pub/Sub or BigQuery.

What are the current limitations of streaming mode?

Cloud Dataflow's streaming mode has the following limitations:

  • Batch sources are not yet supported in streaming mode.
  • The Cloud Dataflow service's autoscaling features are currently in beta for streaming mode.

It looks like my streaming pipeline that reads from Pub/Sub is slowing down. What can I do?

Your project may have insufficient Cloud Pub/Sub quota. You can find out if your project has insufficient quota by checking for 429 (Rate limit exceeded) client errors:

  1. Go to the Google Cloud Platform Console.
  2. In the menu on the left, select APIs & services.
  3. In the Search Box, search for Google Cloud Pub/Sub.
  4. Click the Usage tab.
  5. Check Response Codes and look for (4xx) client error codes.

Why isn't my streaming job upscaling properly when I Update my pipeline with a larger pool of workers?

When you update a Cloud Dataflow job and specify a larger number of workers in the new job, you can specify, at most, a number of workers equal to the --maxNumWorkers value that you specified for your original job. You cannot scale beyond the number of workers (and thus Persistent Disk resources) allocated at the start of your original job.

This is a known issue with the Cloud Dataflow Service and is actively being investigated.

Streaming Autoscaling

Note: The Cloud Dataflow SDK for Python does not currently support streaming autoscaling.

What should I do if I want a fixed number of workers?

To enable streaming autoscaling, you need to opt in; it's not on by default. The semantics of the current options are not changing, so to keep using a fixed number of workers, you don't need to do anything.

I’m worried autoscaling will increase my bill. How can I limit it?

By specifying --maxNumWorkers, you limit the scaling range that will be used to process your job.

What is the scaling range for streaming autoscaling pipelines?

The number of workers used for a streaming autoscaling pipeline ranges between N/15 and N workers where N is the value of --maxNumWorkers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --maxNumWorkers=15 and the pipeline will automatically scale between 1 and 15 workers.

--maxNumWorkers can be 1000 at most.

What’s the maximum number of workers autoscaling might use?

Cloud Dataflow operates within the limits of your project's Compute Engine instance count quota or maxNumWorkers, whichever is lower.

Can I turn off autoscaling on my streaming pipeline?

Yes. Set --autoscalingAlgorithm=NONE. Update the pipeline with fixed cluster specifications (as described in the manual scaling documentation) where numWorkers is within the scaling range.
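
The same settings can also be supplied programmatically. The following is a hedged sketch assuming the Cloud Dataflow SDK 1.x for Java, with hypothetical worker counts; the option and enum names mirror the --autoscalingAlgorithm, --numWorkers, and --maxNumWorkers flags.

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class FixedWorkersSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);  // --autoscalingAlgorithm=NONE
    options.setNumWorkers(5);      // hypothetical fixed size, inside the previous scaling range
    options.setMaxNumWorkers(15);  // hypothetical value matching the original --maxNumWorkers
    // ... create the pipeline with these options and run the Update.
  }
}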

Can I change the scaling range on my streaming pipeline?

Yes, but you cannot do this with Update. You’ll need to stop your pipeline by using Cancel or Drain and redeploy your pipeline with the new desired maxNumWorkers.

Setting Up Your Google Cloud Platform Project to Use Cloud Dataflow

How do I determine whether the project I'm using with Cloud Dataflow owns a Cloud Storage bucket that I want to read from or write to?

To determine whether your GCP project owns a particular Cloud Storage bucket, you can use the following console command:

gsutil acl get gs://<your-bucket>

The command outputs a JSON string similar to the following:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

The relevant entries are the ones for which the "role" is "OWNER". The associated projectNumber tells you which project owns that bucket. If the project number doesn't match your project's number, you will need to either create a new bucket that your project owns or make the existing bucket accessible to your project, as described in the following two questions.

How do I create a new bucket owned by my Cloud Dataflow project?

To create a new bucket in the GCP project in which you're using Cloud Dataflow, you can use the following console command:

gsutil mb -p <project-to-own-the-bucket> gs://<bucket-name>

How do I make a bucket owned by a different project readable or writable for the Google Cloud Platform project I'm using with Cloud Dataflow?

See Cloud Dataflow's Security and Permissions guide for information on how your Cloud Dataflow pipeline can access GCP resources owned by a different GCP project.

When I try to run my Cloud Dataflow job, I see an error that says "Some Cloud APIs need to be enabled for your project in order for Cloud Dataflow to run this job." What should I do?

To run a Cloud Dataflow job, you must enable the following GCP APIs in your project:

  • Compute Engine API (Compute Engine)
  • Cloud Logging API
  • Cloud Storage
  • Cloud Storage JSON API
  • BigQuery API
  • Cloud Pub/Sub
  • Cloud Datastore API

See the Getting Started section on enabling GCP APIs for detailed instructions.
