Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer aktivieren und KubernetesExecutor in Ihren DAGs verwenden.
Über CeleryKubernetesExecutor
CeleryKubernetesExecutor ist ein Executor-Typ, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann. Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für die Aufgabe definieren. In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben mit KubernetesExecutor ausführen:
- CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
- KubernetesExecutor wurde für die Ausführung ressourcenintensiver Aufgaben und für die isolierte Ausführung von Aufgaben entwickelt.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer bietet die Möglichkeit, KubernetesExecutor für Ihre Aufgaben zu verwenden. KubernetesExecutor kann in Cloud Composer nicht separat von CeleryKubernetesExecutor verwendet werden.
Cloud Composer führt Aufgaben, die Sie mit KubernetesExecutor im Cluster Ihrer Umgebung ausführen, im selben Namespace mit Airflow-Workern aus. Diese Aufgaben haben dieselben Bindungen wie Airflow-Worker und können auf Ressourcen in Ihrem Projekt zugreifen.
Für Aufgaben, die Sie mit KubernetesExecutor ausführen, gilt das Cloud Composer-Preismodell, da Pods mit diesen Aufgaben im Cluster Ihrer Umgebung ausgeführt werden. Für diese Pods gelten die Compute-SKUs von Cloud Composer (für CPU, Arbeitsspeicher und Speicher).
Wir empfehlen, Aufgaben mit CeleryExecutor auszuführen, wenn Folgendes zutrifft:
- Die Startzeit der Aufgabe ist wichtig.
- Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.
Wir empfehlen, Aufgaben mit KubernetesExecutor auszuführen, wenn:
- Aufgaben erfordern eine Laufzeitisolation. Beispielsweise damit Aufgaben nicht um Arbeitsspeicher und CPU konkurrieren, da sie in ihren eigenen Pods ausgeführt werden.
- Für Aufgaben sind zusätzliche Systembibliotheken (oder PyPI-Pakete) erforderlich.
- Aufgaben sind ressourcenintensiv und Sie möchten die verfügbaren CPU- und Arbeitsspeicherressourcen steuern.
KubernetesExecutor im Vergleich zu KubernetesPodOperator
Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Ausführen von Aufgaben mit KubernetesPodOperator. Aufgaben werden in Pods ausgeführt, wodurch Aufgaben auf Pod-Ebene isoliert und das Ressourcenmanagement verbessert wird.
Es gibt jedoch einige wesentliche Unterschiede:
- KubernetesExecutor führt Aufgaben nur im versionierten Cloud Composer-Namespace Ihrer Umgebung aus. Dieser Namespace kann in Cloud Composer nicht geändert werden. Sie können einen Namespace angeben, in dem der KubernetesPodOperator Pod-Aufgaben ausführt.
- KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein vom Einstiegspunkt des Containers definiertes Skript aus.
- KubernetesExecutor verwendet das standardmäßige Docker-Image von Cloud Composer mit den gleichen Python-, Airflow-Konfigurationsoptionen, Umgebungsvariablen und PyPI-Paketen, die in Ihrer Cloud Composer-Umgebung definiert sind.
Docker-Images
Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, das Cloud Composer für Celery-Worker verwendet. Dies ist das Cloud Composer-Image für die Umgebung mit allen Änderungen, die Sie für die Umgebung angegeben haben, z. B. benutzerdefinierte PyPI-Pakete oder Umgebungsvariablen.
Hinweise
Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.
In Cloud Composer 3 kann nur CeleryKubernetesExecutor verwendet werden. Das bedeutet, dass Sie Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen können. Es ist jedoch nicht möglich, Ihre Umgebung so zu konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor verwendet wird.
CeleryKubernetesExecutor konfigurieren
Möglicherweise möchten Sie vorhandene Airflow-Konfigurationsoptionen überschreiben, die sich auf KubernetesExecutor beziehen:
[kubernetes]worker_pods_creation_batch_size
Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Planerschleife definiert. Der Standardwert ist
1
, sodass nur ein einzelner Pod pro Planer-Heartbeat gestartet wird. Wenn Sie KubernetesExecutor intensiv verwenden, wird empfohlen, diesen Wert zu erhöhen.[kubernetes]worker_pods_pending_timeout
Mit dieser Option wird in Sekunden definiert, wie lange ein Worker im Status
Pending
bleiben kann (Pod wird erstellt), bevor er als fehlgeschlagen gilt. Der Standardwert beträgt 5 Minuten.
Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen
Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:
- Geben Sie zum Ausführen einer Aufgabe mit KubernetesExecutor den Wert
kubernetes
im Parameterqueue
einer Aufgabe an. - Zum Ausführen einer Aufgabe mit CeleryExecutor lassen Sie den Parameter
queue
weg.
Im folgenden Beispiel wird die Aufgabe task-kubernetes
mit KubernetesExecutor und die Aufgabe task-celery
mit CeleryExecutor ausgeführt:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Befehle der Airflow-Befehlszeile für KubernetesExecutor ausführen
Mit gcloud
können Sie mehrere Airflow-Kommandozeilenbefehle für KubernetesExecutor ausführen.
Spezifikation des Worker-Pods anpassen
Sie können die Spezifikation des Worker-Pods anpassen, indem Sie sie im Parameter executor_config
einer Aufgabe übergeben. Damit können Sie benutzerdefinierte CPU- und Arbeitsspeicheranforderungen definieren.
Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Zum Abrufen der Pod-Spezifikation einer von KubernetesExecutor verwendeten Aufgabe können Sie den Airflow-Befehl kubernetes generate-dag-yaml
ausführen.
Weitere Informationen zum Anpassen der Worker-Pod-Spezifikationen finden Sie in der Airflow-Dokumentation.
Das folgende Beispiel zeigt eine Aufgabe, die die Pod-Spezifikation für benutzerdefinierte Worker verwendet:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Aufgabenlogs ansehen
Logs der Aufgaben, die von KubernetesExecutor ausgeführt werden, sind auf dem Tab Logs verfügbar, zusammen mit Logs der Aufgaben, die von CeleryExecutor ausgeführt werden:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf.
Rufen Sie Alle Logs > Airflow-Logs > Worker auf.
Die Worker mit dem Namen
airflow-k8s-worker
führen KubernetesExecutor-Aufgaben aus. Wenn Sie nach Logs einer bestimmten Aufgabe suchen möchten, können Sie eine DAG-ID oder eine Aufgaben-ID als Suchbegriff in der Suche verwenden.
Nächste Schritte
- Fehlerbehebung bei KubernetesExecutor
- KubernetesPodOperator verwenden
- GKE-Operatoren verwenden
- Airflow-Konfigurationsoptionen überschreiben