KubernetesPodOperator verwenden.

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird beschrieben, wie Sie mit dem KubernetesPodOperator Kubernetes-Pods von Cloud Composer in die Google Kubernetes Engine Cluster, der zu Ihrer Cloud Composer-Umgebung gehört.

KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Im Vergleich dazu Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einer bestimmten Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem zu verbessern. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.

KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:

  • Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
  • Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.

Hinweise

Ressourcen der Cloud Composer-Umgebung einrichten

Wenn Sie eine Cloud Composer-Umgebung erstellen, geben Sie deren Leistungsparameter an, einschließlich der Leistungsparameter für den Cluster der Umgebung. Das Starten von Kubernetes-Pods im Umgebungscluster kann zu Konflikten bei Clusterressourcen wie CPU oder Arbeitsspeicher führen. Da sich der Airflow-Planer und die Airflow-Worker im selben GKE-Cluster befinden, funktionieren Planer und Worker nicht ordnungsgemäß, wenn diese Konkurrenzsituation einen Ressourcenmangel verursacht.

Führen Sie eine oder mehrere der folgenden Aktionen aus, um einen Ressourcenmangel zu verhindern:

Knotenpool erstellen

Der bevorzugte Weg, um einen Ressourcenmangel in in der Cloud Composer-Umgebung Erstellen Sie einen neuen Knotenpool und Kubernetes-Pods so konfigurieren, dass sie nur mit Ressourcen aus diesem Pool.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie auf den Namen Ihrer Umgebung.

  3. Wechseln Sie auf der Seite Umgebungsdetails zum Tab Umgebungskonfiguration.

  4. Gehen Sie zu Ressourcen > GKE-Cluster. klicken Sie auf den Link Clusterdetails ansehen.

  5. Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.

gcloud

  1. Bestimmen Sie den Namen des Umgebungsclusters:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Ersetzen Sie:

    • ENVIRONMENT_NAME durch den Namen der Umgebung.
    • LOCATION durch die Region, in der sich die Umgebung befindet.
  2. Die Ausgabe enthält den Namen des Clusters Ihrer Umgebung. Dies kann beispielsweise europe-west3-example-enviro-af810e25-gke sein.

  3. Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.

Anzahl der Knoten in Ihrer Umgebung erhöhen

Wenn Sie die Anzahl der Knoten in der Cloud Composer-Umgebung erhöhen, steht den Arbeitslasten mehr Rechenleistung zur Verfügung. Durch diese Erhöhung werden keine zusätzlichen Ressourcen für Aufgaben bereitgestellt, die mehr CPU oder RAM benötigen, als der angegebene Maschinentyp bietet.

Zum Erhöhen der Knotenanzahl aktualisieren Sie die Umgebung.

Geeigneten Maschinentyp angeben

Wenn Sie die Cloud Composer-Umgebung erstellen, können Sie einen Maschinentyp angeben. Damit immer genug Ressourcen verfügbar sind, sollten Sie einen Maschinentyp für die Art von Computing angeben, das in Ihrer Cloud Composer-Umgebung ausgeführt wird.

Minimalkonfiguration

Zum Erstellen eines KubernetesPodOperator sind nur die Pod-Parameter name, image und task_id erforderlich. Das /home/airflow/composer_kube_config enthält Anmeldedaten zur Authentifizierung bei GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Pod-Affinitätskonfiguration

Mit der Konfiguration des Parameters affinity in KubernetesPodOperator können Sie steuern, auf welchen Knoten Pods geplant werden sollen, beispielsweise nur auf Knoten in einem bestimmten Knotenpool. In diesem Beispiel wird der Operator nur in Knotenpools mit den Namen pool-0 und pool-1. Da sich Ihre Cloud Composer 1-Umgebungsknoten im default-pool befinden, werden Ihre Pods nicht auf den Knoten in Ihrer Umgebung ausgeführt.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Da das Beispiel so konfiguriert ist, schlägt die Aufgabe fehl. Aus den Logs geht hervor, dass die Aufgabe fehlschlägt, weil die Knotenpools pool-0 und pool-1 nicht vorhanden sind.

