Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie mit dem KubernetesPodOperator Kubernetes-Pods von Cloud Composer in die Google Kubernetes Engine Cluster, der zu Ihrer Cloud Composer-Umgebung gehört.
KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Im Vergleich dazu Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einer bestimmten Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem zu verbessern. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.
KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Hinweis
Wenn Version 5.0.0 des CNCF-Kubernetes-Anbieters verwendet wird, folgen Sie der Anleitung Abschnitt zu CNCF-Kubernetes-Anbietern.
Die Pod-Affinitätskonfiguration ist in Cloud Composer 2 nicht verfügbar. Wenn Sie die Pod-Affinität verwenden möchten, starten Sie Pods stattdessen mit den GKE-Operatoren in einem anderen Cluster.
KubernetesPodOperator in Cloud Composer 2
In diesem Abschnitt wird beschrieben, wie KubernetesPodOperator in Cloud Composer 2 funktioniert.
Ressourcennutzung
In Cloud Composer 2 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie ausführen mit KubernetesPodOperator unabhängig von Ihrer Umgebung skalieren.
Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert.
Die Preise für die zusätzlichen Arbeitslasten, die Sie in Ihrem und der Cluster der Umgebung Cloud Composer 2-Preismodell und verwendet Compute-SKUs für Cloud Composer.
Cloud Composer 2 verwendet Autopilot-Cluster, die das Konzept einführen von Compute-Klassen:
Cloud Composer unterstützt nur die Compute-Klasse
general-purpose
.Wenn keine Klasse ausgewählt ist, wird standardmäßig die Klasse
general-purpose
verwendet, wenn Sie Pods mit KubernetesPodOperator erstellen.Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Pods, die in der
general-purpose
-Klasse ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher verbrauchen.
Zugriff auf die Ressourcen des Projekts
Cloud Composer 2 verwendet GKE-Cluster mit Workload Identity Federation for GKE. Pods, die im Namespace composer-user-workloads
ausgeführt werden, können ohne zusätzliche Konfiguration auf Google Cloud-Ressourcen in Ihrem Projekt zugreifen. Der Zugriff auf diese Ressourcen erfolgt über das Dienstkonto Ihrer Umgebung.
Wenn Sie einen benutzerdefinierten Namespace verwenden möchten, müssen Kubernetes-Dienstkonten die mit diesem Namespace verknüpft sind, muss dem Google Cloud-Dienst zugeordnet werden -Konten, um die Dienstidentitätsautorisierung für Anfragen an Google APIs zu aktivieren und anderen Diensten. Wenn Sie Pods in einem benutzerdefinierten Namespace im und dann IAM-Bindungen zwischen Kubernetes und Google Cloud-Dienstkonten werden nicht erstellt und diese Pods können auch nicht Zugriff auf Ressourcen Ihres Google Cloud-Projekts.
Wenn Sie einen benutzerdefinierten Namespace verwenden und möchten, dass Ihre Pods Zugriff auf Google Cloud-Ressourcen, dann Folgen Sie der Anleitung in der Identitätsföderation von Arbeitslasten für GKE und richten Sie die Bindungen für einen benutzerdefinierten Namespace ein:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem Kubernetes-Dienstkonto des benutzerdefinierten Namespace und dem Dienstkonto Ihrer Umgebung.
- Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie KubernetesPodOperator verwenden, geben Sie den Namespace und das Kubernetes-Dienstkonto in den Parametern
namespace
undservice_account_name
an.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator sind nur die Pod-Parameter name
, image
und task_id
erforderlich. Die /home/airflow/composer_kube_config
enthält Anmeldedaten für die Authentifizierung bei GKE.
Zusätzliche Konfiguration
In diesem Beispiel werden zusätzliche Parameter gezeigt, die Sie im KubernetesPodOperator konfigurieren können.
Weitere Informationen zu Parametern finden Sie in der Airflow-Referenz für KubernetesPodOperator. Informationen zur Verwendung von Kubernetes-Secrets und ConfigMaps finden Sie unter Kubernetes-Secrets und ConfigMaps verwenden. Für Informationen zur Verwendung von Jinja-Vorlagen mit KubernetesPodOperator finden Sie unter Jinja-Vorlagen verwenden
Jinja-Vorlagen verwenden
Airflow unterstützt Jinja-Vorlagen in DAGs.
Sie müssen die erforderlichen Airflow-Parameter (task_id
, name
und
image
) durch den Operator. Wie im folgenden Beispiel gezeigt,
können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, z. B. cmds
,
arguments
, env_vars
und config_file
.
Der Parameter env_vars
in diesem Beispiel wird von einem
Airflow-Variable mit dem Namen my_value
. Der Beispiel-DAG bezieht seinen Wert aus der Vorlagenvariablen vars
in Airflow. Airflow bietet mehr
Variablen, die Zugriff auf
verschiedene Arten von Informationen ermöglichen. Beispiel:
können Sie die Vorlagenvariable conf
verwenden, um auf Werte von
Airflow-Konfigurationsoptionen. Weitere Informationen und die
Liste der in Airflow verfügbaren Variablen finden Sie unter
Vorlagenreferenz in Airflow
Dokumentation.
Ohne den DAG zu ändern oder die Variable env_vars
zu erstellen, schlägt die Aufgabe ex-kube-templates
im Beispiel fehl, da die Variable nicht vorhanden ist. Erstellen Sie diese Variable in der Airflow-UI oder mit der Google Cloud CLI:
Airflow-UI
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Verwaltung > Variablen.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Das folgende Beispiel zeigt, wie Jinja-Vorlagen mit KubernetesPodOperator:
Kubernetes-Secrets und ConfigMaps verwenden
Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Eine Kubernetes-ConfigMap ist ein Objekt, das nicht vertrauliche Daten in Schlüssel/Wert-Paaren enthält.
In Cloud Composer 2 können Sie Secrets und ConfigMaps mit folgendem Befehl erstellen: Google Cloud CLI, API oder Terraform verwenden und von dort aus auf KubernetesPodOperator.
YAML-Konfigurationsdateien
Wenn Sie ein Kubernetes-Secret oder eine ConfigMap mithilfe der Google Cloud CLI und API können Sie eine Datei im YAML-Format bereitstellen. Diese Datei muss den gleichen das von Kubernetes Secrets und ConfigMaps verwendet wird. Die Kubernetes-Dokumentation enthält viele Codebeispiele für ConfigMaps und Secrets. Sehen Sie sich als Erstes die Seite Anmeldedaten mit Secrets sicher verteilen und ConfigMaps an.
Wie in Kubernetes Secrets, Base64 verwenden wenn Sie Werte in Secrets definieren.
Zum Codieren eines Werts können Sie den folgenden Befehl verwenden. Dies ist eine von vielen Möglichkeiten, um einen base64-codierten Wert zu erhalten):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Ausgabe:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden in Beispielen verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Ein weiteres Beispiel, das zeigt, wie Dateien eingefügt werden. Wie im vorherigen Beispiel: Codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64
) und geben Sie diesen Wert dann in der YAML-Datei an:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie müssen die Base64-Darstellung in ConfigMaps nicht verwenden:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes-Secrets verwalten
In Cloud Composer 2 erstellen Sie Secrets mit der Google Cloud CLI und kubectl
:
Rufen Sie Informationen zum Cluster Ihrer Umgebung ab:
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT
durch den Namen Ihrer Umgebung.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Die Ausgabe dieses Befehls hat das folgende Format:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach
/clusters/
(endet auf-gke
).
Stellen Sie mit folgendem Befehl eine Verbindung zu Ihrem GKE-Cluster her: Befehl:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Ersetzen Sie Folgendes:
CLUSTER_ID
: die Cluster-ID der Umgebung.PROJECT_ID
: die Projekt-ID.LOCATION
: die Region, in der sich die Umgebung befindet.
Erstellen Sie Kubernetes-Secrets:
Die folgenden Befehle zeigen zwei verschiedene Ansätze zum Erstellen Kubernetes-Secrets Der Ansatz
--from-literal
verwendet Schlüssel/Wert-Paare. Beim--from-file
-Ansatz werden Dateiinhalte verwendet.Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret mithilfe von Schlüssel/Wert-Paaren zu erstellen. In diesem Beispiel wird ein Secret namens
airflow-secrets
mit dem Feldsql_alchemy_conn
mit dem Wert vontest_value
kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Führen Sie den Befehl folgenden Befehl. In diesem Beispiel wird ein Secret mit dem Namen
service-account
erstellt, das das Feldservice-account.json
mit dem Wert aus dem Inhalt einer lokalen./key.json
-Datei enthält.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Kubernetes-Secrets in DAGs verwenden
In diesem Beispiel werden zwei Möglichkeiten zur Verwendung von Kubernetes Secrets gezeigt: als Umgebung und als vom Pod bereitgestelltes Volume.
Das erste Secret, airflow-secrets
, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN
festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).
Das zweite Secret, service-account
, stellt service-account.json
, eine Datei, bereit
durch ein Dienstkonto-Token auf /var/secrets/google
.
Die Secret-Objekte sehen so aus:
Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env
definiert.
Dieses Secret heißt airflow-secrets
. Der Parameter deploy_type
gibt an,
dass sie als Umgebungsvariable bereitgestellt werden muss. Der Wert der Umgebungsvariablen
Der Name lautet SQL_CONN
, wie im Parameter deploy_target
angegeben. Die Funktion
Der Wert der Umgebungsvariable SQL_CONN
wird auf den Wert der
sql_alchemy_conn
-Taste.
Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret_volume
definiert. Dieses Secret heißt service-account
. Sie wird als
Lautstärke, wie im Parameter deploy_type
angegeben. Der Pfad der Datei zu
Bereitstellung (deploy_target
) ist /var/secrets/google
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, service-account.json
.
Die Operatorkonfiguration sieht so aus:
Informationen zum CNCF-Kubernetes-Anbieter
KubernetesPodOperator ist im apache-airflow-providers-cncf-kubernetes
-Anbieter implementiert.
Detaillierte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie unter Website des CNCF-Kubernetes-Anbieters
Version 6.0.0
In Version 6.0.0 des CNCF Kubernetes-Anbieterpakets wird die kubernetes_default
-Verbindung standardmäßig in KubernetesPodOperator verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese vom Betreiber weiterhin verwendet. Wenn Sie wieder zur kubernetes_default
-Verbindung zurückkehren möchten, sollten Sie Ihre DAGs entsprechend anpassen.
Version 5.0.0
Diese Version führt im Vergleich zu Version 4.4.0 einige nicht abwärtskompatible Änderungen ein. Die wichtigsten betreffen die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Der Kubernetes-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (wie in der folgenden Abbildung dargestellt). Alternativ mussconfig_file
der KubernetesPodOperator-Konfiguration hinzugefügt werden (wie in der im folgenden Codebeispiel).
- Ändern Sie den Code einer Aufgabe mit KubernetesPodOperator so:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie unter Versionshinweise für CNCF-Kubernetes-Anbieter
Fehlerbehebung
Dieser Abschnitt enthält Tipps zur Fehlerbehebung bei gängigen KubernetesPodOperator Probleme:
Logs ansehen
Bei der Fehlerbehebung können Sie Protokolle in der folgenden Reihenfolge überprüfen:
Airflow-Aufgabenlogs:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf den Namen des DAG und dann auf die DAG-Ausführung, um die Details und Protokolle aufzurufen.
Airflow-Planer-Logs:
Rufen Sie die Seite Umgebungsdetails auf.
Rufen Sie den Tab Logs auf.
Prüfen Sie die Logs des Airflow-Planers.
Pod-Logs in der Google Cloud Console unter GKE Arbeitsbelastungen. Diese Logs enthalten die YAML-Datei für die Pod-Definition, Pod-Ereignisse und Pod-Details.
Rückgabecodes ungleich null
Wenn Sie den KubernetesPodOperator (und GKEStartPodOperator) verwenden, gibt den Einstiegspunkt des Containers an, ob die Aufgabe ob sie erfolgreich sind oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster besteht darin, ein Shell-Script als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden,
kann zu Zeitüberschreitungen führen, bevor größere Bilder heruntergeladen werden. Sie können
das Zeitlimit erhöhen, indem Sie den Parameter startup_timeout_seconds
ändern, wenn
erstellen Sie den KubernetesPodOperator.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn Cloud Composer-Dienstkonto nicht über die erforderlichen IAM-Berechtigungen verfügt, um die Aufgabe am Hand. Überprüfen Sie dies, indem Sie sich Fehler auf Pod-Ebene mithilfe der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob der Cluster aktualisiert wird, rufen Sie in der Google Cloud Console die Seite Kubernetes-Cluster auf und suchen nach dem Ladesymbol neben dem Clusternamen Ihrer Umgebung.