KubernetesPodOperator verwenden.

Cloud Composer 1 Cloud Composer 2

Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator Kubernetes-Pods von Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist, und um sicherzustellen, dass Ihre Umgebung über die entsprechenden Ressourcen verfügt.

KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Mit Google Kubernetes Engine-Operatoren werden dagegen Kubernetes-Pods in einem bestimmten Cluster ausgeführt. Dabei kann es sich um einen separaten Cluster handeln, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.

KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:

  • Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
  • Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.

Auf dieser Seite werden Sie durch einen Airflow-Beispiel-DAG mit den folgenden KubernetesPodOperator-Konfigurationen geführt:

Hinweise

Ressourcen der Cloud Composer-Umgebung einrichten

Wenn Sie eine Cloud Composer-Umgebung erstellen, geben Sie deren Leistungsparameter an, einschließlich der Leistungsparameter für den Cluster der Umgebung. Das Starten von Kubernetes-Pods im Umgebungscluster kann zu Konflikten bei Clusterressourcen wie CPU oder Arbeitsspeicher führen. Da sich der Airflow-Planer und die Airflow-Worker im selben GKE-Cluster befinden, funktionieren Planer und Worker nicht ordnungsgemäß, wenn diese Konkurrenzsituation einen Ressourcenmangel verursacht.

Führen Sie eine oder mehrere der folgenden Aktionen aus, um einen Ressourcenmangel zu verhindern:

Knotenpool erstellen

Die bevorzugte Methode zum Verhindern eines Ressourcenmangels in der Cloud Composer-Umgebung besteht darin, einen neuen Knotenpool zu erstellen und Kubernetes-Pods so zu konfigurieren, dass sie bei der Ausführung nur Ressourcen aus diesem Pool verwenden.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie auf den Namen Ihrer Umgebung.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab Umgebungskonfiguration.

  4. Folgen Sie im Abschnitt Ressourcen > GKE-Cluster dem Link Clusterdetails aufrufen.

  5. Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.

gcloud

  1. Bestimmen Sie den Namen des Umgebungsclusters:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Ersetzen Sie:

    • ENVIRONMENT_NAME durch den Namen der Umgebung.
    • LOCATION durch die Region, in der sich die Umgebung befindet.
  2. Die Ausgabe enthält den Namen des Clusters Ihrer Umgebung. Dies kann beispielsweise europe-west3-example-enviro-af810e25-gke sein.

  3. Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.

Anzahl der Knoten in der Umgebung erhöhen

Wenn Sie die Anzahl der Knoten in der Cloud Composer-Umgebung erhöhen, steht den Arbeitslasten mehr Rechenleistung zur Verfügung. Durch diese Erhöhung werden keine zusätzlichen Ressourcen für Aufgaben bereitgestellt, die mehr CPU oder RAM benötigen, als der angegebene Maschinentyp bietet.

Zum Erhöhen der Knotenanzahl aktualisieren Sie die Umgebung.

Geben Sie den entsprechenden Maschinentyp an.

Wenn Sie die Cloud Composer-Umgebung erstellen, können Sie einen Maschinentyp angeben. Damit immer genug Ressourcen verfügbar sind, sollten Sie einen Maschinentyp für die Art von Computing angeben, das in Ihrer Cloud Composer-Umgebung ausgeführt wird.

KubernetesPodOperator-Konfiguration

Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py-Datei in den dags/-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator-Code einem DAG hinzufügen.

In den folgenden Abschnitten wird jede KubernetesPodOperator-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.

Airflow 2

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. In Composer 1 there is the potential for
        # the resource starvation of Airflow workers and scheduler
        # within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources,
        # and using Composer 2 will mean the environment will autoscale.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubenetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
        },
    )
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Airflow 1

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
        },
    )
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )

Minimalkonfiguration

Zum Erstellen eines KubernetesPodOperator sind nur die name des Pods, das namespace, in dem der Pod ausgeführt werden soll, und das zu verwendende image sowie das task_id erforderlich.

Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum erfolgreich ausgeführt wird.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Vorlagenkonfiguration

Airflow unterstützt die Verwendung von Jinja-Vorlagen. Sie müssen die erforderlichen Variablen (task_id, name, namespace, image) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds, arguments, env_vars und config_file.

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config nicht in config gefunden wird.

Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.

So legen Sie my_value mit gcloud oder der Airflow-UI fest:

Airflow-UI

