CeleryKubernetesExecutor verwenden

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer aktivieren und KubernetesExecutor in Ihren DAGs verwenden.

CeleryKubernetesExecutor

CeleryKubernetesExecutor ist ein Executor, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann. Airflow wählt den Executor anhand der Warteschlange aus, die Sie für die Aufgabe definieren. In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben mit KubernetesExecutor ausführen:

  • CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
  • KubernetesExecutor ist für die Ausführung ressourcenintensiver Aufgaben und für die Ausführung von Aufgaben in Isolation konzipiert.

CeleryKubernetesExecutor in Cloud Composer

Mit CeleryKubernetesExecutor in Cloud Composer können Sie KubernetesExecutor für Ihre Aufgaben verwenden. Es ist nicht möglich, KubernetesExecutor in Cloud Composer unabhängig von CeleryKubernetesExecutor zu verwenden.

Cloud Composer führt Aufgaben aus, die Sie mit KubernetesExecutor im Cluster Ihrer Umgebung ausführen, im selben Namespace wie Airflow-Worker. Solche Aufgaben haben dieselben Bindungen wie Airflow-Arbeitskräfte und können auf Ressourcen in Ihrem Projekt zugreifen.

Für Aufgaben, die Sie mit KubernetesExecutor ausführen, gilt das Cloud Composer-Preismodell, da Pods mit diesen Aufgaben im Cluster Ihrer Umgebung ausgeführt werden. Für diese Pods gelten die Cloud Composer Compute-SKUs (für CPU, Arbeitsspeicher und Speicher).

Wir empfehlen, Aufgaben mit dem CeleryExecutor auszuführen, wenn:

  • Die Startzeit von Aufgaben ist wichtig.
  • Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.

Wir empfehlen, Aufgaben mit dem KubernetesExecutor auszuführen, wenn:

  • Für Aufgaben ist eine Laufzeitisolierung erforderlich. So können Aufgaben beispielsweise nicht um Arbeitsspeicher und CPU konkurrieren, da sie in eigenen Pods ausgeführt werden.
  • Aufgaben sind ressourcenintensiv und Sie möchten die verfügbaren CPU- und Arbeitsspeicherressourcen steuern.

KubernetesExecutor im Vergleich zu KubernetesPodOperator

Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Ausführen von Aufgaben mit KubernetesPodOperator. Aufgaben werden in Pods ausgeführt, was eine Aufgabenisolierung auf Pod-Ebene und eine bessere Ressourcenverwaltung ermöglicht.

Es gibt jedoch einige wichtige Unterschiede:

  • KubernetesExecutor führt Aufgaben nur im versionierten Cloud Composer-Namespace Ihrer Umgebung aus. Dieser Name kann in Cloud Composer nicht geändert werden. Sie können einen Namespace angeben, in dem KubernetesPodOperator Pod-Aufgaben ausführt.
  • KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein bereitgestelltes Script aus, das durch den Einstiegspunkt des Containers definiert ist.
  • KubernetesExecutor verwendet das standardmäßige Cloud Composer-Docker-Image mit denselben Python-, Airflow-Konfigurationsoptionsüberschreibungen, Umgebungsvariablen und PyPI-Paketen, die in Ihrer Cloud Composer-Umgebung definiert sind.

Docker-Images

Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, das Cloud Composer für Celery-Worker verwendet. Das ist das Cloud Composer-Image für Ihre Umgebung mit allen Änderungen, die Sie für Ihre Umgebung angegeben haben, z. B. benutzerdefinierte PyPI-Pakete oder Umgebungsvariablen.

Hinweis

  • Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.

  • In Cloud Composer 3 kann nur CeleryKubernetesExecutor verwendet werden. Das bedeutet, dass Sie Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen können. Es ist jedoch nicht möglich, Ihre Umgebung so zu konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor verwendet wird.

CeleryKubernetesExecutor konfigurieren

Sie können vorhandene Airflow-Konfigurationsoptionen, die sich auf KubernetesExecutor beziehen, überschreiben:

  • [kubernetes]worker_pods_creation_batch_size

    Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Scheduler-Schleife definiert. Der Standardwert ist 1. Es wird also nur ein Pod pro Scheduler-Herzschlag gestartet. Wenn Sie KubernetesExecutor häufig verwenden, sollten Sie diesen Wert erhöhen.

  • [kubernetes]worker_pods_pending_timeout

    Mit dieser Option wird in Sekunden festgelegt, wie lange ein Worker im Status Pending (Pod wird erstellt) bleiben kann, bevor er als fehlgeschlagen eingestuft wird. Der Standardwert ist 5 Minuten.

Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen

Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:

  • Wenn Sie eine Aufgabe mit KubernetesExecutor ausführen möchten, geben Sie den Wert kubernetes im Parameter queue einer Aufgabe an.
  • Wenn Sie eine Aufgabe mit CeleryExecutor ausführen möchten, lassen Sie den Parameter queue weg.

Im folgenden Beispiel wird die Aufgabe task-kubernetes mit KubernetesExecutor und die Aufgabe task-celery mit CeleryExecutor ausgeführt:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Airflow-Befehle für KubernetesExecutor ausführen

Mit gcloud können Sie mehrere Airflow-Befehle für KubernetesExecutor ausführen.

Worker-Pod-Spezifikation anpassen

Sie können die Worker-Pod-Spezifikation anpassen, indem Sie sie im executor_config-Parameter einer Aufgabe übergeben. So können Sie benutzerdefinierte CPU- und Speicheranforderungen definieren.

Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Wenn Sie die Pod-Spezifikation einer Aufgabe abrufen möchten, die von KubernetesExecutor verwendet wird, können Sie den Airflow-Befehl kubernetes generate-dag-yaml ausführen.

Weitere Informationen zum Anpassen der Pod-Spezifikation für Worker finden Sie in der Airflow-Dokumentation.

Cloud Composer 3 unterstützt die folgenden Werte für die Ressourcenanforderungen:

Ressource Minimum Maximum Schritt
CPU 0,25 32 Schrittwerte: 0,25, 0,5, 1, 2, 4, 6, 8, 10, …, 32. Angeforderte Werte werden auf den nächsten unterstützten Schrittwert aufgerundet (z. B. von 5 auf 6).
Speicher 2G (GB) 128 G (GB) Schrittwerte: 2, 3, 4, 5, …, 128. Angeforderte Werte werden auf den nächsten unterstützten Schrittwert aufgerundet (z. B. 3, 5G auf 4G).
Speicher - 100 G (GB) Beliebiger Wert. Wenn mehr als 100 GB angefordert werden, werden nur 100 GB bereitgestellt.

Weitere Informationen zu Ressourceneinheiten in Kubernetes finden Sie unter Ressourceneinheiten in Kubernetes.

Das folgende Beispiel zeigt eine Aufgabe mit benutzerdefinierter Worker-Pod-Spezifikation:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Aufgabenlogs ansehen

Logs der von KubernetesExecutor ausgeführten Aufgaben sind auf dem Tab Logs zusammen mit Logs der von CeleryExecutor ausgeführten Aufgaben verfügbar:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Protokolle auf.

  4. Gehen Sie zu Alle Protokolle > Airflow-Protokolle > Worker.

  5. Worker mit dem Namen airflow-k8s-worker führen KubernetesExecutor-Aufgaben aus. Wenn Sie nach Protokollen für eine bestimmte Aufgabe suchen möchten, können Sie eine DAG-ID oder eine Aufgaben-ID als Suchbegriff verwenden.

Nächste Schritte