配置 Flex 模板

本页面记录了各种 Dataflow Flex 模板配置选项,包括:

如要配置示例 Flex 模板,请参阅 Flex 模板教程

了解 Flex 模板权限

使用 Flex 模板时,您需要三组权限:

  • 创建资源的权限
  • 构建 Flex 模板的权限
  • 运行 Flex 模板的权限

创建资源的权限

如需开发和运行 Flex 模板流水线,您需要创建各种资源(例如暂存存储桶)。对于一次性资源创建任务,您可以使用基本 Owner 角色

构建 Flex 模板的权限

作为 Flex 模板的开发者,您需要构建模板,以将其提供给用户。构建涉及将模板规范上传到 Cloud Storage 存储桶,以及使用运行流水线所需的代码和依赖项预配 Docker 映像。如需构建 Flex 模板,您需要拥有 Cloud Storage 的读写权限以及对 Artifact Registry 代码库的 Artifact Registry Writer 权限。您可以通过分配以下角色来授予这些权限:

  • Storage Admin (roles/storage.admin)
  • Cloud Build Editor (roles/cloudbuild.builds.editor)
  • Artifact Registry Writer (roles/artifactregistry.writer)

运行 Flex 模板的权限

当您运行 Flex 模板时,Dataflow 会为您创建一个作业。如需创建作业,Dataflow 服务账号需要以下权限:

  • dataflow.serviceAgent

首次使用 Dataflow 时,服务会为您分配此角色,因此您无需授予此权限。

默认情况下,Compute Engine 服务账号用于启动器虚拟机和工作器虚拟机。该服务账号需要以下角色和权限:

  • Storage Object Admin (roles/storage.objectAdmin)
  • Viewer (roles/viewer)
  • Dataflow Worker (roles/dataflow.worker)
  • 暂存存储桶的读写权限
  • Flex 模板映像的读取权限

如需授予暂存存储桶的读写权限,您可以使用 Storage Object Admin (roles/storage.objectAdmin) 角色。如需了解详情,请参阅 Cloud Storage 的 IAM 角色

如需授予 Flex 模板映像的读取权限,您可以使用 Storage Object Viewer (roles/storage.objectViewer) 角色。如需了解详情,请参阅配置访问权限控制

设置所需的 Dockerfile 环境变量

如果要为 Flex 模板作业创建自己的 Dockerfile,请指定以下环境变量:

Java

在 Dockerfile 中指定 FLEX_TEMPLATE_JAVA_MAIN_CLASSFLEX_TEMPLATE_JAVA_CLASSPATH

ENV 说明 必需
FLEX_TEMPLATE_JAVA_MAIN_CLASS 指定启动 Flex 模板要运行的 Java 类。
FLEX_TEMPLATE_JAVA_CLASSPATH 指定类文件的位置。
FLEX_TEMPLATE_JAVA_OPTIONS 指定启动 Flex 模板时要传递的 Java 选项。

Python

在 Dockerfile 中指定 FLEX_TEMPLATE_PYTHON_PY_FILE

如需管理流水线依赖项,请在 Dockerfile 中设置变量,如下所示:

  • FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
  • FLEX_TEMPLATE_PYTHON_PY_OPTIONS
  • FLEX_TEMPLATE_PYTHON_SETUP_FILE
  • FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES

例如,GitHub 中的 Streaming in Python Flex 模板教程中设置了以下环境变量:

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
ENV 说明 必需
FLEX_TEMPLATE_PYTHON_PY_FILE 指定启动 Flex 模板要运行的 Python 文件。
FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE 指定包含流水线依赖项的需求文件。 如需了解详情,请参阅 Apache Beam 文档中的 PyPI 依赖项
FLEX_TEMPLATE_PYTHON_SETUP_FILE 指定流水线软件包“setup.py”文件的路径。 如需了解详情,请参阅 Apache Beam 文档中的多个文件依赖项
FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES

指定不公开提供的软件包。如需了解如何使用额外的软件包,请参阅本地或非 PyPI 依赖项

FLEX_TEMPLATE_PYTHON_PY_OPTIONS 指定启动 Flex 模板时要传递的 Python 选项。

软件包依赖项

如果 Dataflow Python 流水线使用其他依赖项,您可能需要配置 Flex 模板,以便在 Dataflow 工作器虚拟机上安装其他依赖项。

如果在限制互联网访问的环境中运行使用 Flex 模板的 Python Dataflow 作业,则您必须在创建模板时预封装依赖项。

使用以下任一选项预封装 Python 依赖项。

如需了解如何在 Java 和 Go 流水线中管理流水线依赖项,请参阅在 Dataflow 中管理流水线依赖项

使用需求文件并使用模板预封装依赖项

