本页面记录了各种 Dataflow Flex 模板配置选项,包括:
如要配置示例 Flex 模板,请参阅 Flex 模板教程。
了解 Flex 模板权限
使用 Flex 模板时,您需要三组权限:
- 创建资源的权限
- 构建 Flex 模板的权限
- 运行 Flex 模板的权限
创建资源的权限
如需开发和运行 Flex 模板流水线,您需要创建各种资源(例如暂存存储桶)。对于一次性资源创建任务,您可以使用基本 Owner 角色。
构建 Flex 模板的权限
作为 Flex 模板的开发者,您需要构建模板,以将其提供给用户。构建涉及将模板规范上传到 Cloud Storage 存储桶,以及使用运行流水线所需的代码和依赖项预配 Docker 映像。如需构建 Flex 模板,您需要拥有 Cloud Storage 的读写权限以及对 Artifact Registry 代码库的 Artifact Registry Writer 权限。您可以通过分配以下角色来授予这些权限:
- Storage Admin (
roles/storage.admin
) - Cloud Build Editor (
roles/cloudbuild.builds.editor
) - Artifact Registry Writer (
roles/artifactregistry.writer
)
运行 Flex 模板的权限
当您运行 Flex 模板时,Dataflow 会为您创建一个作业。如需创建作业,Dataflow 服务账号需要以下权限:
dataflow.serviceAgent
首次使用 Dataflow 时,服务会为您分配此角色,因此您无需授予此权限。
默认情况下,Compute Engine 服务账号用于启动器虚拟机和工作器虚拟机。该服务账号需要以下角色和权限:
- Storage Object Admin (
roles/storage.objectAdmin
) - Viewer (
roles/viewer
) - Dataflow Worker (
roles/dataflow.worker
) - 暂存存储桶的读写权限
- Flex 模板映像的读取权限
如需授予暂存存储桶的读写权限,您可以使用 Storage Object Admin (roles/storage.objectAdmin
) 角色。如需了解详情,请参阅 Cloud Storage 的 IAM 角色。
如需授予 Flex 模板映像的读取权限,您可以使用 Storage Object Viewer (roles/storage.objectViewer
) 角色。如需了解详情,请参阅配置访问权限控制。
设置所需的 Dockerfile 环境变量
如果要为 Flex 模板作业创建自己的 Dockerfile,请指定以下环境变量:
Java
在 Dockerfile 中指定 FLEX_TEMPLATE_JAVA_MAIN_CLASS
和 FLEX_TEMPLATE_JAVA_CLASSPATH
。
ENV | 说明 | 必需 |
---|---|---|
FLEX_TEMPLATE_JAVA_MAIN_CLASS |
指定启动 Flex 模板要运行的 Java 类。 | 是 |
FLEX_TEMPLATE_JAVA_CLASSPATH |
指定类文件的位置。 | 是 |
FLEX_TEMPLATE_JAVA_OPTIONS |
指定启动 Flex 模板时要传递的 Java 选项。 | 否 |
Python
在 Dockerfile 中指定 FLEX_TEMPLATE_PYTHON_PY_FILE
。
如需管理流水线依赖项,请在 Dockerfile 中设置变量,如下所示:
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
FLEX_TEMPLATE_PYTHON_PY_OPTIONS
FLEX_TEMPLATE_PYTHON_SETUP_FILE
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES
例如,GitHub 中的 Streaming in Python Flex 模板教程中设置了以下环境变量:
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
ENV | 说明 | 必需 |
---|---|---|
FLEX_TEMPLATE_PYTHON_PY_FILE |
指定启动 Flex 模板要运行的 Python 文件。 | 是 |
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE |
指定包含流水线依赖项的需求文件。 如需了解详情,请参阅 Apache Beam 文档中的 PyPI 依赖项。 | 否 |
FLEX_TEMPLATE_PYTHON_SETUP_FILE |
指定流水线软件包“setup.py”文件的路径。 如需了解详情,请参阅 Apache Beam 文档中的多个文件依赖项。 | 否 |
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES |
指定不公开提供的软件包。如需了解如何使用额外的软件包,请参阅本地或非 PyPI 依赖项。 |
否 |
FLEX_TEMPLATE_PYTHON_PY_OPTIONS |
指定启动 Flex 模板时要传递的 Python 选项。 | 否 |
软件包依赖项
如果 Dataflow Python 流水线使用其他依赖项,您可能需要配置 Flex 模板,以便在 Dataflow 工作器虚拟机上安装其他依赖项。
如果在限制互联网访问的环境中运行使用 Flex 模板的 Python Dataflow 作业,则您必须在创建模板时预封装依赖项。
使用以下任一选项预封装 Python 依赖项。
如需了解如何在 Java 和 Go 流水线中管理流水线依赖项,请参阅在 Dataflow 中管理流水线依赖项。
使用需求文件并使用模板预封装依赖项
如果您使用自己的 Dockerfile 来定义 Flex 模板映像,请按以下步骤操作:
创建一个列出流水线依赖项的
requirements.txt
文件。COPY requirements.txt /template/ ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
在 Flex 模板映像中安装依赖项。
RUN pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
将依赖项下载到本地要求缓存中,该缓存会在模板启动时暂存到 Dataflow 工作器。
RUN pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
使用此方法时,requirements.txt
文件中的依赖项会在运行时安装到 Dataflow 工作器。Google Cloud 控制台的“建议”标签页中的分析可能会记录此行为。如需避免在运行时安装依赖项,请使用自定义容器映像。
以下是使用 Flex 模板中的要求文件的代码示例。
将流水线结构设计为软件包并使用本地软件包
如果您使用多个 Python 本地文件或模块,请将流水线结构化为软件包。文件结构可能如下所示:
main.py
pyproject.toml
setup.py
src/
my_package/
my_custom_dofns_and_transforms.py
my_pipeline_launcher.py
other_utils_and_helpers.py
将顶级入口点(例如
main.py
文件)放置在根目录中。将其余文件放在src
目录中的单独文件夹中,例如my_package
。将软件包配置文件添加到根目录,并提供软件包详情和要求。
pyproject.toml
[project] name = "my_package" version = "package_version" dependencies = [ # Add list of packages (and versions) that my_package depends on. # Example: "apache-beam[gcp]==2.54.0", ]
setup.py
"""An optional setuptools configuration stub for the pipeline package. Use pyproject.toml to define the package. Add this file only if you must use the --setup_file pipeline option or the FLEX_TEMPLATE_PYTHON_SETUP_FILE configuration option. """ import setuptools setuptools.setup()
如需详细了解如何配置本地软件包,请参阅打包 Python 项目。
为流水线导入本地模块或文件时,请使用
my_package
软件包名称作为导入路径。from my_package import word_count_transform
在 Flex 模板映像中安装流水线软件包。您的 Flex 模板 Dockerfile 可能包含类似于以下示例的内容:
Dockerfile
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py" ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py" # Copy pipeline, packages and requirements. WORKDIR ${WORKDIR} COPY main.py . COPY pyproject.toml . COPY setup.py . COPY src src # Install local package. RUN pip install -e .
使用此方法时,requirements.txt
文件中的依赖项会在运行时安装到 Dataflow 工作器。Google Cloud 控制台的“建议”标签页中的分析可能会记录此行为。如需避免在运行时安装依赖项,请使用自定义容器映像。
如需查看遵循推荐方法的示例,请参阅 GitHub 中的具有依赖项和自定义容器映像的流水线 Flex 模板教程。
使用预安装所有依赖项的自定义容器
如需避免在运行时安装依赖项,请使用自定义容器。对于在不访问互联网的环境中运行的流水线,首选此选项。
如需使用自定义容器,请按以下步骤操作:
构建一个预安装必要依赖项的自定义容器映像。
在 Flex 模板 Dockerfile 中预安装相同的依赖项。
如需防止在运行时安装依赖项,请勿在 Flex 模板配置中使用
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
或FLEX_TEMPLATE_PYTHON_SETUP_FILE
选项。修改后的 Flex 模板
Dockerfile
可能如以下示例所示:FROM gcr.io/dataflow-templates-base/python3-template-launcher-base ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py" COPY . /template # If you use a requirements file, pre-install the requirements.txt. RUN pip install --no-cache-dir -r /template/requirements.txt # If you supply the pipeline in a package, pre-install the local package and its dependencies. RUN pip install -e /template
使用此方法时,您需要执行以下操作:
- 构建 Flex 模板映像
- 构建自定义 SDK 容器映像
- 在两个映像中安装相同的依赖项
或者,为了减少需要维护的映像数量,请将自定义容器映像用作 Flex 模板的基础映像。
如果您使用 Apache Beam SDK 2.49.0 版或更低版本,请在流水线启动器中添加
--sdk_location=container
流水线选项。此选项会指示流水线使用自定义容器中的 SDK,而不是下载 SDK。options = PipelineOptions(beam_args, save_main_session=True, streaming=True, sdk_location="container")
在
flex-template run
命令中设置sdk_container_image
参数。例如:gcloud dataflow flex-template run $JOB_NAME \ --region=$REGION \ --template-file-gcs-location=$TEMPLATE_PATH \ --parameters=sdk_container_image=$CUSTOM_CONTAINER_IMAGE \ --additional-experiments=use_runner_v2
如需了解详情,请参阅在 Dataflow 中使用自定义容器。
选择基础映像
您可以使用 Google 提供的基础映像来使用 Docker 打包模板容器映像。从 Flex 模板基础映像中选择最新标记。建议使用特定的图片代码(而非 latest
)。
请按以下格式指定基础映像:
gcr.io/dataflow-templates-base/IMAGE_NAME:TAG
请替换以下内容:
IMAGE_NAME
:Google 提供的基础映像TAG
:基础映像的版本名称,请参阅 Flex 模板基础映像参考
使用自定义容器映像
如果流水线使用自定义容器映像,我们建议将此自定义映像用作 Flex 模板 Docker 映像的基础映像。为此,请将 Google 提供的模板基础映像中的 Flex 模板启动器二进制文件复制到您的自定义映像。
映像的示例 Dockerfile
既可用作自定义 SDK 容器映像,也可用作 Flex 模板,如下所示:
FROM gcr.io/dataflow-templates-base/IMAGE_NAME:TAG as template_launcher
FROM apache/beam_python3.10_sdk:2.59.0
# RUN <...Make image customizations here...>
# See: https://cloud.google.com/dataflow/docs/guides/build-container-image
# Configure the Flex Template here.
COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
COPY my_pipeline.py /template/
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/my_pipeline.py"
替换以下内容:
IMAGE_NAME
:Google 提供的基础映像。例如:python311-template-launcher-base
。TAG
:基础映像的版本标记,请参阅 Flex 模板基础映像参考文档为了提高稳定性和问题排查,请避免使用latest
。请改为固定到特定版本标记。
如需查看遵循此方法的示例,请参阅具有依赖项和自定义容器映像的流水线 Flex 模板教程。
使用私有注册表中的映像
您可以构建存储在私有 Docker 注册表中的 Flex 模板映像,条件是该私有注册表使用 HTTPS 并且具有有效证书。
如需使用私有注册表中的映像,请指定该映像的路径以及该注册表的用户名和密码。用户名和密码必须存储在 Secret Manager 中。您可以使用以下格式之一提供密文:
projects/{project}/secrets/{secret}/versions/{secret_version}
projects/{project}/secrets/{secret}
如果您使用第二种格式,因为它未指定版本,则 Dataflow 会使用最新版本。
如果注册表使用自签名证书,您还需要指定自签名证书的 Cloud Storage 路径。
下表介绍了可用于配置私有注册表的 gcloud CLI 选项。
参数 | 说明 |
---|---|
image
|
注册表的地址。例如:gcp.repository.example.com:9082/registry/example/image:latest 。 |
image-repository-username-secret-id
|
用于向私有注册表进行身份验证的用户名的 Secret Manager 密文 ID。例如:projects/example-project/secrets/username-secret 。 |
image-repository-password-secret-id
|
用于向私有注册表进行身份验证的密码的 Secret Manager 密文 ID。例如:projects/example-project/secrets/password-secret/versions/latest 。 |
image-repository-cert-path
|
私有注册表的自签名证书的完整 Cloud Storage 网址。仅当注册表使用自签名证书时,才需要填写此值。例如:gs://example-bucket/self-signed.crt 。 |
下面是一个示例 Google Cloud CLI 命令,该命令使用具有自签名证书的私有注册表中的映像构建 Flex 模板。
gcloud dataflow flex-template build gs://example-bucket/custom-pipeline-private-repo.json --sdk-language=JAVA --image="gcp.repository.example.com:9082/registry/example/image:latest" --image-repository-username-secret-id="projects/example-project/secrets/username-secret" --image-repository-password-secret-id="projects/example-project/secrets/password-secret/versions/latest" --image-repository-cert-path="gs://example-bucket/self-signed.crt" --metadata-file=metadata.json
如需构建您自己的 Flex 模板,您需要替换示例值,并且可能需要指定不同的选项或额外指定一些选项。如需了解详情,请参阅以下资源:
指定流水线选项
如需了解 Flex 模板直接支持的流水线选项,请参阅流水线选项。
您还可以间接使用任何 Apache Beam 流水线选项。如果您为 Flex 模板作业使用 metadata.json
文件,请在该文件中添加这些流水线选项。此元数据文件必须遵循 TemplateMetadata
中的格式。
否则,在启动 Flex 模板作业时,请使用参数字段传递这些流水线选项。
API
使用 parameters
字段添加流水线选项。
gcloud
使用 parameters
标志添加流水线选项。
传递 List
或 Map
类型的参数时,您可能需要在 YAML 文件中定义参数并使用 flags-file
。
如需查看此方法的示例,请参阅此解决方案中的“创建包含参数的文件”步骤。
使用 Flex 模板时,您可以在流水线初始化期间配置部分流水线选项,但其他流水线选项无法更改。 如果 Flex 模板所需的命令行参数被覆盖,作业可能会忽略、替换或舍弃模板启动器传递的流水线选项。此外,作业本身可能会无法启动,或者系统可能会启动未使用 Flex 模板的作业。如需了解详情,请参阅读取作业文件失败。
在流水线初始化期间,请勿更改以下流水线选项:
Java
runner
project
jobName
templateLocation
region
Python
runner
project
job_name
template_location
region
Go
runner
project
job_name
template_location
region
屏蔽使用基于元数据的 SSH 密钥的虚拟机的项目 SSH 密钥
您可以通过阻止来自虚拟机的项目 SSH 密钥,阻止虚拟机接受存储在项目元数据中的 SSH 密钥。 将 additional-experiments
标志与 block_project_ssh_keys
服务选项搭配使用:
--additional-experiments=block_project_ssh_keys
如需了解详情,请参阅 Dataflow 服务选项。
元数据
您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:
元数据参数
参数键 | 必需 | 值的说明 | |
---|---|---|---|
name |
是 | 模板的名称。 | |
description |
否 | 对模板进行说明的一小段文本。 | |
streaming |
否 | 如果为 true ,则此模板支持流处理。默认值为 false 。 |
|
supportsAtLeastOnce |
否 | 如果为 true ,则此模板支持“至少一次”处理。默认值为 false 。如果模板设计为支持“至少一次”流处理模式,请将此参数设置为 true 。
|
|
supportsExactlyOnce |
否 | 如果为 true ,则此模板支持“正好一次”处理。默认值为 true 。 |
|
defaultStreamingMode |
否 | 默认流处理模式,适用于同时支持“至少一次”模式和“正好一次”模式的模板。请使用以下某个值:"AT_LEAST_ONCE" 、"EXACTLY_ONCE" 。如果未指定,则默认流处理模式为“正好一次”。
|
|
parameters |
否 | 模板使用的一组附加参数。默认情况下,使用空数组。 | |
name |
是 | 模板中使用的参数的名称。 | |
label |
是 | 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。 | |
helpText |
是 | 对参数进行说明的一小段文本。 | |
isOptional |
否 | 如果参数是必需的,则为 false ;如果参数是可选的,则为 true 。除非设置了值,否则 isOptional 默认为 false 。
如果您没有为元数据添加此参数键,则元数据会成为必需参数。 |
|
regexes |
否 | 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。 |
示例元数据文件
Java
{ "name": "Streaming Beam SQL", "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery", "parameters": [ { "name": "inputSubscription", "label": "Pub/Sub input subscription.", "helpText": "Pub/Sub subscription to read from.", "regexes": [ "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "outputTable", "label": "BigQuery output table", "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.", "isOptional": true, "regexes": [ "[^:]+:[^.]+[.].+" ] } ] }
Python
{ "name": "Streaming beam Python flex template", "description": "Streaming beam example for python flex template.", "parameters": [ { "name": "input_subscription", "label": "Input PubSub subscription.", "helpText": "Name of the input PubSub subscription to consume from.", "regexes": [ "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}" ] }, { "name": "output_table", "label": "BigQuery output table name.", "helpText": "Name of the BigQuery output table name.", "isOptional": true, "regexes": [ "([^:]+:)?[^.]+[.].+" ] } ] }
您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。
了解暂存位置和临时位置
Google Cloud CLI 会在您运行 Flex 模板时提供 --staging-location
和 --temp-location
选项。类似地,Dataflow REST API 为 FlexTemplateRuntimeEnvironment 提供了 stagingLocation
和 tempLocation
字段。
对于 Flex 模板,暂存位置是在启动模板的暂存步骤中将文件写入的 Cloud Storage 网址。Dataflow 会读取这些暂存文件以创建模板图。临时位置是在执行步骤中将临时文件写入的 Cloud Storage 网址。
更新 Flex 模板作业
以下示例请求展示了如何使用 projects.locations.flexTemplates.launch 方法更新模板流处理作业。如果您想使用 gcloud CLI,请参阅更新现有流水线。
如果要更新经典模板,请改用 projects.locations.templates.launch。
按照步骤通过 Flex 模板创建流处理作业。发送以下 HTTP POST 请求,其中包含修改后的值:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
REGION
替换为您要更新的作业的 Dataflow 区域。 - 将
JOB_NAME
替换为您要更新的作业的确切名称。 - 将
parameters
设置为您的键值对列表。列出的参数特定于此模板示例。如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。- 将
SUBSCRIPTION_NAME
替换为您的 Pub/Sub 订阅名称。 - 将
DATASET
替换为您的 BigQuery 数据集名称。 - 将
TABLE_NAME
替换为 BigQuery 表名称。
- 将
- 将
STORAGE_PATH
替换为模板文件的 Cloud Storage 位置。该位置应以gs://
开头。
- 将
使用
environment
参数更改环境设置。如需了解详情,请参阅FlexTemplateRuntimeEnvironment
。可选:如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,将请求保存到 JSON 文件,然后运行以下命令:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
将 FILE_PATH 替换为包含请求正文的 JSON 文件的路径。
使用 Dataflow 监控界面验证已创建了具有相同名称的新作业。此作业的状态为已更新。
限制
以下限制适用于 Flex 模板作业:
- 您必须使用 Google 提供的基础映像来使用 Docker 打包容器。 如需查看适用映像的列表,请参阅 Flex 模板基础映像。
- 必须在调用
run
后退出用于构建流水线的程序,流水线才能启动。 - 不支持
waitUntilFinish
(Java) 和wait_until_finish
(Python)。
后续步骤
- 如需详细了解经典模板和 Flex 模板及其用例场景,请参阅 Dataflow 模板。
- 如需了解 Flex 模板问题排查相关的信息,请参阅排查 Flex 模板超时问题。
- 如需查看更多参考架构、图表和最佳实践,请浏览云架构中心。