Cloud Composer 1 Cloud Composer 2
Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator
Kubernetes-Pods von Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist, und dafür sorgen, dass Ihre Umgebung über die entsprechenden Ressourcen verfügt.
KubernetesPodOperator
startet Kubernetes-Pods
im Cluster Ihrer Umgebung. Im Gegensatz dazu führen Google Kubernetes Engine-Operatoren Kubernetes-Pods in einem bestimmten Cluster aus, der ein separater Cluster sein kann, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.
KubernetesPodOperator
ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Auf dieser Seite werden Sie durch einen Airflow-Beispiel-DAG mit den folgenden KubernetesPodOperator
-Konfigurationen geführt:
- Minimalkonfiguration: Legt nur die erforderlichen Parameter fest.
- Vorlagenkonfiguration: Verwendet Parameter, die Sie für Vorlagen mit Jinja verwenden können.
Secret-Variablenkonfiguration: Übergibt ein Kubernetes-Secret-Objekt an den Pod.
In Cloud Composer 2 ist die Konfiguration der Pod-Affinität nicht verfügbar. Verwenden Sie stattdessen die GKE-Operatoren, um Pods in einem anderen Cluster zu starten.
Vollständige Konfiguration: Umfasst alle Konfigurationen.
Hinweise
In Cloud Composer 2 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie mit
KubernetesPodOperator
ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert. Die Preise für zusätzliche Arbeitslasten, die Sie im Cluster Ihrer Umgebung ausführen, richten sich nach dem Cloud Composer 2-Preismodell und verwenden Cloud Composer Compute SKUs.Cloud Composer 2 verwendet GKE-Cluster mit Workload Identity. Standardmäßig können Pods, die in einem neu erstellten Namespace oder im Namespace
composer-user-workloads
ausgeführt werden, nicht auf Google Cloud-Ressourcen zugreifen. Wenn Sie Workload Identity verwenden, müssen Kubernetes-Dienstkonten, die Namespaces zugeordnet sind, Google Cloud-Dienstkonten zugeordnet werden, um die Autorisierung der Dienstidentität für Anfragen an Google APIs und andere Dienste zu aktivieren.Wenn Sie daher Pods im Namespace
composer-user-workloads
oder in einem neu erstellten Namespace im Cluster Ihrer Umgebung ausführen, werden keine ordnungsgemäßen IAM-Bindungen zwischen Kubernetes und Google Cloud-Dienstkonten erstellt und diese Pods können nicht auf Ressourcen Ihres Google Cloud-Projekts zugreifen.Wenn Sie möchten, dass Ihre Pods Zugriff auf Google Cloud-Ressourcen haben, verwenden Sie den Namespace
composer-user-workloads
oder erstellen Sie Ihren eigenen Namespace wie näher beschrieben.Folgen Sie der Anleitung in Workload Identity und richten Sie die Bindungen ein, um Zugriff auf die Ressourcen Ihres Projekts zu gewähren:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem Kubernetes-Dienstkonto
composer-user-workloads/<namespace_name>
und dem Dienstkonto Ihrer Umgebung. - Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie
KubernetesPodOperator
verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parameternnamespace
undservice_account_name
an.
Cloud Composer 2 verwendet GKE-Cluster mit Workload Identity. Es dauert einige Sekunden, bis der GKE-Metadatenserver Anfragen für einen neu erstellten Pod akzeptiert. Daher können Versuche, sich mit Workload Identity innerhalb der ersten Sekunden der Lebensdauer eines Pods zu authentifizieren, fehlschlagen. Weitere Informationen zu dieser Einschränkung finden Sie unter Einschränkungen von Workload Identity.
Cloud Composer 2 verwendet Autopilot-Cluster, die das Konzept von Computing-Klassen einführen:
Wenn keine Klasse ausgewählt ist, wird beim Erstellen von Pods mit
KubernetesPodOperator
standardmäßig die Klassegeneral-purpose
angenommen.Jeder Klasse sind bestimmte Attribute und Ressourcenlimits zugeordnet. Informationen dazu finden Sie in der Autopilot-Dokumentation. Beispielsweise können Pods, die in der Klasse
general-purpose
ausgeführt werden, bis zu 110 GiB Arbeitsspeicher verwenden.
Wenn Version 5.0.0 des CNCF-Kubernetes-Anbieters verwendet wird, folgen Sie der dokumentierten Anleitung im Abschnitt zum CNCF-Kubernetes-Anbieter.
KubernetesPodOperator-Konfiguration
Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py
-Datei in den dags/
-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator
-Code einem DAG hinzufügen.
In den folgenden Abschnitten wird jede KubernetesPodOperator
-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator
sind nur die name
des Pods, das namespace
, in dem der Pod ausgeführt werden soll, und das zu verwendende image
sowie das task_id
erforderlich.
Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config
. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum
erfolgreich ausgeführt wird.
Vorlagenkonfiguration
Airflow unterstützt die Verwendung von Jinja-Vorlagen.
Sie müssen die erforderlichen Variablen (task_id
, name
, namespace
, image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates
aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value
) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config
nicht in config
gefunden wird.
Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.
So legen Sie my_value
mit gcloud
oder der Airflow-UI fest:
Airflow-UI
In der Airflow 2-UI:
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Admin > Variables.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie für Airflow 2 den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Wenn Sie auf eine benutzerdefinierte config_file
(Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config
durch eine gültige Kubernetes-Konfiguration:
Bereich | Schlüssel | Wert |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates
noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates
erfolgreich ausgeführt wurde.
Konfiguration von Secret-Variablen
Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Mit KubernetesPodOperator
können Sie Secrets an die Kubernetes-Pods übergeben.
Secrets müssen in Kubernetes definiert werden. Ansonsten kann der Pod nicht gestartet werden.
Dieses Beispiel zeigt zwei Möglichkeiten für die Verwendung von Kubernetes Secrets: als Umgebungsvariable und als vom Pod bereitgestelltes Volume.
Das erste Secret, airflow-secrets
, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN
festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).
Das zweite Secret, service-account
, stellt service-account.json
, eine Datei mit einem Dienstkontotoken, in /var/secrets/google
bereit.
Die Secrets sehen so aus:
Der Name des ersten Kubernetes-Secrets wird in der Variablen secret
definiert.
Der Name dieses speziellen Secrets lautet airflow-secrets
. Es wird als Umgebungsvariable gemäß der Vorgabe von deploy_type
bereitgestellt. Die Umgebungsvariable, für die es festgelegt wird (deploy_target
), lautet SQL_CONN
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, sql_alchemy_conn
.
Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret
definiert.
Der Name dieses speziellen Secrets lautet service-account
. Es wird als Volume bereitgestellt, wie von deploy_type
festgelegt. Der Pfad der bereitzustellenden Datei (deploy_target
) lautet /var/secrets/google
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, service-account.json
.
So sieht die Operatorkonfiguration aus:
Wenn Sie keine Änderungen am DAG oder der Umgebung vornehmen, schlägt die Aufgabe ex-kube-secrets
fehl. Aus den Logs geht hervor, dass die Aufgabe aufgrund des Fehlers Pod took too long to start
fehlschlägt. Dieser Fehler tritt auf, weil Airflow das in der Konfiguration angegebene Secret secret_env
nicht finden kann.
gcloud
So legen Sie das Secret mit gcloud
fest:
Informationen über Ihren Cloud Composer-Umgebungscluster abrufen
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT
durch den Namen Ihrer Umgebung.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Die Ausgabe dieses Befehls hat das folgende Format:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach
/clusters/
(endet auf-gke
).
Stellen Sie mit folgendem Befehl eine Verbindung zum GKE-Cluster her:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Ersetzen Sie:
CLUSTER_ID
durch Ihre GKE-Cluster-ID.PROJECT
durch die ID Ihres Google Cloud-Projekts.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Kubernetes-Secrets erstellen.
Erstellen Sie mit dem folgenden Befehl ein Kubernetes-Secret, das den Wert von
sql_alchemy_conn
auftest_value
festlegt:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Erstellen Sie mit folgendem Befehl ein Kubernetes-Secret, das den Wert von
service-account.json
auf einen lokalen Pfad einer Dienstkontoschlüsseldatei namenskey.json
festlegt:kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Nachdem Sie die Secrets festgelegt haben, führen Sie die Aufgabe
ex-kube-secrets
noch einmal in der Airflow-UI aus.Prüfen Sie, ob die Aufgabe
ex-kube-secrets
erfolgreich ist.
Vollständige Konfiguration
In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator
konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs
erfolgreich ist.
Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator
-Referenz von Airflow.
Informationen zum CNCF-Kubernetes-Anbieter
GKEStartPodOperator und KubernetesPodOperator sind im Anbieter apache-airflow-providers-cncf-kubernetes
implementiert.
Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.
Version 6.0.0
In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets wird die kubernetes_default
-Verbindung standardmäßig in der KubernetesPodOperator
verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese weiterhin vom Operator verwendet. Wenn Sie wieder die kubernetes_default
-Verbindung verwenden möchten, müssen Sie Ihre DAGs möglicherweise entsprechend anpassen.
Version 5.0.0
Diese Version enthält einige abwärtsinkompatible Änderungen im Vergleich zu Version 4.4.0. Die wichtigsten beziehen sich auf die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Der Kube-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (wie in Abbildung 1 gezeigt). Alternativ mussconfig_file
derKubernetesPodOperator
-Konfiguration hinzugefügt werden (wie im folgenden Codebeispiel gezeigt).
- So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.
Fehlerbehebung
Tipps zur Behebung von Pod-Fehlern
Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:
Ausgabe des Airflow-Planers und der Airflow-Worker:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Folgen Sie dem Link zu DAGs für Ihre Umgebung.
Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.
Prüfen Sie die Logs im Ordner
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.
Rückgabewerte ungleich null, wenn auch GKEStartPodOperator
verwendet wird
Wenn Sie KubernetesPodOperator
und GKEStartPodOperator
verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster bei der Verwendung von KubernetesPodOperator
und GKEStartPodOperator
besteht darin, ein Shell-Skript als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge im Container zusammenzufassen.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator
beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator
den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn dem Cloud Composer-Dienstkonto die erforderlichen IAM-Berechtigungen zum Ausführen der aktuellen Aufgabe fehlen. Um dies zu prüfen, prüfen Sie mithilfe der GKE-Dashboards die Fehler auf Pod-Ebene, um sich die Logs für Ihre spezifische Arbeitslast anzusehen, oder verwenden Sie Cloud Logging.
Es konnte keine neue Verbindung hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob der Cluster aktualisiert wird, rufen Sie in der Google Cloud Console die Seite Kubernetes-Cluster auf und suchen Sie nach dem Ladesymbol neben dem Clusternamen Ihrer Umgebung.