如果您使用自己的 Dockerfile 来定义 Flex 模板映像,请按以下步骤操作:

  1. 创建一个列出流水线依赖项的 requirements.txt 文件。

    COPY requirements.txt /template/
    ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
    
  2. 在 Flex 模板映像中安装依赖项。

    RUN pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
    
  3. 将依赖项下载到本地要求缓存中,该缓存会在模板启动时暂存到 Dataflow 工作器。

    RUN pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
    

使用此方法时,requirements.txt 文件中的依赖项会在运行时安装到 Dataflow 工作器。Google Cloud 控制台的“建议”标签页中的分析可能会记录此行为。如需避免在运行时安装依赖项,请使用自定义容器映像

以下是使用 Flex 模板中的要求文件的代码示例。

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

# Configure the Template to launch the pipeline with a --requirements_file option.
# See: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#pypi-dependencies
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/streaming_beam.py"

COPY . /template

RUN apt-get update \
    # Install any apt packages if required by your template pipeline.
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    # Install dependencies from requirements file in the launch environment.
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # When FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE  option is used,
    # then during Template launch Beam downloads dependencies
    # into a local requirements cache folder and stages the cache to workers.
    # To speed up Flex Template launch, pre-download the requirements cache
    # when creating the Template.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Set this if using Beam 2.37.0 or earlier SDK to speed up job submission.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

将流水线结构设计为软件包并使用本地软件包

如果您使用多个 Python 本地文件或模块,请将流水线结构化为软件包。文件结构可能如下所示:

main.py
pyproject.toml
setup.py
src/
  my_package/
    my_custom_dofns_and_transforms.py
    my_pipeline_launcher.py
    other_utils_and_helpers.py
  1. 将顶级入口点(例如 main.py 文件)放置在根目录中。将其余文件放在 src 目录中的单独文件夹中,例如 my_package

  2. 将软件包配置文件添加到根目录,并提供软件包详情和要求。

    pyproject.toml

    [project]
    name = "my_package"
    version = "package_version"
    dependencies = [
      # Add list of packages (and versions) that my_package depends on.
      # Example:
      "apache-beam[gcp]==2.54.0",
    ]
    

    setup.py

      """An optional setuptools configuration stub for the pipeline package.
    
      Use pyproject.toml to define the package. Add this file only if you must
      use the --setup_file pipeline option or the
      FLEX_TEMPLATE_PYTHON_SETUP_FILE configuration option.
      """
    
      import setuptools
      setuptools.setup()
    

    如需详细了解如何配置本地软件包,请参阅打包 Python 项目

  3. 为流水线导入本地模块或文件时,请使用 my_package 软件包名称作为导入路径。

    from my_package import word_count_transform
    
  4. 在 Flex 模板映像中安装流水线软件包。您的 Flex 模板 Dockerfile 可能包含类似于以下示例的内容:

    Dockerfile

    ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
    ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
    
    # Copy pipeline, packages and requirements.
    WORKDIR ${WORKDIR}
    COPY main.py .
    COPY pyproject.toml .
    COPY setup.py .
    COPY src src
    
    # Install local package.
    RUN pip install -e .
    

使用此方法时,requirements.txt 文件中的依赖项会在运行时安装到 Dataflow 工作器。Google Cloud 控制台的“建议”标签页中的分析可能会记录此行为。如需避免在运行时安装依赖项,请使用自定义容器映像

如需查看遵循推荐方法的示例,请参阅 GitHub 中的具有依赖项和自定义容器映像的流水线 Flex 模板教程。

使用预安装所有依赖项的自定义容器

如需避免在运行时安装依赖项,请使用自定义容器。对于在不访问互联网的环境中运行的流水线,首选此选项。

如需使用自定义容器,请按以下步骤操作:

  1. 构建一个预安装必要依赖项的自定义容器映像。

  2. 在 Flex 模板 Dockerfile 中预安装相同的依赖项。

    如需防止在运行时安装依赖项,请勿在 Flex 模板配置中使用 FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILEFLEX_TEMPLATE_PYTHON_SETUP_FILE 选项。

    修改后的 Flex 模板 Dockerfile 可能如以下示例所示:

    FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py"
    COPY . /template
    # If you use a requirements file, pre-install the requirements.txt.
    RUN pip install --no-cache-dir -r /template/requirements.txt
    # If you supply the pipeline in a package, pre-install the local package and its dependencies.
    RUN pip install -e /template
    

    使用此方法时,您需要执行以下操作:

    • 构建 Flex 模板映像
    • 构建自定义 SDK 容器映像
    • 在两个映像中安装相同的依赖项

    或者,为了减少需要维护的映像数量,请将自定义容器映像用作 Flex 模板的基础映像

  3. 如果您使用 Apache Beam SDK 2.49.0 版或更低版本,请在流水线启动器中添加 --sdk_location=container 流水线选项。此选项会指示流水线使用自定义容器中的 SDK,而不是下载 SDK。

    options = PipelineOptions(beam_args, save_main_session=True, streaming=True, sdk_location="container")
    
  4. flex-template run 命令中设置 sdk_container_image 参数。例如:

    gcloud dataflow flex-template run $JOB_NAME \
       --region=$REGION \
       --template-file-gcs-location=$TEMPLATE_PATH \
       --parameters=sdk_container_image=$CUSTOM_CONTAINER_IMAGE \
       --additional-experiments=use_runner_v2
    

    如需了解详情,请参阅在 Dataflow 中使用自定义容器

