KubernetesPodOperator verwenden.

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird beschrieben, wie Sie die KubernetesPodOperator zum Bereitstellen verwenden Kubernetes-Pods von Cloud Composer in die Google Kubernetes Engine der Teil Ihrer Cloud Composer-Umgebung ist, und stellen Sie sicher, ob Ihre Umgebung über die entsprechenden Ressourcen verfügt.

KubernetesPodOperator Markteinführungen Kubernetes-Pods im Cluster Ihrer Umgebung. Im Vergleich dazu Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einem bestimmten Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem zu verbessern. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren.

KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:

  • Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
  • Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.

Auf dieser Seite wird ein Beispiel für einen Airflow-DAG beschrieben, der Folgendes enthält: KubernetesPodOperator-Konfigurationen:

Hinweise

  • In Cloud Composer 3 der Cluster Ihrer Umgebung automatisch skaliert wird. Zusätzliche Arbeitslasten, die Sie mit KubernetesPodOperator ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert. Die Preise für die zusätzlichen Arbeitslasten, die Sie in Ihrem und der Cluster der Umgebung Cloud Composer 3-Preismodell und verwendet Cloud Composer 3-SKUs.

  • In Cloud Composer 3 befindet sich der Cluster Ihrer Umgebung im Mandantenprojekt. KubernetesPodOperator funktioniert jedoch ohne dass im Vergleich zu Cloud Composer 2 Codeänderungen erforderlich sind. Pods werden im Cluster der Umgebung in einem isolierten Namespace ausgeführt. aber mit Zugriff auf Ihr VPC-Netzwerk (sofern aktiviert).

  • Cloud Composer 3 verwendet GKE-Cluster mit Workload Identity. Standardmäßig werden Pods die in einem neu erstellten Namespace oder im composer-user-workloads ausgeführt werden Namespace kann nicht auf Google Cloud-Ressourcen zugreifen. Bei Verwendung Workload Identity, Kubernetes-Dienstkonten, die mit Namespaces müssen Google Cloud-Dienstkonten zugeordnet werden, Aktivieren der Dienstidentitätsautorisierung für Anfragen an Google APIs und andere .

    Wenn Sie daher Pods im Namespace composer-user-workloads ausführen, oder einem neu erstellten Namespace im Cluster Ihrer Umgebung, IAM-Bindungen zwischen Kubernetes und Google Cloud Dienstkonten werden nicht erstellt und diese Pods können nicht auf Ressourcen zugreifen Ihres Google Cloud-Projekts.

    Wenn Ihre Pods Zugriff auf Google Cloud-Ressourcen haben sollen, Verwenden Sie dann den Namespace composer-user-workloads oder erstellen Sie einen eigenen -Namespace, wie weiter beschrieben.

    Um Zugriff auf die Ressourcen Ihres Projekts zu gewähren, folgen Sie der Anleitung in Workload Identity und richte die Bindungen ein:

    1. Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
    2. Erstellen Sie eine Bindung zwischen den Kubernetes-Dienstkonto composer-user-workloads/<namespace_name> und das Dienstkonto Ihrer Umgebung.
    3. Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
    4. Wenn Sie KubernetesPodOperator verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parametern namespace und service_account_name an.
  • Cloud Composer 3 verwendet GKE-Cluster mit Arbeitslast Identität. Der GKE-Metadatenserver benötigt einige Sekunden, Anfragen auf einem neu erstellten Pod anzunehmen. Daher wird versucht, mit Workload Identity innerhalb der ersten Sekunden Die Lebensdauer des Pods kann fehlschlagen. Weitere Informationen zu dieser Einschränkung finden Sie unter Einschränkungen von Workload Identity.

  • Cloud Composer 3 verwendet Autopilot-Cluster, die das Konzept Compute-Klassen:

    • Wenn kein Kurs ausgewählt ist, gilt standardmäßig die Klasse general-purpose wird angenommen, wenn Sie Pods mit KubernetesPodOperator erstellen.

    • Jede Klasse ist mit bestimmten Attributen und Ressourcenlimits verknüpft. Weitere Informationen dazu finden Sie in Dokumentation zu Autopilot. Beispielsweise können Pods, die in der Klasse general-purpose ausgeführt werden, Folgendes verwenden: bis zu 110 GiB Arbeitsspeicher.

