Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator Kubernetes-Pods von Cloud Composer in den Google Kubernetes Engine-Cluster Ihrer Cloud Composer-Umgebung bereitstellen.
Einführung von KubernetesPodOperator Kubernetes-Pods im Cluster Ihrer Umgebung. Mit Google Kubernetes Engine-Operatoren werden dagegen Kubernetes-Pods in einem bestimmten Cluster ausgeführt. Dabei kann es sich um einen separaten Cluster handeln, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren.
KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Hinweise
Wenn Version 5.0.0 des CNCF Kubernetes-Anbieters verwendet wird, folgen Sie der Anleitung im Abschnitt zum CNCF Kubernetes-Anbieter.
Die Pod-Affinitätskonfiguration ist in Cloud Composer 2 nicht verfügbar. Wenn Sie möchten Um die Pod-Affinität zu nutzen, verwenden Sie die GKE-Operatoren, um Pods zu starten in einem anderen Cluster.
KubernetesPodOperator in Cloud Composer 2
In diesem Abschnitt wird beschrieben, wie der KubernetesPodOperator in Cloud Composer 2 funktioniert.
Ressourcennutzung
In Cloud Composer 2 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie ausführen mit KubernetesPodOperator unabhängig von Ihrer Umgebung skalieren.
Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert.
Die Preise für die zusätzlichen Arbeitslasten, die Sie in Ihrem und der Cluster der Umgebung Cloud Composer 2-Preismodell und verwendet Compute-SKUs für Cloud Composer.
Cloud Composer 2 verwendet Autopilot-Cluster, die das Konzept einführen von Compute-Klassen:
Cloud Composer unterstützt nur die Compute-Klasse
general-purpose
.Wenn keine Klasse ausgewählt ist, wird standardmäßig die Klasse
general-purpose
verwendet, wenn Sie Pods mit KubernetesPodOperator erstellen.Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Pods, die in der
general-purpose
-Klasse ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher verbrauchen.
Zugriff auf Projektressourcen
Cloud Composer 2 verwendet GKE-Cluster mit
Identitätsföderation von Arbeitslasten für GKE Pods, die im Namespace composer-user-workloads
ausgeführt werden, können ohne zusätzliche Konfiguration auf Google Cloud-Ressourcen in Ihrem Projekt zugreifen. Dienstkonto Ihrer Umgebung
wird für den Zugriff auf diese Ressourcen verwendet.
Wenn Sie einen benutzerdefinierten Namensraum verwenden möchten, müssen die mit diesem Namensraum verknüpften Kubernetes-Dienstkonten Google Cloud-Dienstkonten zugeordnet werden, um die Autorisierung der Dienstidentität für Anfragen an Google APIs und andere Dienste zu ermöglichen. Wenn Sie Pods in einem benutzerdefinierten Namespace im Cluster Ihrer Umgebung ausführen, werden keine IAM-Bindungen zwischen Kubernetes- und Google Cloud-Dienstkonten erstellt. Diese Pods können dann nicht auf Ressourcen Ihres Google Cloud-Projekts zugreifen.
Wenn Sie einen benutzerdefinierten Namespace verwenden und möchten, dass Ihre Pods Zugriff auf Google Cloud-Ressourcen haben, folgen Sie der Anleitung unter „Workload Identity Federation for GKE“ und richten Sie die Bindungen für einen benutzerdefinierten Namespace ein:
- Erstellen Sie einen separaten Namespace im Cluster Ihrer Umgebung.
- Erstellen Sie eine Bindung zwischen dem Kubernetes-Dienstkonto des benutzerdefinierten Namespace und dem Dienstkonto Ihrer Umgebung.
- Fügen Sie dem Kubernetes-Dienstkonto die Dienstkontoannotation Ihrer Umgebung hinzu.
- Wenn Sie den KubernetesPodOperator verwenden, geben Sie den Namespace und den
Kubernetes-Dienstkonto in
namespace
undservice_account_name
Parameter.
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator sind nur die Pod-Parameter name
, image
und task_id
erforderlich. Die /home/airflow/composer_kube_config
enthält Anmeldedaten für die Authentifizierung bei GKE.
Zusätzliche Konfiguration
In diesem Beispiel sind zusätzliche Parameter zu sehen, die Sie im KubernetesPodOperator konfigurieren können.
Weitere Informationen zu Parametern finden Sie in der Airflow-Referenz für KubernetesPodOperator. Informationen zur Verwendung von Kubernetes Secrets und ConfigMaps finden Sie unter Kubernetes Secrets und ConfigMaps verwenden. Für Informationen zur Verwendung von Jinja-Vorlagen mit KubernetesPodOperator finden Sie unter Jinja-Vorlagen verwenden
Jinja-Vorlagen verwenden
Airflow unterstützt Jinja-Vorlagen in DAGs.
Sie müssen die erforderlichen Airflow-Parameter (task_id
, name
und image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Der Parameter env_vars
in diesem Beispiel wird von einem
Airflow-Variable mit dem Namen my_value
. Beispiel-DAG
den Wert aus der Vorlagenvariable vars
in Airflow erhält. Airflow bietet mehr Variablen, die Zugriff auf verschiedene Arten von Informationen bieten. So können Sie beispielsweise mit der Vorlagenvariablen conf
auf Werte der Airflow-Konfigurationsoptionen zugreifen. Weitere Informationen und eine Liste der in Airflow verfügbaren Variablen finden Sie in der Airflow-Dokumentation unter Referenz zu Vorlagen.
Ohne den DAG zu ändern oder die Variable env_vars
zu erstellen,
Die Aufgabe ex-kube-templates
in diesem Beispiel schlägt fehl, weil die Variable
existieren. Erstellen Sie diese Variable in der Airflow-Benutzeroberfläche oder mit der Google Cloud CLI:
Airflow-UI
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Verwaltung > Variablen.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Das folgende Beispiel zeigt, wie Jinja-Vorlagen mit KubernetesPodOperator verwendet werden:
Kubernetes-Secrets und ConfigMaps verwenden
Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Eine Kubernetes-ConfigMap ist ein Objekt, das nicht vertrauliche Daten in Schlüssel/Wert-Paaren enthält.
In Cloud Composer 2 können Sie Secrets und ConfigMaps mit folgendem Befehl erstellen: Google Cloud CLI, API oder Terraform verwenden und von dort aus auf KubernetesPodOperator.
YAML-Konfigurationsdateien
Wenn Sie ein Kubernetes-Secret oder ein ConfigMap mit der Google Cloud CLI und API erstellen, müssen Sie eine Datei im YAML-Format angeben. Diese Datei muss demselben Format entsprechen wie bei Kubernetes-Secrets und ConfigMaps. Die Kubernetes-Dokumentation enthält viele Codebeispiele für ConfigMaps und Secrets. Für den Einstieg können Sie sieh dir die Anmeldedaten mithilfe von Secrets sicher verteilen und ConfigMaps.
Wie bei Kubernetes-Secrets sollten Sie die Base64-Darstellung verwenden, wenn Sie Werte in Secrets definieren.
Um einen Wert zu codieren, können Sie den folgenden Befehl verwenden. Dies ist eine von vielen Möglichkeiten, um einen base64-codierten Wert zu erhalten):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Ausgabe:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden in Beispielen verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden können. Wie im vorherigen Beispiel: Codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64
) und geben Sie diesen Wert dann in der YAML-Datei an:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie brauchen die Base64- Darstellung in ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes-Secrets verwalten
In Cloud Composer 2 erstellen Sie Secrets mit der Google Cloud CLI und kubectl
:
Rufen Sie Informationen zum Cluster Ihrer Umgebung ab:
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT
durch den Namen Ihrer Umgebung.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Die Ausgabe dieses Befehls hat das folgende Format:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach
/clusters/
(endet auf-gke
).
Stellen Sie mit folgendem Befehl eine Verbindung zum GKE-Cluster her:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Ersetzen Sie Folgendes:
CLUSTER_ID
: die Cluster-ID der Umgebung.PROJECT_ID
: die Projekt-ID.LOCATION
: die Region, in der sich die Umgebung befindet.
Erstellen Sie Kubernetes-Secrets:
Die folgenden Befehle veranschaulichen zwei unterschiedliche Ansätze zum Erstellen von Kubernetes-Secrets. Beim
--from-literal
-Ansatz werden Schlüssel/Wert-Paare verwendet. Der Ansatz--from-file
verwendet Dateiinhalte.Führen Sie den Befehl folgenden Befehl. In diesem Beispiel wird ein Secret mit dem Namen
airflow-secrets
erstellt, das einsql_alchemy_conn
-Feld mit dem Werttest_value
enthält.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret durch Angabe des Dateiinhalts zu erstellen. In diesem Beispiel wird ein Secret namens
service-account
mit dem Feldservice-account.json
und dem -Wert, der aus dem Inhalt einer lokalen./key.json
-Datei entnommen wurde.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Kubernetes-Secrets in DAGs verwenden
In diesem Beispiel werden zwei Möglichkeiten zur Verwendung von Kubernetes Secrets gezeigt: als Umgebung und als vom Pod bereitgestelltes Volume.
Das erste Secret, airflow-secrets
, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN
festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).
Das zweite Secret, service-account
, stellt service-account.json
, eine Datei mit einem Dienstkontotoken, in /var/secrets/google
bereit.
Die Secret-Objekte sehen so aus:
Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env
definiert.
Dieses Secret heißt airflow-secrets
. Der Parameter deploy_type
gibt an, dass er als Umgebungsvariable verfügbar gemacht werden muss. Der Name der Umgebungsvariablen lautet SQL_CONN
, wie im Parameter deploy_target
angegeben. Die Funktion
Der Wert der Umgebungsvariable SQL_CONN
wird auf den Wert der
sql_alchemy_conn
-Taste.
Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret_volume
definiert. Dieses Secret heißt service-account
. Sie wird als
Lautstärke, wie im Parameter deploy_type
angegeben. Der Pfad der bereitzustellenden Datei (deploy_target
) lautet /var/secrets/google
. Schließlich ist der key
der
Das im deploy_target
gespeicherte Secret ist service-account.json
.
Die Operatorkonfiguration sieht so aus:
Informationen zum CNCF-Kubernetes-Anbieter
KubernetesPodOperator ist im apache-airflow-providers-cncf-kubernetes
-Anbieter implementiert.
Detaillierte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie unter Website des CNCF-Kubernetes-Anbieters
Version 6.0.0
In Version 6.0.0 des CNCF Kubernetes-Anbieterpakets wird die kubernetes_default
-Verbindung standardmäßig in KubernetesPodOperator verwendet.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben, wird diese vom Betreiber weiterhin verwendet. Wenn Sie wieder zur kubernetes_default
-Verbindung zurückkehren möchten, sollten Sie Ihre DAGs entsprechend anpassen.
Version 5.0.0
Diese Version führt im Vergleich zu Version 4.4.0 einige nicht abwärtskompatible Änderungen ein. Die wichtigsten beziehen sich auf
Die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Kubernetes-Konfiguration Pfad muss auf/home/airflow/composer_kube_config
festgelegt sein (wie in der folgenden Abbildung dargestellt). Alternativ mussconfig_file
der KubernetesPodOperator-Konfiguration hinzugefügt werden (wie in der im folgenden Codebeispiel).
- Ändern Sie den Code einer Aufgabe mit KubernetesPodOperator so:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie unter Versionshinweise für CNCF-Kubernetes-Anbieter
Fehlerbehebung
In diesem Abschnitt finden Sie Tipps zur Behebung häufiger Probleme mit KubernetesPodOperator:
Logs ansehen
Prüfen Sie bei der Fehlerbehebung die Protokolle in der folgenden Reihenfolge:
Airflow-Aufgabenlogs:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab DAGs auf.
Klicken Sie auf den Namen des DAG und dann auf die DAG-Ausführung, um die Details und Protokolle aufzurufen.
Airflow-Planer-Logs:
Rufen Sie die Seite Umgebungsdetails auf.
Rufen Sie den Tab Protokolle auf.
Prüfen Sie die Logs des Airflow-Planers.
Pod-Logs in der Google Cloud Console unter GKE Arbeitsbelastungen. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Details.
Rückgabecodes ungleich null
Wenn Sie den KubernetesPodOperator (und GKEStartPodOperator) verwenden, gibt den Einstiegspunkt des Containers an, ob die Aufgabe ob sie erfolgreich sind oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein gängiges Muster besteht darin, ein Shell-Script als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.
Wenn Sie ein solches Skript schreiben, empfehlen wir Ihnen, das
set -e
-Befehl am Anfang des Skripts, sodass fehlgeschlagene Befehle im Script
das Skript beenden und den Fehler an die Airflow-Taskinstanz weitergeben.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen des KubernetesPodOperators den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Überprüfen Sie dies, indem Sie sich Fehler auf Pod-Ebene mithilfe der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob ein Upgrade des Clusters ausgeführt wird, rufen Sie in der Google Cloud Console die Kubernetes-Cluster und suchen Sie nach dem Ladesymbol neben der den Clusternamen der Umgebung.