The following section contains answers to some frequently asked questions about Dataflow.
General questions
Where can I find additional support?
You can visit Google Cloud Support to obtain a support package for Google Cloud, including Dataflow.
You can use Stack Overflow to research your question or to submit a new question. When submitting, please tag your question with google-cloud-dataflow. This tag is monitored by members of Google's engineering staff, who are happy to answer your questions.
You can also submit questions, feature requests, bug or defect reports, and other feedback on the UserVoice forum.
Is it possible to share data across pipeline instances?
There is no Dataflow-specific cross-pipeline communication mechanism for sharing data or processing context between pipelines. You can use durable storage like Cloud Storage or an in-memory cache like App Engine's Memcache to share data between pipeline instances.
Is there a built-in scheduling mechanism to execute pipelines at a given time or interval?
You can automate pipeline execution by:
- Using Cloud Scheduler
- Using Apache Airflow's Dataflow Operator, one of several Google Cloud Operators in a Cloud Composer workflow.
- Running custom (cron) job processes on Compute Engine.
How can I tell what version of the Dataflow SDK is installed/running in my environment?
Installation details depend on your development environment. If you're using Maven, you can have multiple versions of the Dataflow SDK "installed" in one or more local Maven repositories.
Java
To find out what version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowPipelineRunner or BlockingDataflowPipelineRunner. The console will contain a message like the following, which includes the Dataflow SDK version information:
Python
To find out what version of the Dataflow SDK a given pipeline is running, look at the console output when running with DataflowRunner. The console will contain a message like the following, which includes the Dataflow SDK version information:
INFO: Executing pipeline on the Dataflow Service, ... Dataflow SDK version: <version>
Interacting with your Cloud Dataflow job
Is it possible to access my job's worker machines (Compute Engine VMs) while my pipeline is running?
You can view the VM instances for a given pipeline by using the Google Cloud Console. From there, you can use SSH to access each instance. However, once your job either completes or fails, the Dataflow service will automatically shut down and clean up the VM instances.
In the Cloud Dataflow Monitoring Interface, why don't I see Reserved CPU Time for my streaming job?
The Dataflow service reports Reserved CPU Time after jobs are completed. For unbounded jobs, this means Reserved CPU time is only reported after jobs have been cancelled or have failed.
In the Cloud Dataflow Monitoring Interface, why are the job state and watermark information unavailable for recently updated streaming jobs?
The Update operation makes several changes that take a few minutes to propagate to the Dataflow Monitoring Interface. Try refreshing the monitoring interface 5 minutes after updating your job.
Why do my custom composite transforms appear expanded in the Dataflow Monitoring Interface?
In your pipeline code, you might have invoked your composite transform as follows:
result = transform.apply(input);
Composite transforms invoked in this manner omit the expected nesting and may thus appear expanded in the Dataflow Monitoring Interface. Your pipeline may also generate warnings or errors about stable unique names at pipeline execution time.
To avoid these issues, make sure you invoke your transforms using the recommended format:
result = input.apply(transform);
Why can't I see my ongoing job's information anymore in the Cloud Dataflow Monitoring Interface, even though it appeared previously?
There is a known issue that currently can affect some Dataflow jobs that have been running for one month or longer. Such jobs might fail to load in the Dataflow Monitoring Interface, or they might show outdated information, even if the job was previously visible.
You can still obtain your job's status in the job list when using the Dataflow Monitoring or Dataflow Command-line Interfaces. However, if this issue is present, you won't be able to view details about your job.
Programming with the Apache Beam SDK for Java
Can I pass additional (out-of-band) data into an existing ParDo operation?
Yes. There are several patterns to follow, depending on your use case:
- You can serialize information as fields in your DoFn subclass.
- Any variables referenced by the methods in an anonymous DoFn will be automatically serialized.
- You can compute data inside DoFn.startBundle().
- You can pass in data via ParDo.withSideInputs.
For more information, see the ParDo documentation, specifically the sections on Creating a DoFn and Side Inputs, as well as the API for Java reference documentation for ParDo.
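As an illustration of the side-input pattern listed above, here is a minimal sketch with the Apache Beam SDK for Java; the data, transform names, and the maxLengthView side input are all hypothetical:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Out-of-band data: a single maximum length, materialized as a side input.
    final PCollectionView<Integer> maxLengthView =
        p.apply("MaxLength", Create.of(4)).apply(View.<Integer>asSingleton());

    PCollection<String> words = p.apply("Words", Create.of("a", "bb", "ccccc"));

    // The anonymous DoFn reads the side input through the ProcessContext.
    words.apply("FilterByLength",
        ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                int maxLength = c.sideInput(maxLengthView);
                if (c.element().length() <= maxLength) {
                  c.output(c.element());
                }
              }
            })
            .withSideInputs(maxLengthView));

    p.run();
  }
}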
How are Java exceptions handled in Cloud Dataflow?
Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.
Dataflow processes elements in arbitrary bundles, and will retry the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.
Exceptions in user code (for example, your DoFn instances) are reported in the Dataflow Monitoring Interface. If you run your pipeline with BlockingDataflowPipelineRunner, you'll also see error messages printed in your console or terminal window.
Consider guarding against errors in your code by adding exception handlers. For example, if you'd like to drop elements that fail some custom input validation done in a ParDo, use a try/catch block within your ParDo to handle the exception and drop the element. You may also want to use an Aggregator to keep track of error counts.
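For example, here is a minimal sketch of that pattern with the Apache Beam SDK for Java; it counts and drops unparseable elements using a Metrics counter, which serves the role the text above describes for an Aggregator, and the class and counter names are hypothetical:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Drops elements that fail parsing instead of letting the exception fail the bundle.
public class ParseIntFn extends DoFn<String, Integer> {
  private final Counter parseErrors = Metrics.counter(ParseIntFn.class, "parse-errors");

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      c.output(Integer.parseInt(c.element().trim()));
    } catch (NumberFormatException e) {
      // Count and drop the bad element rather than rethrowing.
      parseErrors.inc();
    }
  }
}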
Programming with the Cloud Dataflow SDK for Python
How do I handle NameErrors?
If you're getting a NameError when you execute your pipeline using the Dataflow service but not when you execute locally (i.e., using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Dataflow worker.
By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Dataflow worker.
Note that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error concerns a module that should be available in the Python distribution, you can solve it by importing the module locally, where it is used.
For example, instead of:
import re
…
def myfunc():
  # use re module
use:
def myfunc():
  import re
  # use re module
Alternatively, if your DoFn
s span multiple files, you should use
a different approach to packaging your workflow and
managing dependencies.
Pipeline I/O
Do the TextIO source and sink support compressed files, such as gzip?
Yes. Dataflow Java can read files compressed with gzip and bzip2. See the TextIO documentation for additional information.
Can I use a regular expression to target specific files with the TextIO source?
Dataflow supports general wildcard patterns; your glob expression
can appear anywhere in the file path. However, Dataflow does not
support recursive wildcards (**
).
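As an illustration of both answers above (reading compressed files and using a glob), here is a minimal sketch with the Apache Beam SDK for Java; it assumes an existing Pipeline object p, and the bucket path is a placeholder:

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

// Read gzip-compressed text files matched by a glob within an existing pipeline p.
PCollection<String> lines = p.apply(
    "ReadCompressedLogs",
    TextIO.read()
        .from("gs://my-bucket/logs/2020-*-shard-*.txt.gz")  // wildcard anywhere in the path, but no "**"
        .withCompression(Compression.GZIP));                // Compression.AUTO (the default) infers from the file extension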
Does the TextIO input source support JSON?
Yes. However, for the Dataflow service to be able to parallelize input and output, your source data must be delimited with a line feed.
Why isn't dynamic work rebalancing activating with my custom source?
Dynamic work rebalancing uses the return value of your custom source's getProgress()
method to activate. The default implementation for getProgress()
returns
null
. To ensure auto-scaling activates, make sure your custom source overrides
getProgress()
to return an appropriate value.
How do I access BigQuery datasets or Pub/Sub topics or subscriptions owned by a different Google Cloud Platform project (i.e., not the project with which I'm using Cloud Dataflow)?
See Dataflow's Security and Permissions guide for information on how to access BigQuery or Pub/Sub data in a different Google Cloud project than the one with which you're using Dataflow.
Why do I get "rateLimitExceeded" errors when using the BigQuery connector and what should I do about them?
BigQuery has short-term quota limits that apply when too many API requests are sent during a short duration. It's possible for your Dataflow pipeline to temporarily exceed such a quota. Whenever this happens, API requests from your Dataflow pipeline to BigQuery might fail, which could result in rateLimitExceeded errors in worker logs. Dataflow retries such failures, so you can safely ignore these errors. If you believe that your pipeline is significantly impacted by rateLimitExceeded errors, contact Google Cloud Support.
I'm using the BigQuery connector to write to BigQuery using streaming inserts and my write throughput is lower than expected. What can I do to remedy this?
Slow throughput might be due to your pipeline exceeding the available BigQuery streaming insert quota. If this is the case, you should see quota-related error messages from BigQuery in the Dataflow worker logs (look for quotaExceeded errors). If you see such errors, consider setting the BigQuery sink option ignoreInsertIds() when using the Apache Beam SDK for Java, or the ignore_insert_ids option when using the Apache Beam SDK for Python, to become automatically eligible for a one GB/sec per-project BigQuery streaming insert throughput. For more information on caveats related to automatic message de-duplication, see the BigQuery documentation. To increase the BigQuery streaming insert quota above one GB/sec, submit a request through the Cloud Console.
If you do not see quota-related errors in worker logs, the issue might be that default bundling or batching parameters do not provide adequate parallelism for your pipeline to scale. There are several Dataflow BigQuery connector configurations that you can consider adjusting to achieve the expected performance when writing to BigQuery using streaming inserts. For example, for the Apache Beam SDK for Java, adjust numStreamingKeys to match the maximum number of workers, and consider increasing insertBundleParallelism to configure the BigQuery connector to write to BigQuery using more parallel threads. For configurations available in the Apache Beam SDK for Java, see BigQueryPipelineOptions, and for configurations available in the Apache Beam SDK for Python, see the WriteToBigQuery transform.
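For illustration, here is a minimal sketch of setting the ignoreInsertIds() option mentioned above with the Apache Beam SDK for Java; it assumes an existing PCollection<TableRow> named rows, and the table specification is a placeholder:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// Streaming inserts without insert IDs: opts out of best-effort de-duplication
// in exchange for the higher per-project streaming insert throughput.
rows.apply(
    "WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .ignoreInsertIds()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));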
Streaming
How do I run my pipeline in streaming mode?
You can set the --streaming flag at the command line when you execute your pipeline. You can also set the streaming mode programmatically when you construct your pipeline.
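For example, here is a minimal sketch of the programmatic approach with the Apache Beam SDK for Java; it assumes the usual args array from main and that the job runs on the Dataflow runner:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Equivalent to passing --streaming on the command line.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);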
What data sources and sinks are supported in streaming mode?
You can read streaming data from Pub/Sub, and you can write streaming data to Pub/Sub or BigQuery.
What are the current limitations of streaming mode?
Dataflow's streaming mode has the following limitations:
- Batch sources are not yet supported in streaming mode.
- The Dataflow service's Automatic Scaling features are available in beta.
It looks like my streaming pipeline that reads from Pub/Sub is slowing down. What can I do?
Your project may have insufficient Pub/Sub quota.
You can find out if your project has insufficient quota by checking for
429 (Rate limit exceeded)
client errors:
- Go to the Google Cloud Console.
- In the menu on the left, select APIs & services.
- In the Search Box, search for Cloud Pub/Sub.
- Click the Usage tab.
- Check Response Codes and look for
(4xx)
client error codes.
Why isn't my streaming job upscaling properly when I Update my pipeline with a larger pool of workers?
Java
For streaming jobs that do not use Streaming Engine, you cannot scale beyond the original number of workers and Persistent Disk resources allocated at the start of your original job. When you update a Dataflow job and specify a larger number of workers in the new job, you can only specify a number of workers equal to the --maxNumWorkers that you specified for your original job.
Python
For streaming jobs that do not use Streaming Engine, you cannot scale beyond the original number of workers and Persistent Disk resources allocated at the start of your original job. When you update a Dataflow job and specify a larger number of workers in the new job, you can only specify a number of workers equal to the --max_num_workers that you specified for your original job.
Streaming autoscaling
What should I do if I want a fixed number of workers?
To enable streaming autoscaling, you need to opt in; it's not on by default. The semantics of the current options are not changing, so to keep using a fixed number of workers, you don't need to do anything.
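If you do want to opt in, the following is a minimal sketch with the Apache Beam SDK for Java, equivalent to passing --autoscalingAlgorithm=THROUGHPUT_BASED and --maxNumWorkers=15 on the command line (the Python SDK uses --autoscaling_algorithm and --max_num_workers); the args array and the value 15 are illustrative:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Opt in to streaming autoscaling and cap the scaling range at 15 workers.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setStreaming(true);
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setMaxNumWorkers(15);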
I’m worried autoscaling will increase my bill. How can I limit it?
Java
By specifying --maxNumWorkers, you limit the scaling range used to process your job.
Python
By specifying --max_num_workers, you limit the scaling range used to process your job.
What is the scaling range for streaming autoscaling pipelines?
Java
For streaming autoscaling jobs that do not use Streaming Engine, the Dataflow service allocates between 1 and 15 Persistent Disks to each worker. This means that the minimum number of workers used for a streaming autoscaling pipeline is N/15, where N is the value of --maxNumWorkers.
For streaming autoscaling jobs that use Streaming Engine, the minimum number of workers is 1.
Dataflow balances the number of Persistent Disks between the workers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --maxNumWorkers=15. The pipeline automatically scales between 1 and 15 workers, using 1, 2, 3, 4, 5, 8, or 15 workers, which corresponds to 15, 8, 5, 4, 3, 2, or 1 Persistent Disks per worker, respectively.
--maxNumWorkers can be 1000 at most.
Python
For streaming autoscaling jobs that do not use Streaming Engine, the Dataflow service allocates between 1 and 15 Persistent Disks to each worker. This means that the minimum number of workers used for a streaming autoscaling pipeline is N/15, where N is the value of --max_num_workers.
For streaming autoscaling jobs that use Streaming Engine, the minimum number of workers is 1.
Dataflow balances the number of Persistent Disks between the workers. For example, if your pipeline needs 3 or 4 workers in steady state, you could set --max_num_workers=15. The pipeline automatically scales between 1 and 15 workers, using 1, 2, 3, 4, 5, 8, or 15 workers, which corresponds to 15, 8, 5, 4, 3, 2, or 1 Persistent Disks per worker, respectively.
--max_num_workers can be 1000 at most.
What’s the maximum number of workers autoscaling might use?
Java
Dataflow operates within the limits of your project's Compute Engine instance count quota or maxNumWorkers, whichever is lower.
Python
Dataflow operates within the limits of your project's Compute Engine instance count quota or max_num_workers, whichever is lower.
Can I turn off autoscaling on my streaming pipeline?
Java
Yes. Set --autoscalingAlgorithm=NONE. Update the pipeline with fixed cluster specifications, as described in the manual scaling documentation, where numWorkers is within the scaling range.
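As a minimal sketch of those settings with the Apache Beam SDK for Java (the worker count shown is illustrative and must lie within the scaling range):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Equivalent to --autoscalingAlgorithm=NONE --numWorkers=5.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
options.setNumWorkers(5);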
Python
Yes. Set --autoscaling_algorithm=NONE. Update the pipeline with fixed cluster specifications, as described in the manual scaling documentation, where num_workers is within the scaling range.
Can I change the scaling range on my streaming pipeline?
Java
Yes, but you cannot do this with Update. You must stop your pipeline by using Cancel or Drain, and then redeploy your pipeline with the new desired maxNumWorkers.
Python
Yes, but you cannot do this with Update. You must stop your pipeline by using Cancel or Drain, and then redeploy your pipeline with the new desired max_num_workers.
Setting up your Google Cloud Platform project to use Cloud Dataflow
How do I determine whether the project I'm using with Cloud Dataflow owns a Cloud Storage bucket that I want to read from or write to?
To determine whether your Google Cloud project owns a particular Cloud Storage bucket, you can use the following console command:
gsutil acl get gs://<your-bucket>
The command outputs a JSON string similar to the following:
[ { "entity": "project-owners-123456789", "projectTeam": { "projectNumber": "123456789", "team": "owners" }, "role": "OWNER" }, .... ]
The relevant entries are the ones for which the "role" is "OWNER". The associated projectNumber tells you which project owns that bucket. If the project number doesn't match your project's number, you will need to either:
- Create a new bucket that is owned by your project.
- Give the appropriate accounts access to the bucket.
How do I create a new bucket owned by my Cloud Dataflow project?
To create a new bucket in the Google Cloud project in which you're using Dataflow, you can use the following console command:
gsutil mb -p <Project to own the bucket> <bucket-name>
How do I make a bucket owned by a different project readable or writable for the Google Cloud Platform project I'm using with Cloud Dataflow?
See Dataflow's Security and Permissions guide for information on how your Dataflow pipeline can access Google Cloud resources owned by a different Google Cloud project.
When I try to run my Cloud Dataflow job, I see an error that says "Some Cloud APIs need to be enabled for your project in order for Cloud Dataflow to run this job." What should I do?
To run a Dataflow job, you must enable the following Google Cloud APIs in your project:
- Compute Engine API (Compute Engine)
- Cloud Logging API
- Cloud Storage
- Cloud Storage JSON API
- BigQuery API
- Pub/Sub
- Datastore API
See the Getting Started section on enabling Google Cloud APIs for detailed instructions.