在自定义容器中运行 Dataflow 作业

本文档介绍如何使用自定义容器运行 Dataflow 流水线。

如需了解如何创建容器映像,请参阅为 Dataflow 构建自定义容器映像

运行流水线时,请使用与您的自定义容器映像上的 SDK 相同的 Apache Beam SDK 版本和语言版本来启动流水线。此步骤可以避免不兼容的依赖项或 SDK 出现意外错误。

在本地测试

在 Dataflow 中运行流水线之前,最好在本地测试容器映像,这样可以更快地测试和调试。

如需详细了解 Apache Beam 特定用法,请参阅有关使用自定义容器映像运行流水线的 Apache Beam 指南。

使用 PortableRunner 进行基本测试

如需验证远程容器映像是否可以拉取并且是否可以运行简单的流水线,请使用 Apache Beam PortableRunner。使用 PortableRunner 时,作业提交在本地环境中进行,并且 DoFn 在 Docker 环境中执行。

使用 GPU 时,Docker 容器可能无权访问 GPU。如需使用 GPU 测试容器,请使用 Direct Runner,并按照“使用 GPU”页面的通过独立虚拟机进行调试部分中有关使用 GPU 在独立虚拟机上测试容器映像的步骤操作。

下面运行了一个示例流水线:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

替换以下内容:

  • REGION:要使用的作业服务区域,采用地址和端口的形式。例如:localhost:3000。使用 embed 运行进程内作业服务。
  • IMAGE_URI:自定义容器映像 URI。
  • INPUT_FILE:可以作为文本文件读取的输入文件。此文件必须可供 SDK 自动化测试框架容器映像访问
    (已在容器映像上已预加载或是远程文件)。
  • OUTPUT_FILE:要写入输出的路径。此路径可以是远程路径或容器上的本地路径。

流水线成功完成后,查看控制台日志以验证流水线是否成功完成以及是否使用了由 IMAGE_URI 指定的远程映像

运行流水线后,保存到容器的文件不在本地文件系统中,并且容器会停止。您可以使用 docker cp 从已停止的容器文件系统中复制文件。

或者:

  • 向 Cloud Storage 等远程文件系统提供输出。您可能需要手动配置访问权限以进行测试,包括对凭据文件或应用默认凭据的访问权限。
  • 如需快速进行调试,请添加临时日志记录

使用 Direct Runner

如需对容器映像和流水线进行更深入的本地测试,请使用 Apache Beam Direct Runner

您可以独立于容器验证流水线,方法是在与容器映像匹配的本地环境中进行测试,或者在正在运行的容器上启动流水线。

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

IMAGE_URI 替换为自定义容器映像 URI。

此示例假定所有流水线文件(包括流水线本身)都位于自定义容器上、已从本地文件系统装载,或者是远程文件且可供 Apache Beam 和容器访问。例如,如需使用 Maven (mvn) 运行上述 Java 示例,您必须将 Maven 及其依赖项暂存在容器上。如需了解详情,请参阅 Docker 文档中的存储docker run

在 Direct Runner 上测试的目标是在自定义容器环境中测试流水线,而不是使用其默认的 ENTRYPOINT 测试运行容器。修改 ENTRYPOINT(例如 docker run --entrypoint ...)以直接运行流水线,或者允许在容器上手动运行命令。

如果您依赖的特定配置基于在 Compute Engine 上运行容器,则可以直接在 Compute Engine 虚拟机上运行容器。如需了解详情,请参阅 Compute Engine 上的容器

启动 Dataflow 作业

在 Dataflow 上启动 Apache Beam 流水线时,请指定容器映像的路径。请勿将 :latest 标记用于您的自定义映像。使用日期或唯一标识符标记构建。如果出现问题,则使用此类标记可以将流水线执行还原为之前已知的工作配置,并允许检查更改。

Java

使用 --sdkContainerImage 为您的 Java 运行时指定 SDK 容器映像。

使用 --experiments=use_runner_v2 启用 Runner v2。

Python

如果使用的是 SDK 版本 2.30.0 或更高版本,请使用流水线选项 --sdk_container_image 指定 SDK 容器映像。

对于旧版 SDK,请使用流水线选项 --worker_harness_container_image 指定要用于工作器自动化测试框架的容器映像的位置。

只有 Dataflow Runner v2 支持自定义容器。如果您要启动批处理 Python 流水线,请设置 --experiments=use_runner_v2 标志。如果您要启动流处理 Python 流水线,则无需指定实验,因为流处理 Python 流水线默认使用 Runner v2。

Go

如果使用的是 SDK 版本 2.40.0 或更高版本,请使用流水线选项 --sdk_container_image 指定 SDK 容器映像。

对于旧版 SDK,请使用流水线选项 --worker_harness_container_image 指定要用于工作器自动化测试框架的容器映像的位置。

所有版本的 Go SDK 都支持自定义容器,因为它们默认使用 Dataflow Runner v2。

以下示例演示了如何使用自定义容器启动批量 WordCount 示例

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

使用 Python 版 Apache Beam SDK 2.30.0 或更高版本:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

替换以下内容:

  • INPUT_FILE:运行示例时,Dataflow 读取的 Cloud Storage 输入路径。
  • OUTPUT_FILE:示例流水线写入的 Cloud Storage 输出路径。此文件包含字数统计。
  • PROJECT_ID:您的 Google Cloud 项目的 ID。
  • REGION:要在其中部署 Dataflow 作业的区域。
  • TEMP_LOCATION:供 Dataflow 暂存流水线执行期间创建的临时作业文件的 Cloud Storage 路径。
  • DISK_SIZE_GB:可选。如果您的容器较大,请考虑增加默认启动磁盘大小,避免耗尽磁盘可用空间
  • IMAGE_URI:SDK 自定义容器映像 URI。 始终使用有版本控制的容器 SHA 或标记。请勿使用 :latest 标记或可变标记。