使用 KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍了如何使用 KubernetesPodOperator 来部署 Kubernetes Pod 从 Cloud Composer 迁移到 Google Kubernetes Engine 属于您的 Cloud Composer 环境

KubernetesPodOperator 会在您环境的集群中启动 Kubernetes pod。相比之下,Google Kubernetes Engine Operator 在指定集群中运行 Kubernetes pod,该集群可以是与您的环境无关的独立集群。您还可以使用 Google Kubernetes Engine Operator 来创建和删除集群。

如果您需要以下内容,KubernetesPodOperator 是一个不错的选择:

  • 无法通过公共 PyPI 代码库获得的自定义 Python 依赖项。
  • 无法通过现有 Cloud Composer 工作器映像获得的二进制依赖项。

准备工作

  • 如果使用的是 CNCF Kubernetes Provider 版本 5.0.0,请按照“CNCF Kubernetes Provider”部分中记录的说明操作。

  • Cloud Composer 2 不提供 Pod 亲和性配置。如果您想使用 Pod 亲和性,请改用 GKE Operator 在其他集群中启动 Pod。

Cloud Composer 2 中的 KubernetesPodOperator 简介

本部分介绍 KubernetesPodOperator 在 Cloud Composer 2 中的工作原理。

资源使用情况

在 Cloud Composer 2 中,您的环境的集群会自动扩缩。您运行的额外工作负载 使用 KubernetesPodOperator 独立于您的环境进行扩缩。

您的环境不会受到资源需求增加的影响, 您的环境的集群会根据资源情况进行扩缩 需求

您在环境的集群中运行的额外工作负载的价格遵循 Cloud Composer 2 价格模式并使用 Cloud Composer Compute SKU

Cloud Composer 2 使用 Autopilot 集群,其中引入了计算类的概念:

  • Cloud Composer 仅支持 general-purpose 计算类。

  • 默认情况下,如果未选择任何类,则 general-purpose 类 使用 KubernetesPodOperator 创建 Pod。

  • 每个类都与特定的属性和资源限制相关联, 有关详情,请参见 Autopilot 文档。例如,在 general-purpose 类中运行的 Pod 最多可使用 110 GiB 的内存。

对项目资源的访问权限

Cloud Composer 2 使用 GKE 集群, 适用于 GKE 的工作负载身份联合。在 composer-user-workloads 命名空间中运行的 Pod 无需额外配置即可访问项目中的 Google Cloud 资源。您的环境的服务账号 用于访问这些资源。

如果要使用自定义命名空间,那么可以使用 Kubernetes 服务账号 与此命名空间关联的必须映射到 Google Cloud 服务 账号,用于为 Google API 请求启用服务身份授权 和其他服务。如果您在环境 Kubernetes 和集群之间的 IAM 绑定 Google Cloud 服务账号不会被创建,这些 Pod 也无法 您的 Google Cloud 项目的资源。

如果您使用自定义命名空间,并且希望 Pod 有权访问 Google Cloud 资源 遵循适用于 GKE 的工作负载身份联合中的指南 并为自定义命名空间设置绑定:

  1. 在您的环境集群中创建单独的命名空间。
  2. 在自定义命名空间 Kubernetes 服务账号之间建立绑定 和环境的服务账号
  3. 将您的环境的服务账号注释添加到 Kubernetes 服务账号。
  4. 在使用 KubernetesPodOperator 时,请指定命名空间和 namespaceservice_account_name 中的 Kubernetes 服务账号 参数。

最少的配置工作量

如需创建 KubernetesPodOperator,只能使用 Pod 的 nameimagetask_id 参数为必填项。/home/airflow/composer_kube_config 包含用于向 GKE 进行身份验证的凭据。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

其他配置

此示例展示了您可在 KubernetesPodOperator 即可实现。

如需详细了解参数,请参阅 KubernetesPodOperator 的 Airflow 参考文档。有关使用 Kubernetes Secret 和 ConfigMap,请参阅使用 Kubernetes Secret 和 ConfigMap。如需了解如何将 Jinja 模板与 KubernetesPodOperator 搭配使用,请参阅使用 Jinja 模板

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

使用 Jinja 模板

Airflow 支持 DAG 中的 Jinja 模板