Damit die Knotenpools in values zu finden sind, nehmen Sie eine der folgenden Konfigurationsänderungen vor:

  • Wenn Sie einen Knotenpool zuvor erstellt haben, ersetzen Sie pool-0 und pool-1 durch die Namen Ihrer Knotenpools. Laden Sie dann Ihren DAG noch einmal hoch.

  • Erstellen Sie einen Knotenpool namens pool-0 oder pool-1. Sie können beide erstellen, zur erfolgreichen Ausführung der Aufgabe ist aber nur einer erforderlich.

  • Ersetzen Sie pool-0 und pool-1 durch default-pool. Dies ist der von Airflow verwendete Standardpool. Laden Sie dann Ihren DAG noch einmal hoch.

Warten Sie nach den Änderungen einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-pod-affinity noch einmal aus und prüfen Sie, ob die Aufgabe ex-pod-affinity erfolgreich ausgeführt wurde.

Zusätzliche Konfiguration

In diesem Beispiel sind zusätzliche Parameter zu sehen, die Sie im KubernetesPodOperator konfigurieren können.

Weitere Informationen zu Parametern finden Sie in der Airflow-Referenz für KubernetesPodOperator. Informationen zur Verwendung von Kubernetes Secrets und ConfigMaps finden Sie unter Kubernetes Secrets und ConfigMaps verwenden. Informationen zur Verwendung von Jinja-Vorlagen mit KubernetesPodOperator finden Sie unter Jinja-Vorlagen verwenden.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Jinja-Vorlagen verwenden

Airflow unterstützt Jinja-Vorlagen in DAGs.

Sie müssen die erforderlichen Airflow-Parameter (task_id, name und image) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds, arguments, env_vars und config_file.

Der Parameter env_vars in diesem Beispiel wird von einem Airflow-Variable mit dem Namen my_value. Beispiel-DAG den Wert aus der Vorlagenvariable vars in Airflow erhält. Airflow bietet mehr Variablen, die Zugriff auf verschiedene Arten von Informationen ermöglichen. So können Sie beispielsweise mit der Vorlagenvariablen conf auf Werte der Airflow-Konfigurationsoptionen zugreifen. Weitere Informationen und eine Liste der in Airflow verfügbaren Variablen finden Sie in der Airflow-Dokumentation unter Referenz zu Vorlagen.

Ohne den DAG zu ändern oder die Variable env_vars zu erstellen, schlägt die Aufgabe ex-kube-templates im Beispiel fehl, da die Variable nicht vorhanden ist. Erstellen Sie diese Variable in der Airflow-Benutzeroberfläche oder mit der Google Cloud CLI:

Airflow-UI

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Verwaltung > Variablen.

  3. Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.

  4. Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

Wenn in Ihrer Umgebung Airflow 1 verwendet wird, führen Sie stattdessen den folgenden Befehl aus:

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Admin > Variables.

  3. Klicken Sie auf der Seite Variablen auf den Tab Erstellen.

  4. Geben Sie auf der Seite Variable die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

gcloud

Geben Sie den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Wenn Ihre Umgebung Airflow 1 verwendet, führen Sie stattdessen den folgenden Befehl aus:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Ersetzen Sie:

  • ENVIRONMENT durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Das folgende Beispiel zeigt, wie Jinja-Vorlagen mit KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Kubernetes-Secrets und -ConfigMaps verwenden

Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Ein Kubernetes ConfigMap ist ein Objekt, das Folgendes enthält: nicht vertrauliche Daten in Schlüssel/Wert-Paaren.

In Cloud Composer 2 können Sie Secrets und ConfigMaps mit der Google Cloud CLI, der API oder Terraform erstellen und dann über KubernetesPodOperator darauf zugreifen.

YAML-Konfigurationsdateien