选择基础映像

您可以使用 Google 提供的基础映像来使用 Docker 打包模板容器映像。从 Flex 模板基础映像中选择最新标记。建议使用特定的图片代码(而非 latest)。

请按以下格式指定基础映像:

gcr.io/dataflow-templates-base/IMAGE_NAME:TAG

请替换以下内容:

使用自定义容器映像

如果流水线使用自定义容器映像,我们建议将此自定义映像用作 Flex 模板 Docker 映像的基础映像。为此,请将 Google 提供的模板基础映像中的 Flex 模板启动器二进制文件复制到您的自定义映像。

映像的示例 Dockerfile 既可用作自定义 SDK 容器映像,也可用作 Flex 模板,如下所示:

FROM gcr.io/dataflow-templates-base/IMAGE_NAME:TAG as template_launcher
FROM apache/beam_python3.10_sdk:2.59.0

# RUN <...Make image customizations here...>
# See: https://cloud.google.com/dataflow/docs/guides/build-container-image

# Configure the Flex Template here.
COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
COPY my_pipeline.py /template/
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/my_pipeline.py"

替换以下内容:

  • IMAGE_NAME:Google 提供的基础映像。例如:python311-template-launcher-base
  • TAG:基础映像的版本标记,请参阅 Flex 模板基础映像参考文档为了提高稳定性和问题排查,请避免使用 latest。请改为固定到特定版本标记。

如需查看遵循此方法的示例,请参阅具有依赖项和自定义容器映像的流水线 Flex 模板教程。

使用私有注册表中的映像

您可以构建存储在私有 Docker 注册表中的 Flex 模板映像,条件是该私有注册表使用 HTTPS 并且具有有效证书。

如需使用私有注册表中的映像,请指定该映像的路径以及该注册表的用户名和密码。用户名和密码必须存储在 Secret Manager 中。您可以使用以下格式之一提供密文:

  • projects/{project}/secrets/{secret}/versions/{secret_version}
  • projects/{project}/secrets/{secret}

如果您使用第二种格式,因为它未指定版本,则 Dataflow 会使用最新版本。

如果注册表使用自签名证书,您还需要指定自签名证书的 Cloud Storage 路径。

下表介绍了可用于配置私有注册表的 gcloud CLI 选项。

参数 说明
image 注册表的地址。例如:gcp.repository.example.com:9082/registry/example/image:latest
image-repository-username-secret-id 用于向私有注册表进行身份验证的用户名的 Secret Manager 密文 ID。例如:projects/example-project/secrets/username-secret
image-repository-password-secret-id 用于向私有注册表进行身份验证的密码的 Secret Manager 密文 ID。例如:projects/example-project/secrets/password-secret/versions/latest
image-repository-cert-path 私有注册表的自签名证书的完整 Cloud Storage 网址。仅当注册表使用自签名证书时,才需要填写此值。例如:gs://example-bucket/self-signed.crt

下面是一个示例 Google Cloud CLI 命令,该命令使用具有自签名证书的私有注册表中的映像构建 Flex 模板。

gcloud dataflow flex-template build gs://example-bucket/custom-pipeline-private-repo.json
--sdk-language=JAVA
--image="gcp.repository.example.com:9082/registry/example/image:latest"
--image-repository-username-secret-id="projects/example-project/secrets/username-secret"
--image-repository-password-secret-id="projects/example-project/secrets/password-secret/versions/latest"
--image-repository-cert-path="gs://example-bucket/self-signed.crt"
--metadata-file=metadata.json

如需构建您自己的 Flex 模板,您需要替换示例值,并且可能需要指定不同的选项或额外指定一些选项。如需了解详情,请参阅以下资源:

指定流水线选项

如需了解 Flex 模板直接支持的流水线选项,请参阅流水线选项

您还可以间接使用任何 Apache Beam 流水线选项。如果您为 Flex 模板作业使用 metadata.json 文件,请在该文件中添加这些流水线选项。此元数据文件必须遵循 TemplateMetadata 中的格式。

否则,在启动 Flex 模板作业时,请使用参数字段传递这些流水线选项。

API