您必须声明必需的 Airflow 参数(task_idnameimage)。如下例所示 您可以使用 Jinja 创建所有其他参数的模板,包括 cmdsargumentsenv_varsconfig_file

示例中的 env_vars 参数是通过名为 my_valueAirflow 变量设置的。示例 DAG 从 Airflow 中的 vars 模板变量获取值。Airflow 提供了更多可用于访问不同类型信息的变量。例如,您可以使用 conf 模板变量来访问 Airflow 配置选项的值。如需了解详情以及 Airflow 中可用变量的列表,请参阅 Airflow 文档中的模板参考文档

如果不更改 DAG 或创建 env_vars 变量,示例中的 ex-kube-templates 任务会因变量不存在而失败。在 Airflow 界面中或使用 Google Cloud CLI 创建此变量:

Airflow 界面

  1. 转到 Airflow 界面

  2. 在工具栏中,依次选择 Admin > Variables

  3. List Variable 页面上,点击 Add a new record

  4. Add Variable 页面上,输入以下信息:

    • Key:my_value
    • Val:example_value
  5. 点击保存

gcloud

输入以下命令:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

您需要将其中的:

  • ENVIRONMENT 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

以下示例演示了如何将 Jinja 模板与 KubernetesPodOperator 搭配使用:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

使用 Kubernetes Secret 和 ConfigMap

Kubernetes Secret 是包含敏感数据的对象。Kubernetes ConfigMap 是包含键值对形式的非机密数据的对象。

在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform,然后通过 KubernetesPodOperator 即可实现。

YAML 配置文件简介

使用 Google Cloud CLI 和 API 创建 Kubernetes Secret 或 ConfigMap 时,您需要提供 YAML 格式的文件。此文件必须遵循 和 ConfigMap 所使用的格式相同。Kubernetes 文档 提供了许多 ConfigMap 和 Secret 的代码示例。首先,您可以 请参阅 使用 Secret 安全地分发凭据 页面和 ConfigMaps

与 Kubernetes Secret 中相同,在 Secret 中定义值时,请使用 base64 表示法。

要对某个值进行编码,您可以使用以下命令(这也是多种方法之一, 以获取 base64 编码的值):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

输出:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下两个 YAML 文件示例将在本指南后面的示例中使用。Kubernetes Secret 的 YAML 配置文件示例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

另一个演示如何包含文件的示例。与上一个相同 例如,先对文件的内容 (cat ./key.json | base64) 进行编码,然后 在 YAML 文件中提供此值:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap 的 YAML 配置文件示例。您无需使用 base64 ConfigMap 中的表示法:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

管理 Kubernetes Secret

在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl 创建 Secret:

  1. 获取有关环境集群的信息:

    1. 运行以下命令:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      替换:

      • ENVIRONMENT 替换为您的环境名称。
      • LOCATION 替换为 Cloud Composer 环境所在的区域。

      此命令的输出使用以下格式:projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>

    2. 为了获取 GKE 集群 ID,请复制 /clusters/ 后面的输出内容(以 -gke 结尾)。

  2. 通过以下方式连接到您的 GKE 集群: 命令:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    替换以下内容:

    • CLUSTER_ID:环境的集群 ID。
    • PROJECT_ID项目 ID
    • LOCATION:环境所在的区域。

  3. 创建 Kubernetes Secret:

    以下命令演示了创建 Kubernetes Secret 的两种不同方法。--from-literal 方法使用键值对。--from-file 方法使用文件内容。

    • 如需通过提供键值对来创建 Kubernetes Secret,请运行 。本示例将创建一个名为 airflow-secrets,其 sql_alchemy_conn 字段的值为 test_value

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    • 如需通过提供文件内容创建 Kubernetes Secret,请运行 。本示例将创建一个名为 包含 service-account.json 字段的 service-account 值取自本地 ./key.json 文件的内容。

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

在 DAG 中使用 Kubernetes Secret

此示例展示了将 Kubernetes Secret 用作环境的两种方法 变量,并将其装载为 Pod 所装载的卷。

第一个 Secret (airflow-secrets) 已设置 名为 SQL_CONN 的 Kubernetes 环境变量(而不是 Airflow 或 Cloud Composer 环境变量)。