Wenn Sie ein Kubernetes-Secret oder eine ConfigMap mithilfe der Google Cloud CLI und API eine Datei im YAML-Format bereitstellen. Diese Datei muss den gleichen das von Kubernetes Secrets und ConfigMaps verwendet wird. Die Kubernetes-Dokumentation enthält viele Codebeispiele für ConfigMaps und Secrets. Sehen Sie sich als Erstes die Seite Anmeldedaten mit Secrets sicher verteilen und ConfigMaps an.

Wie bei Kubernetes-Secrets sollten Sie die Base64-Darstellung verwenden, wenn Sie Werte in Secrets definieren.

Um einen Wert zu codieren, können Sie den folgenden Befehl verwenden. Dies ist eine von vielen Möglichkeiten, um einen base64-codierten Wert zu erhalten):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Ausgabe:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden in Beispielen verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden können. Wie im vorherigen Beispiel: Codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64) und geben Sie diesen Wert dann in der YAML-Datei an:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie müssen die Base64-Darstellung in ConfigMaps nicht verwenden:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes-Secrets verwalten

In Cloud Composer 2 erstellen Sie Secrets mit der Google Cloud CLI und kubectl:

  1. Rufen Sie Informationen zum Cluster Ihrer Umgebung ab:

    1. Führen Sie dazu diesen Befehl aus:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Ersetzen Sie:

      • ENVIRONMENT durch den Namen Ihrer Umgebung.
      • LOCATION durch die Region, in der sich die Cloud Composer-Umgebung befindet.

      Die Ausgabe dieses Befehls hat das folgende Format: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach /clusters/ (endet auf -gke).

    3. Zum Abrufen der Zone kopieren Sie die Ausgabe nach /zones/.

  2. Stellen Sie mit folgendem Befehl eine Verbindung zum GKE-Cluster her:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Ersetzen Sie:

    • CLUSTER_ID: die Cluster-ID der Umgebung.
    • PROJECT_ID: die Projekt-ID.
    • ZONE durch die Zone, in der sich der Cluster der Umgebung befindet.
  3. Erstellen Sie Kubernetes-Secrets:

    Die folgenden Befehle veranschaulichen zwei unterschiedliche Ansätze zum Erstellen von Kubernetes-Secrets. Beim --from-literal-Ansatz werden Schlüssel/Wert-Paare verwendet. Der Ansatz --from-file verwendet Dateiinhalte.

    • Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret mithilfe von Schlüssel/Wert-Paaren zu erstellen. In diesem Beispiel wird ein Secret mit dem Namen airflow-secrets erstellt, das ein sql_alchemy_conn-Feld mit dem Wert test_value enthält.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret durch Angabe des Dateiinhalts zu erstellen. In diesem Beispiel wird ein Secret namens service-account mit dem Feld service-account.json und dem -Wert, der aus dem Inhalt einer lokalen ./key.json-Datei entnommen wurde.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Kubernetes-Secrets in DAGs verwenden

In diesem Beispiel werden zwei Möglichkeiten zur Verwendung von Kubernetes Secrets gezeigt: als Umgebung und als vom Pod bereitgestelltes Volume.

Das erste Secret, airflow-secrets, wurde festgelegt in eine Kubernetes-Umgebungsvariable namens SQL_CONN (im Gegensatz zu Airflow- oder Cloud Composer-Umgebungsvariable).

Das zweite Secret, service-account, stellt service-account.json, eine Datei mit einem Dienstkontotoken, in /var/secrets/google bereit.

So sehen die Secret-Objekte aus:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env definiert. Dieses Secret heißt airflow-secrets. Der Parameter deploy_type gibt an, dass er als Umgebungsvariable verfügbar gemacht werden muss. Der Wert der Umgebungsvariablen Der Name lautet SQL_CONN, wie im Parameter deploy_target angegeben. Schließlich wird der Wert der Umgebungsvariablen SQL_CONN auf den Wert des Schlüssels sql_alchemy_conn festgelegt.