使用 parameters 字段添加流水线选项。

gcloud

使用 parameters 标志添加流水线选项。

传递 ListMap 类型的参数时,您可能需要在 YAML 文件中定义参数并使用 flags-file。 如需查看此方法的示例,请参阅此解决方案中的“创建包含参数的文件”步骤

使用 Flex 模板时,您可以在流水线初始化期间配置部分流水线选项,但其他流水线选项无法更改。 如果 Flex 模板所需的命令行参数被覆盖,作业可能会忽略、替换或舍弃模板启动器传递的流水线选项。此外,作业本身可能会无法启动,或者系统可能会启动未使用 Flex 模板的作业。如需了解详情,请参阅读取作业文件失败

在流水线初始化期间,请勿更改以下流水线选项

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

屏蔽使用基于元数据的 SSH 密钥的虚拟机的项目 SSH 密钥

您可以通过阻止来自虚拟机的项目 SSH 密钥,阻止虚拟机接受存储在项目元数据中的 SSH 密钥。 将 additional-experiments 标志与 block_project_ssh_keys 服务选项搭配使用:

--additional-experiments=block_project_ssh_keys

如需了解详情,请参阅 Dataflow 服务选项

元数据

您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:

  1. 使用元数据参数中的参数创建 metadata.json 文件。

    如需查看示例,请参阅示例元数据文件

  2. 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。

元数据参数

参数键 必需 值的说明
name 模板的名称。
description 对模板进行说明的一小段文本。
streaming 如果为 true,则此模板支持流处理。默认值为 false
supportsAtLeastOnce 如果为 true,则此模板支持“至少一次”处理。默认值为 false。如果模板设计为支持“至少一次”流处理模式,请将此参数设置为 true
supportsExactlyOnce 如果为 true,则此模板支持“正好一次”处理。默认值为 true
defaultStreamingMode 默认流处理模式,适用于同时支持“至少一次”模式和“正好一次”模式的模板。请使用以下某个值:"AT_LEAST_ONCE""EXACTLY_ONCE"。如果未指定,则默认流处理模式为“正好一次”。
parameters 模板使用的一组附加参数。默认情况下,使用空数组。
name 模板中使用的参数的名称。
label 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。
helpText 对参数进行说明的一小段文本。
isOptional 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。 如果您没有为元数据添加此参数键,则元数据会成为必需参数。
regexes 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。

示例元数据文件

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    }
  ]
}

您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。

了解暂存位置和临时位置

Google Cloud CLI 会在您运行 Flex 模板时提供 --staging-location--temp-location 选项。类似地,Dataflow REST API 为 FlexTemplateRuntimeEnvironment 提供了 stagingLocationtempLocation 字段。

对于 Flex 模板,暂存位置是在启动模板的暂存步骤中将文件写入的 Cloud Storage 网址。Dataflow 会读取这些暂存文件以创建模板图。临时位置是在执行步骤中将临时文件写入的 Cloud Storage 网址。

更新 Flex 模板作业

以下示例请求展示了如何使用 projects.locations.flexTemplates.launch 方法更新模板流处理作业。如果您想使用 gcloud CLI,请参阅更新现有流水线

如果要更新经典模板,请改用 projects.locations.templates.launch

  1. 按照步骤通过 Flex 模板创建流处理作业。发送以下 HTTP POST 请求,其中包含修改后的值:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • PROJECT_ID 替换为您的项目 ID。
    • REGION 替换为您要更新的作业的 Dataflow 区域
    • JOB_NAME 替换为您要更新的作业的确切名称。
    • parameters 设置为您的键值对列表。列出的参数特定于此模板示例。如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。
      • SUBSCRIPTION_NAME 替换为您的 Pub/Sub 订阅名称。
      • DATASET 替换为您的 BigQuery 数据集名称。
      • TABLE_NAME 替换为 BigQuery 表名称。
    • STORAGE_PATH 替换为模板文件的 Cloud Storage 位置。该位置应以 gs:// 开头。
  2. 使用 environment 参数更改环境设置。如需了解详情,请参阅 FlexTemplateRuntimeEnvironment

  3. 可选:如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,将请求保存到 JSON 文件,然后运行以下命令:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    FILE_PATH 替换为包含请求正文的 JSON 文件的路径。

  4. 使用 Dataflow 监控界面验证已创建了具有相同名称的新作业。此作业的状态为已更新

限制

以下限制适用于 Flex 模板作业:

  • 您必须使用 Google 提供的基础映像来使用 Docker 打包容器。 如需查看适用映像的列表,请参阅 Flex 模板基础映像
  • 必须在调用 run 后退出用于构建流水线的程序,流水线才能启动。
  • 不支持 waitUntilFinish (Java) 和 wait_until_finish (Python)。

后续步骤