This document describes how to run a Dataflow pipeline using a custom container.
For information about creating the container image, see Build custom container images for Dataflow.
When you run your pipeline, launch it using the Apache Beam SDK with the same version and language version as the SDK on your custom container image. This step avoids unexpected errors from incompatible dependencies or SDKs.
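For example, a quick way to confirm that the versions match is to compare the SDK version in your launch environment with the one installed in the image (a sketch that assumes a Python pipeline and that python is on the image's PATH; adapt it for Java or Go):
# Beam SDK version in the local launch environment:
python -c "import apache_beam; print(apache_beam.__version__)"
# Beam SDK version installed in the custom container image:
docker run --rm --entrypoint python IMAGE_URI -c "import apache_beam; print(apache_beam.__version__)"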
Test locally
Before you run your pipeline in Dataflow, it's a good idea to test the container image locally, which allows for more rapid testing and debugging.
To learn more about Apache Beam-specific usage, see the Apache Beam guide for Running pipelines with custom container images.
Basic testing with PortableRunner
To verify that remote container images can be pulled and can run a simple pipeline, use the Apache Beam PortableRunner. When you use the PortableRunner, job submission occurs in the local environment, and the DoFn execution happens in the Docker environment.
When you use GPUs, the Docker container might not have access to the GPUs. To test your container with GPUs, use the direct runner and follow the steps for testing a container image on a standalone VM with GPUs in the Debug with a standalone VM section of the "Use GPUs" page.
The following runs an example pipeline:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=JOB_ENDPOINT \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"
Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=JOB_ENDPOINT \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Go
go run path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=JOB_ENDPOINT \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Replace the following:
- JOB_ENDPOINT: the job service to use, in the form of address and port. For example: localhost:3000. Use embed to run an in-process job service.
- IMAGE_URI: the custom container image URI.
- INPUT_FILE: an input file that can be read as a text file. This file must be accessible to the SDK harness container image, either preloaded on the container image or available as a remote file.
- OUTPUT_FILE: a path to write output to. This path is either a remote path or a local path on the container.
After the pipeline completes, review the console logs to verify that it completed successfully and that the remote image, specified by IMAGE_URI, is used.
After the pipeline runs, files saved to the container are not in your local file system, and the container is stopped. You can copy files from the stopped container's file system by using docker cp.
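For example, a minimal sketch (the container ID and file path are placeholders):
# Find the ID of the stopped container:
docker ps --all
# Copy a file from the stopped container's file system to the local directory:
docker cp CONTAINER_ID:/tmp/output.txt .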
Alternatively:
- Provide outputs to a remote file system like Cloud Storage. You might need to manually configure access for testing purposes, including for credential files or Application Default Credentials, as shown in the sketch after this list.
- For quick debugging, add temporary logging.
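For the Cloud Storage option, when you start the container yourself, one possible approach is to mount a service account key file and point GOOGLE_APPLICATION_CREDENTIALS at it (a sketch; the key file path is a placeholder, and other credential mechanisms also work):
docker run -it --entrypoint "/bin/bash" \
  -v "$HOME/sa-key.json":/tmp/sa-key.json:ro \
  -e GOOGLE_APPLICATION_CREDENTIALS=/tmp/sa-key.json \
  IMAGE_URI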
Use the Direct Runner
For more in-depth local testing of the container image and your pipeline, use the Apache Beam Direct Runner.
You can verify your pipeline separately from the container by testing in a local environment that matches the container image, or by launching the pipeline on a running container.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go run path/to/my/pipeline.go ...
Replace IMAGE_URI with the custom container image URI.
The examples assume that any pipeline files, including the pipeline itself, are on the custom container, have been mounted from a local file system, or are remote and accessible by Apache Beam and the container. For example, to use Maven (mvn) to run the previous Java example, Maven and its dependencies must be staged on the container. For more information, see Storage and docker run in the Docker documentation.
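For example, one way to make local pipeline files visible inside the container is to mount your working directory when you start it (a sketch; the /workspace mount point is arbitrary):
docker run -it --entrypoint "/bin/bash" \
  -v "$(pwd)":/workspace \
  -w /workspace \
  IMAGE_URI
...
# On docker container, the mounted files are available under /workspace:
root@4f041a451ef3:/workspace# python path/to/my/pipeline.py ...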
The goal of testing with the Direct Runner is to test your pipeline in the custom container environment, not to test running your container with its default ENTRYPOINT. Modify the ENTRYPOINT (for example, docker run --entrypoint ...) to either run your pipeline directly or to allow running commands on the container manually.
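For example, a sketch that runs a Python pipeline directly with the Direct Runner instead of starting a shell (the pipeline path and options are placeholders):
docker run --rm --entrypoint python IMAGE_URI \
  path/to/my/pipeline.py --runner=DirectRunner \
  --input=INPUT_FILE --output=OUTPUT_FILE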
If you rely on a specific configuration that is based on running the container on Compute Engine, you can run the container directly on a Compute Engine VM. For more information, see Containers on Compute Engine.
Launch the Dataflow job
When launching the Apache Beam pipeline on Dataflow, specify the path to the container image. Don't use the :latest tag with your custom images. Tag your builds with a date or a unique identifier. If something goes wrong, using this type of tag lets you revert the pipeline execution to a previously known working configuration and inspect the changes.
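For example, you might build and push the image with a date-based tag (a sketch; the Artifact Registry path is a placeholder):
docker build . -t LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow-container:2024-05-01
docker push LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow-container:2024-05-01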
Java
Use --sdkContainerImage to specify an SDK container image for your Java runtime.
Use --experiments=use_runner_v2 to enable Runner v2.
Python
If you're using SDK version 2.30.0 or later, use the pipeline option --sdk_container_image to specify an SDK container image.
For earlier versions of the SDK, use the pipeline option --worker_harness_container_image to specify the location of the container image to use for the worker harness.
Custom containers are only supported for Dataflow Runner v2. If you're launching a batch Python pipeline, set the --experiments=use_runner_v2 flag. If you're launching a streaming Python pipeline, specifying the experiment isn't necessary, because streaming Python pipelines use Runner v2 by default.
Go
If you're using SDK version 2.40.0 or later, use the pipeline option --sdk_container_image to specify an SDK container image.
For earlier versions of the SDK, use the pipeline option --worker_harness_container_image to specify the location of the container image to use for the worker harness.
Custom containers are supported on all versions of the Go SDK because they use Dataflow Runner v2 by default.
The following example demonstrates how to launch the batch WordCount example with a custom container.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"
Python
Using the Apache Beam SDK for Python version 2.30.0 or later:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URI
Go
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI
Replace the following:
- INPUT_FILE: the Cloud Storage input path read by Dataflow when running the example.
- OUTPUT_FILE: the Cloud Storage output path written to by the example pipeline. This file contains the word counts.
- PROJECT_ID: the ID of your Google Cloud project.
- REGION: the region to deploy your Dataflow job in.
- TEMP_LOCATION: the Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
- DISK_SIZE_GB: Optional. If your container is large, consider increasing the default boot disk size to avoid running out of disk space.
- IMAGE_URI: the SDK custom container image URI. Always use a versioned container SHA or tag. Don't use the :latest tag or a mutable tag.