第二个 Secret service-account 将包含服务账号令牌的文件 service-account.json 装载到 /var/secrets/google

Secret 对象如下所示:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

第一个 Kubernetes Secret 的名称在 secret_env 变量中定义。 此 Secret 的名称为 airflow-secretsdeploy_type 参数指定 必须将其公开为环境变量环境变量的名称为 SQL_CONN,如 deploy_target 参数中所指定。最后, SQL_CONN 环境变量的值已设置为 sql_alchemy_conn 键。

第二个 Kubernetes Secret 的名称在 secret_volume 中定义。 变量。此 Secret 名为 service-account。它以 音量(如 deploy_type 参数中所指定)。要装载的文件的路径 deploy_target/var/secrets/google。最后,存储在 deploy_target 中的 Secret 的 keyservice-account.json

操作节点配置类似于如下所示:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

CNCF Kubernetes 提供商的相关信息

KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes 提供商。

如需查看 CNCF Kubernetes 提供商的详细版本说明,请访问 CNCF Kubernetes 提供商网站

版本 6.0.0

在 CNCF Kubernetes Provider 软件包的 6.0.0 版中,KubernetesPodOperator 中默认使用 kubernetes_default 连接。

如果您在 5.0.0 版中指定了自定义连接,则此自定义连接 仍由相应运算符使用。切换回使用kubernetes_default 连接,您可能需要相应地调整 DAG。

版本 5.0.0

此版本引入了一些向后不兼容的更改 (与版本 4.4.0 相比)最重要的指标 kubernetes_default 连接(未在 5.0.0 版中使用)。

  • 需要修改 kubernetes_default 连接。Kubernetes 配置路径必须设置为 /home/airflow/composer_kube_config(如以下图所示)。作为替代方案,config_file 添加到 KubernetesPodOperator 配置(如 以下代码示例)。
Airflow 界面中的 Kube 配置路径字段
图 1. Airflow 界面,修改了 kubernetes_default 连接(点击可放大)
  • 通过以下方式使用 KubernetesPodOperator 修改任务的代码:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

有关 5.0.0 版的详细信息,请参阅 CNCF Kubernetes 提供程序版本说明

问题排查

本部分提供有关排查常见 KubernetesPodOperator 问题的建议 问题:

查看日志

排查问题时,您可以按以下顺序检查日志:

  1. Airflow 任务日志:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    3. 转到 DAG 标签页。

    4. 点击 DAG 的名称,然后点击 DAG 运行作业以查看详细信息 和日志。

  2. Airflow 调度器日志:

    1. 前往环境详情页面。

    2. 前往日志标签页。

    3. 检查 Airflow 调度程序日志。

  3. Google Cloud 控制台中的 GKE 下的 Pod 日志 工作负载这些日志包括 Pod 定义 YAML 文件、Pod 事件 Pod 详情。

非零返回代码

使用 KubernetesPodOperator(和 GKEStartPodOperator)时, 容器入口点的返回代码决定了任务 是否成功返回非零代码表示失败。

一种常见的模式是执行 Shell 脚本作为容器入口点 将容器中的多个操作组合在一起。

如果您要编写这样的脚本,我们建议您在脚本中加入 set -e 命令,以便脚本中失败的命令 终止脚本并将故障传播到 Airflow 任务实例。

Pod 超时

KubernetesPodOperator 的默认超时时间为 120 秒,这会导致在较大的映像下载完成之前发生超时。您可以 在以下情况下,可通过更改 startup_timeout_seconds 参数来延长超时时间: 您需要创建 KubernetesPodOperator

当 Pod 超时时,可通过 Airflow 界面例如:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

在以下情况下,也会出现 Pod 超时: Cloud Composer 服务账号 缺少必要的 IAM 权限,无法在 。如需验证是否属于这种情况,请使用 GKE 信息中心查看 Pod 级错误,找到特定工作负载的日志;或者使用 Cloud Logging。

未能建立新连接

GKE 集群默认启用自动升级。 如果集群中的某个节点池正在升级,您可能会看到以下错误:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

如需检查您的集群是否正在升级,请在 Google Cloud 控制台中前往 Kubernetes 集群页面,查找您的 环境的集群名称。

后续步骤