Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator
Kubernetes-Pods aus Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist, und dafür sorgen, dass Ihre Umgebung über die entsprechenden Ressourcen verfügt.
KubernetesPodOperator
startet Kubernetes-Pods
im Cluster Ihrer Umgebung. Im Vergleich dazu führen Google Kubernetes Engine-Operatoren Kubernetes-Pods in einem bestimmten Cluster aus. Dabei kann es sich um einen separaten Cluster handeln, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.
KubernetesPodOperator
ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Auf dieser Seite wird ein Beispiel für einen Airflow-DAG mit den folgenden KubernetesPodOperator
-Konfigurationen beschrieben:
- Minimalkonfiguration: Legt nur die erforderlichen Parameter fest.
- Vorlagenkonfiguration: Verwendet Parameter, die Sie für Vorlagen mit Jinja verwenden können.
Secret-Variablenkonfiguration: Übergibt ein Kubernetes-Secret-Objekt an den Pod.
In Cloud Composer 3 ist die Pod-Affinitätskonfiguration nicht verfügbar. Verwenden Sie stattdessen die GKE-Operatoren, um Pods in einem anderen Cluster zu starten.
Vollständige Konfiguration: Umfasst alle Konfigurationen.
Hinweise
In Cloud Composer 3 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie mit
KubernetesPodOperator
ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert. Die Preise für zusätzliche Arbeitslasten, die Sie im Cluster Ihrer Umgebung ausführen, richten sich nach dem Cloud Composer 2-Preismodell und verwenden Cloud Composer Compute SKUs.In Cloud Composer 3 befindet sich der Cluster der Umgebung im Mandantenprojekt. KubernetesPodOperator funktioniert jedoch auf die gleiche Weise, ohne dass im Vergleich zu Cloud Composer 2 Codeänderungen erforderlich sind. Pods werden im Cluster der Umgebung in einem isolierten Namespace ausgeführt, aber mit Zugriff auf Ihr VPC-Netzwerk (falls aktiviert).
Cloud Composer 3 verwendet GKE-Cluster mit Workload Identity. Standardmäßig können Pods, die in neu erstellten Namespaces oder dem Namespace
composer-user-workloads
ausgeführt werden, nicht auf Google Cloud-Ressourcen zugreifen. Bei Verwendung von Workload Identity müssen Kubernetes-Dienstkonten, die mit Namespaces verknüpft sind, Google Cloud-Dienstkonten zugeordnet werden, um die Dienstidentitätsautorisierung für Anfragen an Google APIs und andere Dienste zu aktivieren.Wenn Sie Pods im Namespace
composer-user-workloads
oder in einem neu erstellten Namespace im Cluster Ihrer Umgebung ausführen, werden daher keine ordnungsgemäßen IAM-Bindungen zwischen Kubernetes- und Google Cloud-Dienstkonten erstellt und diese Pods können nicht auf Ressourcen Ihres Google Cloud-Projekts zugreifen.Wenn Ihre Pods Zugriff auf Google Cloud-Ressourcen haben sollen, verwenden Sie den Namespace
composer-user-workloads
oder erstellen Sie wie weiter beschrieben einen eigenen Namespace.Um Zugriff auf die Ressourcen Ihres Projekts zu gewähren, folgen Sie der Anleitung in Workload Identity und richten Sie die Bindungen ein:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem Kubernetes-Dienstkonto
composer-user-workloads/<namespace_name>
und dem Dienstkonto Ihrer Umgebung. - Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie
KubernetesPodOperator
verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parameternnamespace
undservice_account_name
an.
Cloud Composer 3 verwendet GKE-Cluster mit Workload Identity. Es dauert einige Sekunden, bis der GKE-Metadatenserver Anfragen für einen neu erstellten Pod akzeptiert. Daher können Versuche, sich innerhalb der ersten Sekunden der Lebensdauer eines Pods mit Workload Identity zu authentifizieren, fehlschlagen. Weitere Informationen zu dieser Einschränkung finden Sie unter Einschränkungen von Workload Identity.
Cloud Composer 2 verwendet Autopilot-Cluster, die das Konzept von Compute-Klassen einführen:
Wenn keine Klasse ausgewählt ist, wird beim Erstellen von Pods mit
KubernetesPodOperator
standardmäßig die Klassegeneral-purpose
angenommen.Jede Klasse ist mit bestimmten Attributen und Ressourcenlimits verknüpft. Informationen dazu finden Sie in der Autopilot-Dokumentation. Pods, die innerhalb der Klasse
general-purpose
ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher nutzen.
KubernetesPodOperator-Konfiguration
Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py
-Datei in den dags/
-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator
-Code einem DAG hinzufügen.
In den folgenden Abschnitten wird jede KubernetesPodOperator
-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator
sind nur name
des Pods, namespace
, wo der Pod ausgeführt werden soll, image
und task_id
erforderlich.
Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config
. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum
erfolgreich ausgeführt wird.
Vorlagenkonfiguration
Airflow unterstützt die Verwendung von Jinja-Vorlagen.
Sie müssen die erforderlichen Variablen (task_id
, name
, namespace
, image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates
aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value
) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config
nicht in config
gefunden wird.
Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.
So legen Sie my_value
mit gcloud
oder der Airflow-UI fest:
Airflow-UI
In der Airflow 2-UI:
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Admin > Variables.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie für Airflow 2 den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Wenn Sie auf eine benutzerdefinierte config_file
(Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config
durch eine gültige Kubernetes-Konfiguration:
Bereich | Schlüssel | Wert |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates
noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates
erfolgreich ausgeführt wurde.
Vollständige Konfiguration
In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator
konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs
erfolgreich ist.
Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator
-Referenz von Airflow.
Informationen zum CNCF-Kubernetes-Anbieter
GKEStartPodOperator und KubernetesPodOperator werden vom Anbieter apache-airflow-providers-cncf-kubernetes
implementiert.
Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.
Version 6.0.0
In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets wird die kubernetes_default
-Verbindung standardmäßig in KubernetesPodOperator
verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese vom Operator weiterhin verwendet. Wenn Sie wieder zur Verbindung kubernetes_default
zurückwechseln möchten, müssen Sie Ihre DAGs möglicherweise entsprechend anpassen.
Version 5.0.0
Diese Version enthält im Vergleich zu Version 4.4.0 einige abwärtsinkompatible Änderungen. Die wichtigsten beziehen sich auf die Verbindung kubernetes_default
, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Der Kube-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (siehe Abbildung 1). Alternativ mussconfig_file
derKubernetesPodOperator
-Konfiguration hinzugefügt werden (siehe folgendes Codebeispiel).
- So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.
Fehlerbehebung
Tipps zur Behebung von Pod-Fehlern
Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:
Ausgabe des Airflow-Planers und der Airflow-Worker:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Folgen Sie dem Link zu DAGs für Ihre Umgebung.
Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.
Prüfen Sie die Logs im Ordner
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.
Rückgabewerte ungleich null, wenn auch GKEStartPodOperator
verwendet wird
Wenn Sie KubernetesPodOperator
und GKEStartPodOperator
verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster bei der Verwendung von KubernetesPodOperator
und GKEStartPodOperator
besteht darin, ein Shell-Skript als Containereinstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zu gruppieren.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator
beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator
den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn dem Cloud Composer-Dienstkonto die erforderlichen IAM-Berechtigungen zum Ausführen der vorliegenden Aufgabe fehlen. Prüfen Sie dies anhand der GKE-Dashboards auf Pod-Ebene und sehen Sie sich die Logs für Ihre jeweilige Arbeitslast an. Alternativ können Sie auch Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob ein Clusterupgrade ausgeführt wird, rufen Sie in der Google Cloud Console die Seite Kubernetes-Cluster auf und suchen Sie nach dem Ladesymbol neben dem Clusternamen Ihrer Umgebung.