Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator
Kubernetes-Pods von Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist. Außerdem erfahren Sie, wie Sie Ihrer Umgebung geeignete Ressourcen zur Verfügung stellen.
KubernetesPodOperator
startet Kubernetes-Pods im Cluster Ihrer Umgebung. Im Vergleich dazu
Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einem bestimmten
Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem
zu verbessern. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.
KubernetesPodOperator
ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Auf dieser Seite werden Sie Schritt für Schritt durch einen Beispiel-Airflow-DAG mit den folgenden KubernetesPodOperator
-Konfigurationen geführt:
- Minimalkonfiguration: Legt nur die erforderlichen Parameter fest.
- Vorlagenkonfiguration: Verwendet Parameter, die Sie für Vorlagen mit Jinja verwenden können.
Secret-Variablenkonfiguration: Übergibt ein Kubernetes-Secret-Objekt an den Pod.
In Cloud Composer 3 ist die Pod-Affinitätskonfiguration nicht verfügbar. Verwenden Sie stattdessen die GKE-Operatoren, um Pods in einem anderen Cluster zu starten.
Vollständige Konfiguration: Umfasst alle Konfigurationen.
Hinweise
In Cloud Composer 3 der Cluster Ihrer Umgebung automatisch skaliert wird. Zusätzliche Arbeitslasten, die Sie mit
KubernetesPodOperator
ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert. Die Preise für die zusätzlichen Arbeitslasten, die Sie in Ihrem und der Cluster der Umgebung Cloud Composer 3-Preismodell und verwendet Cloud Composer 3-SKUs.In Cloud Composer 3 befindet sich der Cluster Ihrer Umgebung im Mandantenprojekt. KubernetesPodOperator funktioniert jedoch ohne dass im Vergleich zu Cloud Composer 2 Codeänderungen erforderlich sind. Pods werden im Cluster der Umgebung in einem isolierten Namespace ausgeführt, aber mit Zugriff auf Ihr VPC-Netzwerk (sofern aktiviert).
Cloud Composer 3 verwendet GKE-Cluster mit Identitätsföderation von Arbeitslasten für GKE Standardmäßig werden Pods die in einem neu erstellten Namespace oder im
composer-user-workloads
ausgeführt werden Namespace kann nicht auf Google Cloud-Ressourcen zugreifen. Bei Verwendung Identitätsföderation von Arbeitslasten für GKE, Kubernetes-Dienstkonten, die verknüpft sind Namespaces müssen Google Cloud-Dienstkonten zugeordnet werden, Aktivieren der Dienstidentitätsautorisierung für Anfragen an Google APIs und andere Dienstleistungen.Wenn Sie daher Pods im Namespace
composer-user-workloads
ausführen, oder einem neu erstellten Namespace im Cluster Ihrer Umgebung, IAM-Bindungen zwischen Kubernetes und Google Cloud Dienstkonten werden nicht erstellt und diese Pods können nicht auf Ressourcen zugreifen Ihres Google Cloud-Projekts.Wenn Ihre Pods Zugriff auf Google Cloud-Ressourcen haben sollen, Verwenden Sie dann den Namespace
composer-user-workloads
oder erstellen Sie einen eigenen -Namespace, wie weiter beschrieben.Um Zugriff auf die Ressourcen Ihres Projekts zu gewähren, Folgen Sie der Anleitung in der Identitätsföderation von Arbeitslasten für GKE und richte die Bindungen ein:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem
composer-user-workloads/<namespace_name>
Kubernetes-Dienstkonto und dem Dienstkonto Ihrer Umgebung. - Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie
KubernetesPodOperator
verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parameternnamespace
undservice_account_name
an.
Cloud Composer 3 verwendet GKE-Cluster mit Arbeitslast Identität. Der GKE-Metadatenserver benötigt einige Sekunden, Anfragen auf einem neu erstellten Pod anzunehmen. Daher wird versucht, mit Workload Identity innerhalb der ersten Sekunden Die Lebensdauer des Pods kann fehlschlagen. Weitere Informationen zu dieser Einschränkung finden Sie unter Einschränkungen von Workload Identity:
Cloud Composer 3 verwendet Autopilot-Cluster, die den Begriff Compute-Klassen einführen:
Wenn kein Kurs ausgewählt ist, gilt standardmäßig die Klasse
general-purpose
wird angenommen, wenn Sie Pods mitKubernetesPodOperator
erstellen.Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Beispielsweise können Pods, die in der Klasse
general-purpose
ausgeführt werden, Folgendes verwenden: bis zu 110 GiB Arbeitsspeicher.
KubernetesPodOperator-Konfiguration
Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py
-Datei in den dags/
-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator
-Code einem DAG hinzufügen.
In den folgenden Abschnitten wird jede KubernetesPodOperator
-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator
muss nur name
, namespace
des Pods,
den Pod ausführen. image
ist erforderlich und task_id
sind erforderlich.
Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config
. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum
erfolgreich ausgeführt wird.
Vorlagenkonfiguration
Airflow unterstützt die Verwendung von Jinja-Vorlagen.
Sie müssen die erforderlichen Variablen (task_id
, name
, namespace
, image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates
aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value
) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config
nicht in config
gefunden wird.
Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.
So legen Sie my_value
mit gcloud
oder der Airflow-UI fest:
Airflow-UI
In der Airflow 2-UI:
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Admin > Variables.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie für Airflow 2 den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Wenn Sie auf eine benutzerdefinierte config_file
(Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config
durch eine gültige Kubernetes-Konfiguration:
Bereich | Schlüssel | Wert |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates
noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates
erfolgreich ausgeführt wurde.
Vollständige Konfiguration
In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator
konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs
erfolgreich ist.
Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator
-Referenz von Airflow.
Informationen zum CNCF-Kubernetes-Anbieter
GKEStartPodOperator und KubernetesPodOperator sind in
apache-airflow-providers-cncf-kubernetes
-Anbieter.
Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.
Version 6.0.0
In Version 6.0.0 des CNCF Kubernetes-Anbieterpakets wird die kubernetes_default
-Verbindung standardmäßig in der KubernetesPodOperator
verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese vom Betreiber weiterhin verwendet. So wechselst du zurück zur kubernetes_default
Verbindung herstellen, möchten Sie Ihre DAGs möglicherweise entsprechend anpassen.
Version 5.0.0
Mit dieser Version werden einige abwärtsinkompatible Änderungen eingeführt.
im Vergleich zu Version 4.4.0. Die wichtigsten beziehen sich auf
Die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Kube-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (siehe Abbildung 1). Alternativ mussconfig_file
derKubernetesPodOperator
-Konfiguration hinzugefügt werden (wie im folgenden Code gezeigt). )
- So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.
Fehlerbehebung
Tipps zur Behebung von Pod-Fehlern
Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:
Ausgabe des Airflow-Planers und der Airflow-Worker:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Folgen Sie dem Link zu DAGs für Ihre Umgebung.
Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.
Prüfen Sie die Logs im Ordner
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.
Rückgabewerte ungleich null, wenn auch GKEStartPodOperator
verwendet wird
Wenn Sie KubernetesPodOperator
und GKEStartPodOperator
verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster bei Verwendung von KubernetesPodOperator
und
GKEStartPodOperator
ist das Ausführen eines Shell-Skripts als Container
Einstiegspunkt zum Gruppieren mehrerer Vorgänge innerhalb des Containers.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator
beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator
den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Sehen Sie sich hierzu die Fehler auf Pod-Ebene GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob ein Upgrade des Clusters ausgeführt wird, rufen Sie in der Google Cloud Console die Kubernetes-Cluster und suchen Sie nach dem Ladesymbol neben der den Clusternamen der Umgebung.