Use KubernetesPodOperator


This page describes how to use KubernetesPodOperator to deploy Kubernetes Pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to make sure that your environment has the appropriate resources.

KubernetesPodOperator launches Kubernetes Pods in your environment's cluster. In comparison, Google Kubernetes Engine operators run Kubernetes Pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters by using Google Kubernetes Engine operators.
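For comparison, here is a minimal, hedged sketch of starting a Pod in a separate, standalone cluster with GKEStartPodOperator from the apache-airflow-providers-google package. The project ID, location, and cluster name are placeholder values for a cluster that you created yourself.

from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)

# A minimal sketch: run a Pod in a standalone GKE cluster instead of the
# environment's cluster. The project ID, location, and cluster name are
# placeholders for a cluster that you created separately.
pod_in_separate_cluster = GKEStartPodOperator(
    task_id="pod-in-separate-cluster",
    project_id="your-project-id",
    location="us-central1",
    cluster_name="example-standalone-cluster",
    name="pod-in-separate-cluster",
    namespace="default",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    cmds=["echo"],
    arguments=["hello from a separate cluster"],
)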

KubernetesPodOperator is a good option if you require:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that are not available in the stock Cloud Composer worker image.

This page walks you through an example Airflow DAG that includes the following KubernetesPodOperator configurations:

  • Minimal configuration
  • Template configuration
  • Secret variables configuration
  • Full configuration

Before you begin

  • In Cloud Composer 2, your environment's cluster scales automatically. Extra workloads that you run with KubernetesPodOperator scale independently from your environment. Your environment is not affected by the increased resource demand, but your environment's cluster scales up and down depending on the resource demand. The pricing for the extra workloads that you run in your environment's cluster follows the Cloud Composer 2 pricing model and uses Cloud Composer Compute SKUs.

  • Cloud Composer 2 clusters use Workload Identity. By default, Pods that run in a newly created namespace or in the composer-user-workloads namespace cannot access Google Cloud resources. When using Workload Identity, Kubernetes service accounts associated with namespaces must be mapped to Google Cloud service accounts to enable service identity authorization for requests to Google APIs and other services.

    Because of this, if you run Pods in the composer-user-workloads namespace or in a newly created namespace in your environment's cluster, then proper IAM bindings between Kubernetes and Google Cloud service accounts are not created, and these Pods cannot access resources of your Google Cloud project.

    If you want your Pods to have access to Google Cloud resources, use the composer-user-workloads namespace or create your own namespace, as described further.

    To provide access to your project's resources, follow the guidance in Workload Identity and set up the bindings:

    1. Create a separate namespace in your environment's cluster.
    2. Create a binding between the composer-user-workloads/<namespace_name> Kubernetes service account and your environment's service account.
    3. Add your environment's service account annotation to the Kubernetes service account.
    4. When you use KubernetesPodOperator, specify the namespace and the Kubernetes service account in the namespace and service_account_name parameters (see the sketch after this list).
  • If you use version 5.0.0 of the CNCF Kubernetes Provider, follow the instructions described in the CNCF Kubernetes Provider section.

  • Cloud Composer 2 uses GKE clusters with Workload Identity. The GKE metadata server takes a few seconds to start accepting requests on a newly created Pod. Therefore, attempts to authenticate by using Workload Identity within the first few seconds of a Pod's life might fail. More information about this limitation is available here.

  • Cloud Composer 2 uses Autopilot clusters, which introduce the notion of compute classes. By default, if no class is selected, the general-purpose class is assumed when you create Pods with KubernetesPodOperator.

    • Each class is associated with specific properties and resource limits, which you can read about in the Autopilot documentation. For example, Pods that run within the general-purpose class can use up to 110 GiB of memory.
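As an illustration of specifying the namespace and service_account_name parameters (step 4 in the list above), the following is a minimal sketch of a task that runs a Pod under a Kubernetes service account in a custom namespace. The namespace example-namespace and the service account example-ksa are placeholder names: both must already exist in your environment's cluster and be bound to your environment's service account through Workload Identity, as described in the steps above.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# A minimal sketch, assuming a namespace and a Kubernetes service account
# that you already created and bound to your environment's service account
# through Workload Identity. Both names are placeholders.
pod_with_workload_identity = KubernetesPodOperator(
    task_id="pod-with-workload-identity",
    name="pod-with-workload-identity",
    # Custom namespace created in the environment's cluster.
    namespace="example-namespace",
    # Kubernetes service account annotated with your environment's
    # Google Cloud service account.
    service_account_name="example-ksa",
    # Any image that needs to call Google Cloud APIs.
    image="google/cloud-sdk:slim",
    cmds=["gcloud"],
    arguments=["storage", "ls"],
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)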

KubernetesPodOperator configuration

To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder, or add the relevant KubernetesPodOperator code to a DAG.

The following sections explain each KubernetesPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials, the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Minimal configuration

To create a KubernetesPodOperator, you only need the Pod's name, the namespace to run the Pod in, the image to use, and the task_id.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. The pod-ex-minimum task succeeds without you modifying the code.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Template configuration

Airflow supports templating with Jinja. You must declare the required variables (task_id, name, namespace, and image) with the operator. As the following example shows, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials, the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Without any changes to the DAG or your environment, the ex-kube-templates task fails because of two errors. The logs show that the task fails because the appropriate variable does not exist (my_value). The second error, which you can get after fixing the first error, shows that the task fails because core/kube_config is not found in config.

To fix both errors, continue with the steps outlined next.

To set my_value with gcloud or the Airflow UI:

Airflow UI

In the Airflow 2 UI:

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the List Variable page, click Add a new record.

  4. On the Add Variable page, enter the following information:

    • Key: my_value
    • Val: example_value
  5. Click Save.

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Replace:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.

To refer to a custom config_file (a Kubernetes configuration file), override the kube_config Airflow configuration option with a valid Kubernetes configuration:

Section   Key           Value
core      kube_config   /home/airflow/composer_kube_config

Wait for your environment to update, which can take several minutes. Then run the ex-kube-templates task again and verify that it succeeds.

Secret variables configuration

A Kubernetes Secret is an object that contains sensitive data. You can pass secrets to Kubernetes Pods by using KubernetesPodOperator. Secrets must be defined in Kubernetes, or the Pod fails to launch.

This example shows two ways of using Kubernetes Secrets: as an environment variable, and as a volume mounted by the Pod.

The first secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).

The second secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.

Here is what the secrets look like:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

The name of the first Kubernetes Secret is defined in the secret variable. This particular secret is named airflow-secrets. It is exposed as an environment variable, as dictated by the deploy_type. The environment variable it sets, deploy_target, is SQL_CONN. Finally, the key of the secret that is stored in deploy_target is sql_alchemy_conn.

The name of the second Kubernetes Secret is defined in the secret variable. This particular secret is named service-account. It is exposed as a volume, as dictated by the deploy_type. The path of the file to mount, deploy_target, is /var/secrets/google. Finally, the key of the secret that is stored in deploy_target is service-account.json.

Here is what the operator configuration looks like:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Without any changes to the DAG or your environment, the ex-kube-secrets task fails. If you look at the logs, the task fails because of a Pod took too long to start error. This error occurs because Airflow cannot find the secret specified in the configuration, secret_env.

gcloud

To set the secret using gcloud:

  1. Get information about your Cloud Composer environment's cluster.

    1. Run the following command:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Where:

      • ENVIRONMENT is the name of your environment.
      • LOCATION is the region where the Cloud Composer environment is located.

      The output of this command uses the following format: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>

    2. To get the GKE cluster ID, copy the output after /clusters/ (it ends in -gke).

  2. Connect to your GKE cluster by running the following command:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    Where:

    • CLUSTER_ID is your GKE cluster ID.
    • PROJECT is the ID of your Google Cloud project.
    • LOCATION is the region where the Cloud Composer environment is located.

  3. Create Kubernetes Secrets.

    1. Create a Kubernetes Secret that sets the value of sql_alchemy_conn to test_value by running the following command:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. Create a Kubernetes Secret that sets the value of service-account.json to a local path of a service account key file named key.json by running the following command:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. After you set the secrets, run the ex-kube-secrets task again in the Airflow UI.

  5. Verify that the ex-kube-secrets task succeeds.

Full configuration

This example shows all the variables that you can configure in KubernetesPodOperator. The ex-all-configs task succeeds without you modifying the code.

For details about each variable, see the Airflow KubernetesPodOperator reference.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Information about the CNCF Kubernetes Provider

GKEStartPodOperator and KubernetesPodOperator are implemented in the apache-airflow-providers-cncf-kubernetes provider.

For the CNCF Kubernetes Provider release notes, see the CNCF Kubernetes Provider website.

Version 6.0.0

In version 6.0.0 of the CNCF Kubernetes Provider package, the kubernetes_default connection is used by default in KubernetesPodOperator.

If you specified a custom connection in version 5.0.0, this custom connection is still used by the operator. To switch back to using the kubernetes_default connection, you might want to adjust your DAGs accordingly.
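For example, a task that should keep using a custom connection created in version 5.0.0 can continue to pass it explicitly. The connection ID below is a hypothetical name:

pod_with_custom_connection = KubernetesPodOperator(
    task_id="pod-with-custom-connection",
    name="pod-with-custom-connection",
    namespace="composer-user-workloads",
    image="bash",
    cmds=["echo"],
    arguments=["hello"],
    config_file="/home/airflow/composer_kube_config",
    # Hypothetical connection ID created in the Airflow UI. Omit this
    # parameter (or set it to "kubernetes_default") to rely on the default
    # connection that version 6.0.0 uses.
    kubernetes_conn_id="my_custom_kubernetes_connection",
)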

Version 5.0.0

This version introduces a few backward-incompatible changes compared to version 4.4.0. The most important ones that you should be aware of are related to the kubernetes_default connection, which is not used in version 5.0.0.

  • The kubernetes_default connection needs to be modified. The Kube config path needs to be set to /home/airflow/composer_kube_config (see figure 1), or config_file needs to be added to the KubernetesPodOperator configuration (as shown below).
    Figure 1. Airflow UI, the Kube config path field of the edited kubernetes_default connection
  • Modify the code of a task that uses KubernetesPodOperator in the following way:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

For more information about version 5.0.0, see the CNCF Kubernetes Provider release notes.

Troubleshooting

Tips for troubleshooting Pod failures

In addition to checking the task logs in the Airflow UI, also check the following logs:

  • Output of the Airflow scheduler and workers:

    1. In the Google Cloud console, go to the Environments page.

      Go to Environments

    2. Follow the DAGs link for your environment.

    3. In your environment's bucket, go up one level.

    4. Review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> folder.

  • Detailed Pod logs in the Google Cloud console under GKE workloads. These logs include the Pod definition YAML file, Pod events, and Pod details.

Non-zero return codes when also using GKEStartPodOperator

When using KubernetesPodOperator and GKEStartPodOperator, the return code of the container's entrypoint determines whether the task is considered successful or not. Non-zero return codes indicate failure.

A common pattern when using KubernetesPodOperator and GKEStartPodOperator is to execute a shell script as the container entrypoint to group multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script, so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance (see the sketch that follows).
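As a rough sketch of this pattern with KubernetesPodOperator, the following task runs a small inline bash script through bash -c. Because the script starts with set -e, the first failing command stops the script with a non-zero exit code, and Airflow marks the task as failed. The image and the script contents are illustrative only.

grouped_steps = KubernetesPodOperator(
    task_id="grouped-steps",
    name="grouped-steps",
    namespace="composer-user-workloads",
    image="bash",
    # Run an inline script as the container entrypoint.
    cmds=["bash", "-c"],
    arguments=[
        # `set -e` stops the script at the first failing command, so the
        # container exits with a non-zero code and the task fails.
        "set -e\n"
        "echo 'step 1: download input'\n"
        "echo 'step 2: transform data'\n"
        "echo 'step 3: upload results'\n"
    ],
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)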

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts that occur before larger images finish downloading. You can increase the timeout by changing the startup_timeout_seconds parameter when you create a KubernetesPodOperator.
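For example, to allow more time for a large image to download, you might raise the timeout when you define the task. The ten-minute value and the image name below are only an illustration:

slow_start_pod = KubernetesPodOperator(
    task_id="slow-start-pod",
    name="slow-start-pod",
    namespace="composer-user-workloads",
    # Placeholder for an image that is large or slow to pull.
    image="gcr.io/your-project/large-image:latest",
    cmds=["echo"],
    arguments=["started"],
    # Allow up to 10 minutes for the Pod to schedule, pull the image,
    # and start, instead of the 120-second default.
    startup_timeout_seconds=600,
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)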

If a Pod times out, the task-specific log is available in the Airflow UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod timeouts also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at Pod-level errors using the GKE dashboards to view the logs for your particular workload, or use Cloud Logging.

Failed to establish a new connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool in your cluster is being upgraded, you might see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

To check whether your cluster is upgrading, in the Google Cloud console, go to the Kubernetes clusters page and look for the loading icon next to your environment's cluster name.

What's next