Der Name des zweiten Kubernetes-Secrets wird in secret_volume definiert . Dieses Secret heißt service-account. Sie wird als Lautstärke, wie im Parameter deploy_type angegeben. Der Pfad der Datei zu Bereitstellung (deploy_target) ist /var/secrets/google. Schließlich ist der key der Das im deploy_target gespeicherte Secret ist service-account.json.

Die Operatorkonfiguration sieht so aus:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informationen zum CNCF-Kubernetes-Anbieter

KubernetesPodOperator wird in apache-airflow-providers-cncf-kubernetes-Anbieter.

Ausführliche Versionshinweise für CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.

Version 6.0.0

In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets Die Verbindung kubernetes_default wird im KubernetesPodOperator standardmäßig verwendet.

Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese vom Betreiber weiterhin verwendet. Wenn Sie wieder zur kubernetes_default-Verbindung zurückkehren möchten, sollten Sie Ihre DAGs entsprechend anpassen.

Version 5.0.0

Mit dieser Version werden einige abwärtsinkompatible Änderungen eingeführt. im Vergleich zu Version 4.4.0. Die wichtigsten betreffen die kubernetes_default-Verbindung, die in Version 5.0.0 nicht verwendet wird.

  • Die kubernetes_default-Verbindung muss geändert werden. Kubernetes-Konfiguration Pfad muss auf /home/airflow/composer_kube_config festgelegt sein (wie in der folgenden Abbildung dargestellt). Alternativ muss config_file der KubernetesPodOperator-Konfiguration hinzugefügt werden (siehe folgendes Codebeispiel).
<ph type="x-smartling-placeholder">
</ph> Feld „Kube-Konfigurationspfad“ in der Airflow-Benutzeroberfläche
Abbildung 1: Airflow-Benutzeroberfläche, kubernetes_default-Verbindung ändern (zum Vergrößern klicken)
  • Ändern Sie den Code einer Aufgabe mit KubernetesPodOperator so:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Weitere Informationen zu Version 5.0.0 finden Sie unter Versionshinweise für CNCF-Kubernetes-Anbieter

Fehlerbehebung

In diesem Abschnitt finden Sie Tipps zur Behebung häufiger Probleme mit KubernetesPodOperator:

Logs ansehen

Bei der Fehlerbehebung können Sie Protokolle in der folgenden Reihenfolge überprüfen:

  1. Airflow-Task-Logs:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

    3. Rufen Sie den Tab DAGs auf.

    4. Klicken Sie auf den Namen des DAG und dann auf die DAG-Ausführung, um die Details und Protokolle aufzurufen.

  2. Airflow-Planerlogs:

    1. Rufen Sie die Seite Umgebungsdetails auf.

    2. Rufen Sie den Tab Logs auf.

    3. Prüfen Sie die Logs des Airflow-Planers.

  3. Pod-Logs in der Google Cloud Console unter „GKE-Arbeitslasten“. Diese Logs enthalten die YAML-Datei für die Pod-Definition, Pod-Ereignisse und Pod-Details.

Rückgabecodes ungleich null

Wenn Sie den KubernetesPodOperator (und GKEStartPodOperator) verwenden, gibt den Einstiegspunkt des Containers an, ob die Aufgabe ob sie erfolgreich sind oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.

Ein gängiges Muster besteht darin, ein Shell-Script als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.

Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.

Pod-Zeitüberschreitungen

Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, indem Sie den Parameter startup_timeout_seconds ändern, wenn erstellen Sie den KubernetesPodOperator.

Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Überprüfen Sie dies, indem Sie sich Fehler auf Pod-Ebene mithilfe der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.

Neue Verbindung konnte nicht hergestellt werden

Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Wenn Sie prüfen möchten, ob ein Upgrade des Clusters ausgeführt wird, rufen Sie in der Google Cloud Console die Kubernetes-Cluster und suchen Sie nach dem Ladesymbol neben der den Clusternamen der Umgebung.

Nächste Schritte