In der Airflow 2-UI:

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Admin > Variables.

  3. Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.

  4. Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

In der Airflow 1-UI:

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Admin > Variables.

  3. Klicken Sie auf der Seite Variablen auf den Tab Erstellen.

  4. Geben Sie auf der Seite Variable die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

gcloud

Geben Sie für Airflow 2 den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Geben Sie für Airflow 1 den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Ersetzen Sie:

  • ENVIRONMENT durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Wenn Sie auf eine benutzerdefinierte config_file (Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config durch eine gültige Kubernetes-Konfiguration:

Bereich Schlüssel Wert
core kube_config /home/airflow/composer_kube_config

Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates erfolgreich ausgeführt wurde.

Konfiguration von Secret-Variablen

Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Mit KubernetesPodOperator können Sie Secrets an die Kubernetes-Pods übergeben. Secrets müssen in Kubernetes definiert werden. Ansonsten kann der Pod nicht gestartet werden.

Dieses Beispiel zeigt zwei Möglichkeiten für die Verwendung von Kubernetes Secrets: als Umgebungsvariable und als vom Pod bereitgestelltes Volume.

Das erste Secret, airflow-secrets, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).

Das zweite Secret, service-account, stellt service-account.json, eine Datei mit einem Dienstkontotoken, in /var/secrets/google bereit.

Die Secrets sehen so aus:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Der Name des ersten Kubernetes-Secrets wird in der Variablen secret definiert. Der Name dieses speziellen Secrets lautet airflow-secrets. Es wird als Umgebungsvariable gemäß der Vorgabe von deploy_type bereitgestellt. Die Umgebungsvariable, für die es festgelegt wird (deploy_target), lautet SQL_CONN. Schließlich lautet der key des Secrets, das in deploy_target gespeichert ist, sql_alchemy_conn.

Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret definiert. Der Name dieses speziellen Secrets lautet service-account. Es wird als Volume bereitgestellt, wie von deploy_type festgelegt. Der Pfad der bereitzustellenden Datei (deploy_target) lautet /var/secrets/google. Schließlich lautet der key des Secrets, das in deploy_target gespeichert ist, service-account.json.

So sieht die Operatorkonfiguration aus:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Wenn Sie keine Änderungen am DAG oder der Umgebung vornehmen, schlägt die Aufgabe ex-kube-secrets fehl. Aus den Logs geht hervor, dass die Aufgabe aufgrund des Fehlers Pod took too long to start fehlschlägt. Dieser Fehler tritt auf, weil Airflow das in der Konfiguration angegebene Secret secret_env nicht finden kann.

gcloud

So legen Sie das Secret mit gcloud fest:

  1. Informationen über Ihren Cloud Composer-Umgebungscluster abrufen

    1. Führen Sie dazu diesen Befehl aus:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Ersetzen Sie:

      • ENVIRONMENT durch den Namen Ihrer Umgebung.
      • LOCATION durch die Region, in der sich die Cloud Composer-Umgebung befindet.

      Die Ausgabe dieses Befehls hat das folgende Format: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach /clusters/ (endet auf -gke).

    3. Zum Abrufen der Zone kopieren Sie die Ausgabe nach /zones/.

  2. Stellen Sie mit folgendem Befehl eine Verbindung zum GKE-Cluster her:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Ersetzen Sie:

    • CLUSTER_ID durch Ihre GKE-Cluster-ID.
    • Ersetzen Sie PROJECT durch die ID Ihres Google Cloud-Projekts.
    • ZONE ist die Zone, in der sich Ihr GKE befindet.
  3. Kubernetes-Secrets erstellen.

    1. Erstellen Sie mit dem folgenden Befehl ein Kubernetes-Secret, das den Wert von sql_alchemy_conn auf test_value festlegt:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    2. Erstellen Sie mit folgendem Befehl ein Kubernetes-Secret, das den Wert von service-account.json auf einen lokalen Pfad einer Dienstkontoschlüsseldatei namens key.json festlegt:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      
  4. Nachdem Sie die Secrets festgelegt haben, führen Sie die Aufgabe ex-kube-secrets noch einmal in der Airflow-UI aus.

  5. Prüfen Sie, ob die Aufgabe ex-kube-secrets erfolgreich ist.

Konfiguration der Pod-Affinität

