Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer aktivieren und KubernetesExecutor in Ihren DAGs verwenden.
Über CeleryKubernetesExecutor
CeleryKubernetesExecutor ist ein Executor-Typ, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann . Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für den . In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere mit KubernetesExecutor ausführen:
- CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
- KubernetesExecutor ist für die Ausführung ressourcenintensiver Aufgaben und für die Ausführung von Aufgaben in Isolation konzipiert.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer bietet die Möglichkeit, KubernetesExecutor für Ihre Aufgaben verwenden. Es ist nicht möglich, KubernetesExecutor in Cloud Composer unabhängig von CeleryKubernetesExecutor zu verwenden.
Cloud Composer führt Aufgaben aus, die Sie mit KubernetesExecutor ausführen im Cluster Ihrer Umgebung im selben Namespace mit Airflow-Workern. Solche Aufgaben haben dieselben Bindungen wie Airflow-Arbeitskräfte und können auf Ressourcen in Ihrem Projekt zugreifen.
Aufgaben, die Sie mit KubernetesExecutor ausführen, verwenden die Methode Cloud Composer-Preismodell, da Pods mit diesen die im Cluster Ihrer Umgebung ausgeführt werden. Für diese Pods gelten die Cloud Composer Compute-SKUs (für CPU, Arbeitsspeicher und Speicher).
Wir empfehlen, Aufgaben mit dem CeleryExecutor auszuführen, wenn:
- Die Startzeit der Aufgabe ist wichtig.
- Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.
Wir empfehlen, Aufgaben mit dem KubernetesExecutor auszuführen, wenn:
- Für Aufgaben ist eine Laufzeitisolierung erforderlich. Damit die Aufgaben beispielsweise nicht für Arbeitsspeicher und CPU, da sie in eigenen Pods ausgeführt werden.
- Für Aufgaben sind zusätzliche Systembibliotheken (oder PyPI-Pakete) erforderlich.
- Aufgaben sind ressourcenintensiv und Sie möchten steuern, welche CPU- und Arbeitsspeicherressourcen.
KubernetesExecutor im Vergleich zu KubernetesPodOperator
Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Ausführen von Aufgaben mit KubernetesPodOperator. Aufgaben werden ausgeführt in -Pods und ermöglichen so die Isolierung von Aufgaben auf Pod-Ebene und eine bessere Ressourcenverwaltung.
Es gibt jedoch einige wesentliche Unterschiede:
- KubernetesExecutor führt Aufgaben nur in der versionierten Cloud Composer-Version aus -Namespace Ihrer Umgebung. Dieser Name kann in Cloud Composer nicht geändert werden. Sie können einen Namespace angeben, in dem der KubernetesPodOperator Pod-Aufgaben ausführt.
- KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein vom Einstiegspunkt des Containers.
- KubernetesExecutor verwendet das standardmäßige Cloud Composer-Docker-Image mit denselben Python-, Airflow-Konfigurationsoptionsüberschreibungen, Umgebungsvariablen und PyPI-Paketen, die in Ihrer Cloud Composer-Umgebung definiert sind.
Docker-Images
Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, das Cloud Composer für Celery-Worker verwendet. Dies ist die Cloud Composer-Image für Ihre Umgebung mit alle Änderungen, die Sie für Ihre Umgebung angegeben haben, z. B. benutzerdefiniertes PyPI Paketen oder Umgebungsvariablen.
Hinweise
Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.
Es kann kein anderer Executor als CeleryKubernetesExecutor verwendet werden in Cloud Composer 3. Das bedeutet, dass Sie Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen können. Es ist jedoch nicht möglich, Ihre Umgebung so zu konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor verwendet wird.
CeleryKubernetesExecutor konfigurieren
Möglicherweise möchten Sie eine vorhandene Airflow-Konfiguration überschreiben. Optionen für KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Planerschleife. Der Standardwert ist
1
. Es wird also nur ein Pod pro Scheduler-Herzschlag gestartet. Wenn Sie KubernetesExecutor intensiv verwenden, wird empfohlen, diesen Wert zu erhöhen.[kubernetes]worker_pods_pending_timeout
Mit dieser Option wird in Sekunden festgelegt, wie lange ein Worker im Status
Pending
(Pod wird erstellt) bleiben kann, bevor er als fehlgeschlagen eingestuft wird. Standardeinstellung Wert 5 Minuten beträgt.
Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen
Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:
- Wenn Sie eine Aufgabe mit KubernetesExecutor ausführen möchten, geben Sie den Wert
kubernetes
im Parameterqueue
einer Aufgabe an. - Zum Ausführen einer Aufgabe mit CeleryExecutor lassen Sie den Parameter
queue
weg.
Im folgenden Beispiel wird die Aufgabe task-kubernetes
mit
KubernetesExecutor und die Aufgabe task-celery
mit CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Airflow-Befehle für KubernetesExecutor ausführen
Sie können mehrere
Befehle der Airflow-Befehlszeile für KubernetesExecutor
mit gcloud
.
Spezifikation des Worker-Pods anpassen
Sie können die Worker-Pod-Spezifikation anpassen, indem Sie sie im executor_config
übergeben.
-Parameter einer Aufgabe. So können Sie benutzerdefinierte CPU- und Speicheranforderungen definieren.
Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Wenn Sie die Pod-Spezifikation einer Aufgabe abrufen möchten, die von KubernetesExecutor verwendet wird, können Sie den Airflow-Befehl kubernetes generate-dag-yaml
ausführen.
Weitere Informationen zum Anpassen der Worker-Pod-Spezifikationen finden Sie unter Airflow-Dokumentation
Das folgende Beispiel zeigt eine Aufgabe mit benutzerdefinierter Worker-Pod-Spezifikation:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Aufgabenlogs ansehen
Logs von Aufgaben, die von KubernetesExecutor ausgeführt werden, sind im Tab Logs verfügbar. zusammen mit Logs der Aufgaben, die von CeleryExecutor ausgeführt werden:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf.
Gehen Sie zu Alle Protokolle > Airflow-Protokolle > Worker.
Worker mit dem Namen
airflow-k8s-worker
werden ausgeführt KubernetesExecutor-Aufgaben. Wenn Sie nach Protokollen für eine bestimmte Aufgabe suchen möchten, können Sie eine DAG-ID oder eine Aufgaben-ID als Suchbegriff verwenden.
Nächste Schritte
- Fehlerbehebung für KubernetesExecutor
- KubernetesPodOperator verwenden
- GKE-Operatoren verwenden
- Airflow-Konfigurationsoptionen überschreiben