Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer und die Verwendung von KubernetesExecutor in Ihren DAGs.
Über CeleryKubernetesExecutor
CeleryKubernetesExecutor ist ein Executor-Typ, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann . Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für den . In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben ausführen. mit KubernetesExecutor:
- CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
- KubernetesExecutor wurde für die Ausführung ressourcenintensiver Aufgaben und Aufgaben isoliert auszuführen.
CeleryKubernetesExecutor in Cloud Composer
CeleryKubernetesExecutor in Cloud Composer bietet die Möglichkeit, KubernetesExecutor für Ihre Aufgaben verwenden. Es ist nicht möglich, KubernetesExecutor in Cloud Composer getrennt von CeleryKubernetesExecutor.
Cloud Composer führt Aufgaben aus, die Sie mit KubernetesExecutor ausführen im Cluster Ihrer Umgebung im selben Namespace mit Airflow-Workern. Ein solches Aufgaben haben dieselben bindings wie Airflow und auf Ressourcen in Ihrem Projekt zugreifen können.
Aufgaben, die Sie mit KubernetesExecutor ausführen, verwenden die Methode Cloud Composer-Preismodell, da Pods mit diesen die im Cluster Ihrer Umgebung ausgeführt werden. Compute-SKUs für Cloud Composer (für CPU, Arbeitsspeicher und Speicher) gelten für diese Pods.
Wir empfehlen, Aufgaben mit CeleryExecutor auszuführen, wenn Folgendes zutrifft:
- Die Startzeit der Aufgabe ist wichtig.
- Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.
Wir empfehlen, Aufgaben mit KubernetesExecutor auszuführen, wenn:
- Für Aufgaben ist eine Laufzeitisolierung erforderlich. Damit die Aufgaben beispielsweise nicht für Arbeitsspeicher und CPU, da sie in eigenen Pods ausgeführt werden.
- Für Aufgaben sind zusätzliche Systembibliotheken (oder PyPI-Pakete) erforderlich.
- Aufgaben sind ressourcenintensiv und Sie möchten steuern, welche CPU- und Arbeitsspeicherressourcen.
KubernetesExecutor im Vergleich zu KubernetesPodOperator
Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Aufgaben mit KubernetesPodOperator ausführen. Aufgaben werden ausgeführt in -Pods und ermöglichen so die Isolierung von Aufgaben auf Pod-Ebene und eine bessere Ressourcenverwaltung.
Es gibt jedoch einige wesentliche Unterschiede:
- KubernetesExecutor führt Aufgaben nur in der versionierten Cloud Composer-Version aus -Namespace Ihrer Umgebung. Dieser Namespace kann nicht geändert werden in Cloud Composer. Sie können einen Namespace angeben, in dem der KubernetesPodOperator Pod-Aufgaben ausführt.
- KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein vom Einstiegspunkt des Containers.
- KubernetesExecutor verwendet das standardmäßige Docker-Image von Cloud Composer mit demselben Python-Typ, Überschreibungen von Airflow-Konfigurationsoptionen, Variablen und PyPI-Pakete, die in Ihrem Cloud Composer-Umgebung.
Docker-Images
Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, Cloud Composer für Celery-Worker. Dies ist die Cloud Composer-Image für Ihre Umgebung mit alle Änderungen, die Sie für Ihre Umgebung angegeben haben, z. B. benutzerdefiniertes PyPI Paketen oder Umgebungsvariablen.
Hinweise
Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.
Es kann kein anderer Executor als CeleryKubernetesExecutor verwendet werden in Cloud Composer 3. Sie können Aufgaben also mit CeleryExecutor, KubernetesExecutor oder beide in einem DAG, aber nicht können Sie Ihre Umgebung so konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor.
CeleryKubernetesExecutor konfigurieren
Möglicherweise möchten Sie eine vorhandene Airflow-Konfiguration überschreiben. Optionen für KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Planerschleife. Der Standardwert ist
1
, sodass nur ein einziger Pod gestartet wird. pro Planer-Heartbeat. Wenn Sie KubernetesExecutor intensiv verwenden, wird empfohlen, diesen Wert zu erhöhen.[kubernetes]worker_pods_pending_timeout
Mit dieser Option wird in Sekunden definiert, wie lange ein Worker im
Pending
bleiben darf (Pod wird erstellt), bevor er als fehlgeschlagen gilt. Standardeinstellung Wert 5 Minuten beträgt.
Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen
Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:
- Geben Sie zum Ausführen einer Aufgabe mit KubernetesExecutor den Wert
kubernetes
in derqueue
-Parameter einer Aufgabe. - Zum Ausführen einer Aufgabe mit CeleryExecutor lassen Sie den Parameter
queue
weg.
Im folgenden Beispiel wird die Aufgabe task-kubernetes
mit
KubernetesExecutor und die Aufgabe task-celery
mit CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Befehle der Airflow-Befehlszeile für KubernetesExecutor ausführen
Sie können mehrere
Befehle der Airflow-Befehlszeile für KubernetesExecutor
mit gcloud
.
Spezifikation des Worker-Pods anpassen
Sie können die Worker-Pod-Spezifikation anpassen, indem Sie sie im executor_config
übergeben.
-Parameter einer Aufgabe. Hiermit können Sie eine benutzerdefinierte CPU und einen benutzerdefinierten Arbeitsspeicher definieren
Anforderungen.
Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Bis
die Pod-Spezifikation einer von KubernetesExecutor verwendeten Aufgabe abrufen, können Sie
kubernetes generate-dag-yaml
Airflow-Befehlszeile ausführen
.
Weitere Informationen zum Anpassen der Worker-Pod-Spezifikationen finden Sie unter Airflow-Dokumentation
Das folgende Beispiel zeigt eine Aufgabe, die die Pod-Spezifikation für benutzerdefinierte Worker verwendet:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '500m',
'memory': '1000Mi',
})
),
],
),
)
},
)
Aufgabenlogs ansehen
Logs von Aufgaben, die von KubernetesExecutor ausgeführt werden, sind im Tab Logs verfügbar. zusammen mit Logs der Aufgaben, die von CeleryExecutor ausgeführt werden:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf.
Gehen Sie zu Alle Logs > Airflow-Logs. > Worker.
Worker mit dem Namen
airflow-k8s-worker
werden ausgeführt KubernetesExecutor-Aufgaben. Um nach Logs einer bestimmten Aufgabe zu suchen, können Sie DAG-ID oder Aufgaben-ID als Suchbegriff in der Suche verwenden
Nächste Schritte
- Fehlerbehebung bei KubernetesExecutor
- KubernetesPodOperator verwenden
- GKE-Operatoren verwenden
- Airflow-Konfigurationsoptionen überschreiben