Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie mit dem KubernetesPodOperator Kubernetes-Pods von Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist.
KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Mit Google Kubernetes Engine-Operatoren werden dagegen Kubernetes-Pods in einem bestimmten Cluster ausgeführt. Dabei kann es sich um einen separaten Cluster handeln, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.
KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Hinweise
Wenn Version 5.0.0 des CNCF Kubernetes-Anbieters verwendet wird, folgen Sie der Anleitung im Abschnitt zum CNCF Kubernetes-Anbieter.
Die Pod-Affinitätskonfiguration ist in Cloud Composer 2 nicht verfügbar. Wenn Sie die Pod-Affinität verwenden möchten, verwenden Sie stattdessen die GKE-Operatoren, um Pods in einem anderen Cluster zu starten.
KubernetesPodOperator in Cloud Composer 2
In diesem Abschnitt wird beschrieben, wie der KubernetesPodOperator in Cloud Composer 2 funktioniert.
Ressourcennutzung
In Cloud Composer 2 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie mit KubernetesPodOperator ausführen, werden unabhängig von Ihrer Umgebung skaliert.
Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert.
Die Preise für zusätzliche Arbeitslasten, die Sie im Cluster Ihrer Umgebung ausführen, richten sich nach dem Cloud Composer 2-Preismodell und verwenden Cloud Composer Compute SKUs.
Cloud Composer 2 verwendet Autopilot-Cluster, in denen das Konzept von Compute-Klassen eingeführt wird:
Cloud Composer unterstützt nur die Compute-Klasse
general-purpose
.Wenn keine Klasse ausgewählt ist, wird standardmäßig die Klasse
general-purpose
verwendet, wenn Sie Pods mit KubernetesPodOperator erstellen.Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Pods, die in der Klasse
general-purpose
ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher verwenden.
Zugriff auf die Ressourcen des Projekts
Cloud Composer 2 verwendet GKE-Cluster mit der Identitätsföderation von Arbeitslasten für GKE. Pods, die im Namespace composer-user-workloads
ausgeführt werden, können ohne zusätzliche Konfiguration auf Google Cloud -Ressourcen in Ihrem Projekt zugreifen. Das Dienstkonto Ihrer Umgebung wird für den Zugriff auf diese Ressourcen verwendet.
Wenn Sie einen benutzerdefinierten Namespace verwenden möchten, müssen Kubernetes-Dienstkonten, die diesem Namespace zugeordnet sind, Google Cloud Dienstkonten zugeordnet werden, um die Autorisierung der Dienstidentität für Anfragen an Google APIs und andere Dienste zu ermöglichen. Wenn Sie Pods in einem benutzerdefinierten Namespace im Cluster Ihrer Umgebung ausführen, werden keine IAM-Bindungen zwischen Kubernetes- undGoogle Cloud -Dienstkonten erstellt und diese Pods können nicht auf Ressourcen Ihres Google Cloud -Projekts zugreifen.
Wenn Sie einen benutzerdefinierten Namespace verwenden und Ihre Pods Zugriff aufGoogle Cloud -Ressourcen haben sollen, folgen Sie der Anleitung unter „Identitätsföderation von Arbeitslasten für GKE“ und richten Sie die Bindungen für einen benutzerdefinierten Namespace ein:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem Kubernetes-Dienstkonto des benutzerdefinierten Namespace und dem Dienstkonto Ihrer Umgebung.
- Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie KubernetesPodOperator verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parametern
namespace
undservice_account_name
an.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator sind nur die Parameter name
, image
und task_id
des Pods erforderlich. Die Datei /home/airflow/composer_kube_config
enthält Anmeldedaten für die Authentifizierung bei GKE.
Zusätzliche Konfiguration
In diesem Beispiel werden zusätzliche Parameter gezeigt, die Sie im KubernetesPodOperator konfigurieren können.
Weitere Informationen finden Sie in den folgenden Ressourcen:
Informationen zur Verwendung von Kubernetes-Secrets und -ConfigMaps finden Sie unter Kubernetes-Secrets und -ConfigMaps verwenden.
Informationen zur Verwendung von Jinja-Vorlagen mit KubernetesPodOperator finden Sie unter Jinja-Vorlagen verwenden.
Informationen zu den KubernetesPodOperator-Parametern finden Sie in der Airflow-Dokumentation in der Operator-Referenz.
Jinja-Vorlagen verwenden
Airflow unterstützt Jinja-Vorlagen in DAGs.
Sie müssen die erforderlichen Airflow-Parameter (task_id
, name
und image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Der Parameter env_vars
im Beispiel wird aus einer Airflow-Variablen mit dem Namen my_value
festgelegt. Der Beispiel-DAG ruft seinen Wert aus der Vorlagenvariablen vars
in Airflow ab. Airflow hat mehr Variablen, die Zugriff auf verschiedene Arten von Informationen bieten. Sie können beispielsweise mit der Vorlagenvariablen conf
auf Werte von Airflow-Konfigurationsoptionen zugreifen. Weitere Informationen und eine Liste der in Airflow verfügbaren Variablen finden Sie in der Airflow-Dokumentation unter Templates reference.
Wenn der DAG nicht geändert und die Variable env_vars
nicht erstellt wird, schlägt die Aufgabe ex-kube-templates
im Beispiel fehl, da die Variable nicht vorhanden ist. Erstellen Sie diese Variable in der Airflow-Benutzeroberfläche oder mit der Google Cloud CLI:
Airflow-UI
Rufen Sie die Airflow-UI auf.
Wählen Sie in der Symbolleiste Admin > Variablen aus.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Das folgende Beispiel zeigt, wie Sie Jinja-Vorlagen mit KubernetesPodOperator verwenden:
Kubernetes-Secrets und -ConfigMaps verwenden
Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Eine Kubernetes-ConfigMap ist ein Objekt, das nicht vertrauliche Daten in Schlüssel/Wert-Paaren enthält.
In Cloud Composer 2 können Sie Secrets und ConfigMaps mit der Google Cloud CLI, der API oder Terraform erstellen und dann über KubernetesPodOperator darauf zugreifen.
YAML-Konfigurationsdateien
Wenn Sie ein Kubernetes-Secret oder eine ConfigMap mit der Google Cloud CLI und der API erstellen, geben Sie eine Datei im YAML-Format an. Diese Datei muss dasselbe Format wie Kubernetes-Secrets und ConfigMaps haben. In der Kubernetes-Dokumentation finden Sie viele Codebeispiele für ConfigMaps und Secrets. Weitere Informationen finden Sie auf der Seite Anmeldedaten mit Secrets sicher verteilen und unter ConfigMaps.
Wie bei Kubernetes-Secrets müssen Sie die Base64-Darstellung verwenden, wenn Sie Werte in Secrets definieren.
Sie können einen Wert mit dem folgenden Befehl codieren (dies ist nur eine von vielen Möglichkeiten, einen base64-codierten Wert zu erhalten):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Ausgabe:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden. Wie im vorherigen Beispiel codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64
) und geben diesen Wert dann in der YAML-Datei an:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie müssen die base64-Darstellung nicht in ConfigMaps verwenden:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes-Secrets verwalten
In Cloud Composer 2 erstellen Sie Secrets mit der Google Cloud CLI und kubectl
:
Informationen zum Cluster Ihrer Umgebung abrufen:
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT
durch den Namen Ihrer Umgebung.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Die Ausgabe dieses Befehls hat das folgende Format:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach
/clusters/
(endet auf-gke
).
Stellen Sie mit folgendem Befehl eine Verbindung zu Ihrem GKE-Cluster her:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Ersetzen Sie Folgendes:
CLUSTER_ID
: Die Cluster-ID der Umgebung.PROJECT_ID
: die Projekt-ID.LOCATION
: die Region, in der sich die Umgebung befindet.
Kubernetes-Secrets erstellen:
Die folgenden Befehle zeigen zwei verschiedene Ansätze zum Erstellen von Kubernetes-Secrets. Der Ansatz
--from-literal
verwendet Schlüssel/Wert-Paare. Der Ansatz--from-file
verwendet Dateiinhalte.Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu erstellen, indem Sie Schlüssel/Wert-Paare angeben. In diesem Beispiel wird ein Secret namens
airflow-secrets
erstellt, das ein Feldsql_alchemy_conn
mit dem Werttest_value
enthält.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu erstellen, indem Sie den Dateiinhalt angeben. In diesem Beispiel wird ein Secret mit dem Namen
service-account
erstellt, das das Feldservice-account.json
mit dem Wert aus dem Inhalt einer lokalen Datei./key.json
enthält.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Kubernetes-Secrets in Ihren DAGs verwenden
Dieses Beispiel zeigt zwei Möglichkeiten für die Verwendung von Kubernetes Secrets: als Umgebungsvariable und als vom Pod bereitgestelltes Volume.
Das erste Secret, airflow-secrets
, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN
festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).
Das zweite Secret, service-account
, stellt service-account.json
, eine Datei mit einem Dienstkontotoken, in /var/secrets/google
bereit.
So sehen die Secret-Objekte aus:
Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env
definiert.
Dieses Secret heißt airflow-secrets
. Der Parameter deploy_type
gibt an, dass er als Umgebungsvariable verfügbar gemacht werden muss. Der Name der Umgebungsvariable ist SQL_CONN
, wie im Parameter deploy_target
angegeben. Schließlich wird der Wert der Umgebungsvariablen SQL_CONN
auf den Wert des Schlüssels sql_alchemy_conn
festgelegt.
Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret_volume
definiert. Dieses Secret heißt service-account
. Es wird als Volume bereitgestellt, wie im Parameter deploy_type
angegeben. Der Pfad der bereitzustellenden Datei (deploy_target
) lautet /var/secrets/google
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, service-account.json
.
Die Operatorkonfiguration sieht so aus:
Informationen zum CNCF Kubernetes-Anbieter
KubernetesPodOperator ist im apache-airflow-providers-cncf-kubernetes
-Anbieter implementiert.
Ausführliche Versionshinweise zum CNCF Kubernetes-Anbieter finden Sie auf der CNCF Kubernetes Provider-Website.
Version 6.0.0
In Version 6.0.0 des CNCF Kubernetes Provider-Pakets wird die kubernetes_default
-Verbindung standardmäßig in KubernetesPodOperator verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese weiterhin vom Operator verwendet. Wenn Sie wieder die kubernetes_default
-Verbindung verwenden möchten, müssen Sie Ihre DAGs möglicherweise entsprechend anpassen.
Version 5.0.0
In dieser Version werden einige abwärtsinkompatible Änderungen im Vergleich zu Version 4.4.0 eingeführt. Die wichtigsten beziehen sich auf die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Der Kubernetes-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (siehe Abbildung unten). Alternativ mussconfig_file
der KubernetesPodOperator-Konfiguration hinzugefügt werden (wie im folgenden Codebeispiel gezeigt).

- Ändern Sie den Code einer Aufgabe mit KubernetesPodOperator so:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen zum CNCF Kubernetes-Provider.
Fehlerbehebung
In diesem Abschnitt finden Sie Tipps zur Behebung häufiger Probleme mit KubernetesPodOperator:
Logs ansehen
Bei der Fehlerbehebung können Sie die Logs in der folgenden Reihenfolge prüfen:
Airflow-Tasklogs:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf den Namen des DAG und dann auf den DAG-Lauf, um die Details und Logs aufzurufen.
Airflow-Planer-Logs:
Rufen Sie die Seite Umgebungsdetails auf.
Rufen Sie den Tab Logs auf.
Airflow-Planer-Logs prüfen
Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.
Rückgabecodes ungleich null
Wenn Sie KubernetesPodOperator (und GKEStartPodOperator) verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster besteht darin, ein Shell-Skript als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Das können Sie prüfen, wenn Sie sich in den GKE-Dashboards die Fehler auf Pod-Ebene in den Logs für die jeweilige Arbeitslast ansehen oder Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob der Cluster aktualisiert wird, rufen Sie in der Google Cloud Console die Seite Kubernetes-Cluster auf und suchen nach dem Ladesymbol neben dem Clusternamen Ihrer Umgebung.