KubernetesPodOperator-Konfiguration

Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py-Datei in den dags/-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator-Code einem DAG hinzufügen.

In den folgenden Abschnitten wird jede KubernetesPodOperator-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Minimalkonfiguration

Zum Erstellen eines KubernetesPodOperator muss nur name, namespace des Pods, den Pod ausführen. image ist erforderlich und task_id sind erforderlich.

Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum erfolgreich ausgeführt wird.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Vorlagenkonfiguration

Airflow unterstützt die Verwendung von Jinja-Vorlagen. Sie müssen die erforderlichen Variablen (task_id, name, namespace, image) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds, arguments, env_vars und config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config nicht in config gefunden wird.

Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.

So legen Sie my_value mit gcloud oder der Airflow-UI fest:

Airflow-UI

In der Airflow 2-UI:

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Admin > Variables.

  3. Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.

  4. Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

gcloud

Geben Sie für Airflow 2 den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Ersetzen Sie:

  • ENVIRONMENT durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Wenn Sie auf eine benutzerdefinierte config_file (Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config durch eine gültige Kubernetes-Konfiguration:

Bereich Schlüssel Wert
core kube_config /home/airflow/composer_kube_config

Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates erfolgreich ausgeführt wurde.

Vollständige Konfiguration

In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs erfolgreich ist.

Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator-Referenz von Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Informationen zum CNCF-Kubernetes-Anbieter

GKEStartPodOperator und KubernetesPodOperator sind in apache-airflow-providers-cncf-kubernetes-Anbieter.

Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.

Version 6.0.0

In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets Die Verbindung kubernetes_default wird standardmäßig in KubernetesPodOperator.

Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, weiterhin vom Operator verwendet wird. So wechselst du zurück zur kubernetes_default Verbindung herstellen, möchten Sie Ihre DAGs möglicherweise entsprechend anpassen.

Version 5.0.0

Diese Version enthält einige abwärtsinkompatible Änderungen. im Vergleich zu Version 4.4.0. Die wichtigsten beziehen sich auf Die kubernetes_default-Verbindung, die in Version 5.0.0 nicht verwendet wird.

  • Die kubernetes_default-Verbindung muss geändert werden. Kube-Konfigurationspfad muss auf /home/airflow/composer_kube_config festgelegt sein (siehe Abbildung 1). Alternativ muss config_file der KubernetesPodOperator-Konfiguration hinzugefügt werden (wie im folgenden Code gezeigt). )
<ph type="x-smartling-placeholder"></ph> Feld für Kube-Konfigurationspfad in der Airflow-UI
Abbildung 1: Airflow-UI, Ändern der kubernetes_default-Verbindung (zum Vergrößern klicken)
<ph type="x-smartling-placeholder">
    </ph>
  • So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.

Fehlerbehebung

Tipps zur Behebung von Pod-Fehlern

Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:

  • Ausgabe des Airflow-Planers und der Airflow-Worker:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Folgen Sie dem Link zu DAGs für Ihre Umgebung.

    3. Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.

    4. Prüfen Sie die Logs im Ordner logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.

Rückgabewerte ungleich null, wenn auch GKEStartPodOperator verwendet wird

Wenn Sie KubernetesPodOperator und GKEStartPodOperator verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.

Ein häufiges Muster bei Verwendung von KubernetesPodOperator und GKEStartPodOperator ist das Ausführen eines Shell-Skripts als Container Einstiegspunkt zum Gruppieren mehrerer Vorgänge innerhalb des Containers.

Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.

Pod-Zeitüberschreitungen

Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator den Parameter startup_timeout_seconds ändern.

Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod-Zeitüberschreitungen können auch auftreten, wenn Cloud Composer-Dienstkonto nicht über die erforderlichen IAM-Berechtigungen verfügt, um die Aufgabe am Hand. Sehen Sie sich hierzu die Fehler auf Pod-Ebene mit der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.

Neue Verbindung konnte nicht hergestellt werden

Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Wenn Sie prüfen möchten, ob ein Upgrade des Clusters ausgeführt wird, rufen Sie in der Google Cloud Console die Kubernetes-Cluster und suchen Sie nach dem Ladesymbol neben der den Clusternamen der Umgebung.

Nächste Schritte