Mit der Konfiguration des Parameters affinity in KubernetesPodOperator können Sie steuern, auf welchen Knoten Pods geplant werden sollen, beispielsweise nur auf Knoten in einem bestimmten Knotenpool. In diesem Beispiel wird der Operator nur in Knotenpools namens pool-0 und pool-1 ausgeführt. Da sich Ihre Cloud Composer 1-Umgebungsknoten im default-pool befinden, werden Ihre Pods nicht auf den Knoten in Ihrer Umgebung ausgeführt.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Da das Beispiel derzeit konfiguriert ist, schlägt die Aufgabe fehl. Aus den Logs geht hervor, dass die Aufgabe fehlschlägt, weil die Knotenpools pool-0 und pool-1 nicht vorhanden sind.

Damit die Knotenpools in values zu finden sind, nehmen Sie eine der folgenden Konfigurationsänderungen vor:

  • Wenn Sie einen Knotenpool zuvor erstellt haben, ersetzen Sie pool-0 und pool-1 durch die Namen Ihrer Knotenpools. Laden Sie dann Ihren DAG noch einmal hoch.

  • Erstellen Sie einen Knotenpool namens pool-0 oder pool-1. Sie können beide erstellen, zur erfolgreichen Ausführung der Aufgabe ist aber nur einer erforderlich.

  • Ersetzen Sie pool-0 und pool-1 durch default-pool. Dies ist der von Airflow verwendete Standardpool. Laden Sie dann Ihren DAG noch einmal hoch.

Warten Sie nach den Änderungen einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-pod-affinity noch einmal aus und prüfen Sie, ob die Aufgabe ex-pod-affinity erfolgreich ausgeführt wurde.

Vollständige Konfiguration

In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs erfolgreich ist.

Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator-Referenz von Airflow.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Informationen zum CNCF-Kubernetes-Anbieter

GKEStartPodOperator und KubernetesPodOperator sind im Anbieter apache-airflow-providers-cncf-kubernetes implementiert.

Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.

Version 6.0.0

In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets wird die kubernetes_default-Verbindung standardmäßig in der KubernetesPodOperator verwendet.

Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese weiterhin vom Operator verwendet. Wenn Sie wieder die kubernetes_default-Verbindung verwenden möchten, müssen Sie Ihre DAGs möglicherweise entsprechend anpassen.

Version 5.0.0

Diese Version enthält einige abwärtsinkompatible Änderungen im Vergleich zu Version 4.4.0. Die wichtigsten, die Sie beachten sollten, beziehen sich auf die kubernetes_default-Verbindung, die in Version 5.0.0 nicht verwendet wird.

  • Die kubernetes_default-Verbindung muss geändert werden: Der Kube-Konfigurationspfad muss auf /home/airflow/composer_kube_config (siehe Abbildung 1) gesetzt sein oder config_file muss der KubernetesPodOperator-Konfiguration hinzugefügt werden (wie unten dargestellt).
Kube-Konfigurationspfadfeld in der Airflow-UI
Abbildung 1. Airflow-UI mit Änderung der Verbindung „kubernetes_default“ (zum Vergrößern anklicken)
  • So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.

Fehlerbehebung

Tipps zur Behebung von Pod-Fehlern

Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:

  • Ausgabe des Airflow-Planers und der Airflow-Worker:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen“

    2. Folgen Sie dem Link zu DAGs für Ihre Umgebung.

    3. Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.

    4. Prüfen Sie die Logs im Ordner logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.

Rückgabewerte ungleich null, wenn auch GKEStartPodOperator verwendet wird

Wenn Sie KubernetesPodOperator und GKEStartPodOperator verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.

Ein gängiges Muster bei der Verwendung von KubernetesPodOperator und GKEStartPodOperator besteht darin, ein Shell-Skript als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge im Container zusammenzufassen.

Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.

Pod-Zeitüberschreitungen

Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator den Parameter startup_timeout_seconds ändern.

Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Das können Sie prüfen, wenn Sie sich in den GKE-Dashboards die Fehler auf Pod-Ebene in den Logs für die jeweilige Arbeitslast ansehen oder Cloud Logging verwenden.

Es konnte keine neue Verbindung hergestellt werden

Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Wenn Sie prüfen möchten, ob der Cluster aktualisiert wird, rufen Sie in der Google Cloud Console die Seite Kubernetes-Cluster auf und suchen Sie nach dem Ladesymbol neben dem Clusternamen Ihrer Umgebung.

Nächste Schritte