Using the KubernetesPodOperator

This page describes how to use the KubernetesPodOperator to launch Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to make sure that your environment has the appropriate resources.

Figure: Where Cloud Composer launches Kubernetes pods. The launched pods run in the same Kubernetes Engine cluster (spanning the tenant project and the customer project) as the Airflow workers, Redis, the Airflow scheduler, and the Cloud SQL proxy.

The KubernetesPodOperator is a good option if you require:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that are not available in the stock Cloud Composer worker image.

This page walks you through an example DAG that includes the following KubernetesPodOperator configurations:

  • Minimal configuration
  • Template configuration
  • Secret variables configuration
  • Pod affinity configuration
  • Full configuration

Before you begin

  • We recommend using the latest version of Cloud Composer. At a minimum, the version you use must be supported as part of the Cloud Composer deprecation and support policy. To check the image version, see your environment details.
  • Make sure that your environment has sufficient resources. Launching pods into a resource-starved environment can cause Airflow worker and Airflow scheduler errors.

Ensure that your environment has adequate resources

When you create a Cloud Composer environment, you specify the amount of computing power for the environment, and a corresponding amount of resources is allocated to the GKE cluster. Launching Kubernetes pods into the environment can cause programs to compete for resources, such as CPU or memory. Because the Airflow scheduler and workers are in the same GKE cluster, the scheduler and workers will not work properly if the competition results in resource starvation.

To prevent resource starvation, take one or more of the following actions:

Create a node pool

The preferred way to prevent resource starvation in a Cloud Composer environment is to create a new node pool and configure the Kubernetes pods to run using only resources from that pool, as shown in the sketch below.
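
A minimal sketch of that configuration, assuming a hypothetical node pool named pod-operators-pool and using the kubernetes_pod_operator import from the full example later on this page. The affinity parameter, covered in detail in the Pod affinity configuration section below, pins the launched pod to nodes in that pool so that it cannot compete with the Airflow components:

pinned_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='pinned-pod',
    name='pinned-pod',
    namespace='default',
    image='ubuntu',
    cmds=['echo'],
    arguments=['running on the dedicated node pool'],
    affinity={
        'nodeAffinity': {
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # Nodes in a GKE node pool automatically carry this
                        # label, with the pool's name as its value.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # Hypothetical pool name; use the name of the node
                        # pool you create in the steps below.
                        'values': ['pod-operators-pool'],
                    }]
                }]
            }
        }
    })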

To create a node pool in an existing cluster, perform the following steps:

Console

  1. In the Cloud Console, go to the GKE menu.

    Go to the GKE menu

  2. Select the desired cluster.

  3. Click Edit.

  4. Under Node pools, click Add node pool.

  5. Configure your node pool.

  6. (Optional) Enable advanced options, such as automatic upgrades and autoscaling.

  7. Click Save.

gcloud

Enter the following command:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

where:

  • POOL_NAME is the desired name of the node pool.
  • CLUSTER_NAME is the name of the cluster to create the node pool in.
  • PROJECT_ID is the Cloud Composer project name.
  • ZONE is the zone where the GKE cluster is located.

    For the list of options, see the gcloud container node-pools create documentation.

    A successful node-pools create request returns the node pool information:

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

Increase the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase the node count, update your environment.

Specify the appropriate machine type

During Cloud Composer environment creation, you can specify a machine type. To ensure available resources, specify the machine type that is ideal for the kind of computing that occurs in your Cloud Composer environment.

KubernetesPodOperator configuration

To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder, or add the relevant KubernetesPodOperator code to a DAG.

The following sections explain each KubernetesPodOperator configuration in the example. For details about each configuration variable, see the Airflow reference.

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Minimal configuration

To create a KubernetesPodOperator, only the name, namespace, image, and task_id parameters are required.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. The pod-ex-minimum task succeeds without any changes to the code.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
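
As the comment above notes, image can also point to a private Container Registry image in the same project, provided the environment's service account can pull it. A brief sketch with a hypothetical image path:

kubernetes_private_image = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='pod-ex-private-image',
    name='pod-ex-private-image',
    namespace='default',
    cmds=['echo'],
    # Hypothetical private image hosted in Container Registry in the same
    # project as the Cloud Composer environment.
    image='gcr.io/PROJECT_ID/my-custom-image:latest')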

Template configuration

Airflow supports templating with Jinja. You must declare the required variables (task_id, name, namespace, and image) with the operator. As the following example shows, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Without any changes to the DAG or your environment, the ex-kube-templates task fails because of two errors. Let's walk through how to debug the task and resolve the errors.

  1. Verify that the ex-kube-templates task fails.
  2. Check the ex-kube-templates task logs.

    The logs show that the task failed because the appropriate variable (my_value) does not exist.

  3. To set my_value with gcloud or the Airflow UI:

    gcloud

    Enter the following command:

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    where:

    • ENVIRONMENT is the name of the Cloud Composer environment.
    • LOCATION is the region where the Cloud Composer environment is located.

    Airflow UI

    1. In the toolbar, click Admin > Variables.
    2. Click Create.
    3. Enter the following information:
      • Key: my_value
      • Val: example_value
    4. Click Save.
  4. Run the ex-kube-templates task again.

  5. Check the status of the ex-kube-templates task.

    The ex-kube-templates task still fails! Looking at the logs, the task now fails because core/kube_config is not found in config. To refer to a custom config_file (a Kubernetes configuration file), you need to set the kube_config variable in the airflow.cfg file to a valid Kubernetes configuration.

  6. To set the kube_config variable, enter the following command:

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    where:

    • ENVIRONMENT is the name of the Cloud Composer environment.
    • LOCATION is the region where the Cloud Composer environment is located.
  7. Wait for your environment to update. This can take several minutes.

  8. Run the ex-kube-templates task again.

  9. Verify that the ex-kube-templates task succeeds.

Secret variables configuration

A Kubernetes Secret is an object that contains a small amount of sensitive data. You can pass Secrets to Kubernetes pods by using the KubernetesPodOperator. Secrets must be defined in Kubernetes, or the pod fails to launch.

In this example, we deploy the Kubernetes Secret airflow-secrets to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).

The Secret looks like the following:

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

The name of the Kubernetes Secret is defined in the secret variable. This particular Secret is named airflow-secrets. It is exposed as an environment variable, as indicated by the deploy_type. The environment variable it sets, deploy_target, is SQL_CONN. Finally, the key of the secret stored in the deploy_target is sql_alchemy_conn.

The operator configuration looks like the following:

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
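
As a usage illustration, here is a minimal sketch (a hypothetical task that is not part of the sample DAG) showing how the two Secret objects surface inside the container: secret_env becomes the SQL_CONN environment variable, and secret_volume is mounted as a file under /var/secrets/google:

kubernetes_secret_check = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets-check',
    name='ex-kube-secrets-check',
    namespace='default',
    image='ubuntu',
    cmds=['bash', '-c'],
    # Print the injected environment variable and list the mounted secret file.
    arguments=['echo "SQL_CONN=$SQL_CONN" && ls /var/secrets/google'],
    secrets=[secret_env, secret_volume])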

Without any changes to the DAG or your environment, the ex-kube-secrets task fails. Let's walk through how to debug the task and resolve the error.

  1. Verify that the ex-kube-secrets task fails.
  2. Check the ex-kube-secrets task logs.

    Looking at the logs, you'll see that the task fails because of a Pod took too long to start error. This error occurs because Airflow cannot find the Secret specified in the configuration, secret_env.

  3. To set the Secret using gcloud:

    1. Run the following command to view details about your Cloud Composer environment:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      where:

      • ENVIRONMENT is the name of the Cloud Composer environment.
      • LOCATION is the region where the Cloud Composer environment is located.

        The output is similar to the following: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. To get the GKE cluster ID, copy the part of the output after /clusters/ (ending in -gke) and paste it somewhere that you can find it later. This output is your cluster ID.

    3. To get the zone, copy the part of the output after /zones/ and paste it somewhere that you can find it later.

    4. Connect to your GKE cluster by running the following command:

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      where:

      • CLUSTER_ID is your GKE cluster ID.
      • ZONE is the zone where your GKE cluster is located.
      • PROJECT is your Google Cloud project ID.
    5. Run the following command to create a Kubernetes Secret that sets the value of sql_alchemy_conn to test_value:

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. After you set the Secret, run the ex-kube-secrets task again.

  5. Verify that the ex-kube-secrets task succeeds.

Pod affinity configuration

KubernetesPodOperator 中配置 affinity 参数后,您可以控制要在哪些节点上安排 pod,例如仅在特定节点池中的节点上安排。在此示例中,操作节点仅在名为 pool-0pool-1 的节点池上运行。

Figure: Where Cloud Composer launches Kubernetes pods with pod affinity. The launched pods run in the same Kubernetes Engine cluster as the Airflow workers, Redis, the Airflow scheduler, and the Cloud SQL proxy, but only on the dedicated node pools pool-0 and pool-1, shown as separate boxes within the Kubernetes Engine cluster.


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

With the example as currently configured, the task fails. Let's walk through how to debug the task and resolve the errors.

  1. Verify that the ex-pod-affinity task fails.
  2. Check the ex-pod-affinity task logs.

    Looking at the logs, you'll see that the task fails because the node pools pool-0 and pool-1 do not exist.

  3. To make sure the node pools in values exist, make any of the following configuration changes:

    • If you created a node pool earlier, replace pool-0 and pool-1 with the names of your node pools and upload your DAG again.
    • Create a node pool named pool-0 or pool-1. You could create both, but the task needs only one of them to succeed.
    • Replace pool-0 and pool-1 with default-pool, the default pool that Airflow uses, and then upload your DAG again. Note: By default, Kubernetes pods are scheduled in the default-pool. Pods remain restricted to the default-pool if node pools are added afterward.
  4. Wait for your environment to update. This can take several minutes.

  5. Run the ex-pod-affinity task again.

  6. Verify that the ex-pod-affinity task succeeds.

Full configuration

This example shows all the variables that you can configure in the KubernetesPodOperator. The ex-all-configs task succeeds without any changes to the code.

For details on each variable, see the Airflow KubernetesPodOperator reference.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})
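
The xcom_push comment above describes how a result can be returned from the container. A brief sketch (a hypothetical task, not part of the sample DAG): with xcom_push=True, whatever the container writes to /airflow/xcom/return.json is pushed to XCom when the container finishes.

kubernetes_xcom_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-xcom',
    name='ex-kube-xcom',
    namespace='default',
    image='ubuntu',
    cmds=['bash', '-c'],
    # Write a small JSON payload to the path that the operator reads back.
    arguments=["mkdir -p /airflow/xcom/ && echo '{\"status\": \"done\"}' > /airflow/xcom/return.json"],
    xcom_push=True)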

Managing DAGs

View the task's status

  1. Go to the Airflow web interface.
  2. On the DAGs page, click the DAG name (such as composer_sample_kubernetes_pod).
  3. On the DAG details page, click Graph View.
  4. Check the status:

    • Failed: The task has a red box around it (such as ex-kube-templates). You can also hold the pointer over the task and look for State: Failed.

    • Success: The task has a green box around it (such as pod-ex-minimum). You can also hold the pointer over the task and look for State: Success.

Check the task logs

  1. In the Airflow UI, view the task's status.
  2. In the DAG's Graph View, click the task name.
  3. On the task instance context menu, click View Log.

Run a task again

  1. To return to the DAG:
    1. In the toolbar in the Airflow UI, click DAGs.
    2. Click the DAG name (such as composer_sample_kubernetes_pod).
  2. To run the task again:
    1. Click the task name.
    2. Click Clear, and then click OK. The task reruns automatically.

Troubleshooting

Tips for troubleshooting Pod failures

In addition to checking the task logs, also check the following logs:

  • Output of the Airflow scheduler and workers:

    1. Go to the Cloud Composer environment's Cloud Storage bucket. This is the bucket where the DAGs are located.
    2. Review the logs under logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Detailed pod logs in the Cloud Console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.

Non-zero return codes when also using the GKEPodOperator

When using the KubernetesPodOperator and the GKEPodOperator, the return code of the container's entrypoint determines whether the task is considered successful. Non-zero return codes indicate failure.

A common pattern when using the KubernetesPodOperator and the GKEPodOperator is to execute a shell script as the container entrypoint to group multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script, so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance. The sketch below illustrates this pattern.
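
A minimal sketch of this pattern, using a hypothetical task with an inline script instead of a script baked into the image: set -e makes the first failing command end the script with a non-zero code, which fails the Airflow task.

grouped_commands = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='grouped-commands',
    name='grouped-commands',
    namespace='default',
    image='ubuntu',
    cmds=['bash', '-c'],
    # `false` exits with a non-zero code, so the script stops here and the
    # task is reported as failed.
    arguments=['set -e; echo "download step"; echo "transform step"; false'])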

Task failure despite Pod success

For Cloud Composer environments running composer-1.4.1-airflow-* or earlier:

If an Airflow task runs for an hour and the task logs end with kubernetes.client.rest.ApiException: (401) Reason: Unauthorized, the underlying Kubernetes job keeps executing past that point and might even succeed. However, Airflow reports the task as failed.

To fix this issue, add an explicit PyPI package dependency on kubernetes>=8.0.1.

Pod timeouts

The default timeout for the KubernetesPodOperator is 120 seconds, which can result in timeouts that occur before larger images finish downloading. You can increase the timeout by changing the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
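
A brief sketch with illustrative values (the image path is hypothetical): raising startup_timeout_seconds gives a large image more time to download before the operator gives up.

kubernetes_slow_start = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='pod-large-image',
    name='pod-large-image',
    namespace='default',
    # Hypothetical large custom image that takes a while to pull.
    image='gcr.io/PROJECT_ID/large-custom-image:latest',
    cmds=['echo'],
    # Allow up to 10 minutes for the image pull and Pod startup.
    startup_timeout_seconds=600)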

When a pod times out, the task-specific log is available in the Airflow web interface. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod timeouts can also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at pod-level errors by using the GKE dashboards to view the logs for your particular workload, or use Stackdriver Logging.

Failed to establish a new connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool in the cluster is in the process of upgrading, you may see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refused

To check whether your cluster is upgrading, go to the GKE menu in the Cloud Console and look for a loading icon next to your environment's cluster name.

Related resources