CeleryKubernetesExecutor verwenden

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer aktivieren und KubernetesExecutor in Ihren DAGs verwenden.

Über CeleryKubernetesExecutor

CeleryKubernetesExecutor ist ein Executor-Typ, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann. Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für die Aufgabe definieren. In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben mit KubernetesExecutor ausführen:

  • CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
  • KubernetesExecutor wurde für die Ausführung ressourcenintensiver Aufgaben und für die isolierte Ausführung von Aufgaben entwickelt.

CeleryKubernetesExecutor in Cloud Composer

CeleryKubernetesExecutor in Cloud Composer bietet die Möglichkeit, KubernetesExecutor für Ihre Aufgaben zu verwenden. KubernetesExecutor kann in Cloud Composer nicht separat von CeleryKubernetesExecutor verwendet werden.

Cloud Composer führt Aufgaben, die Sie mit KubernetesExecutor im Cluster Ihrer Umgebung ausführen, im selben Namespace mit Airflow-Workern aus. Diese Aufgaben haben dieselben Bindungen wie Airflow-Worker und können auf Ressourcen in Ihrem Projekt zugreifen.

Für Aufgaben, die Sie mit KubernetesExecutor ausführen, gilt das Cloud Composer-Preismodell, da Pods mit diesen Aufgaben im Cluster Ihrer Umgebung ausgeführt werden. Für diese Pods gelten die Compute-SKUs von Cloud Composer (für CPU, Arbeitsspeicher und Speicher).

Wir empfehlen, Aufgaben mit CeleryExecutor auszuführen, wenn Folgendes zutrifft:

  • Die Startzeit der Aufgabe ist wichtig.
  • Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.

Wir empfehlen, Aufgaben mit KubernetesExecutor auszuführen, wenn:

  • Aufgaben erfordern eine Laufzeitisolation. Beispielsweise damit Aufgaben nicht um Arbeitsspeicher und CPU konkurrieren, da sie in ihren eigenen Pods ausgeführt werden.
  • Für Aufgaben sind zusätzliche Systembibliotheken (oder PyPI-Pakete) erforderlich.
  • Aufgaben sind ressourcenintensiv und Sie möchten die verfügbaren CPU- und Arbeitsspeicherressourcen steuern.

KubernetesExecutor im Vergleich zu KubernetesPodOperator

Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Ausführen von Aufgaben mit KubernetesPodOperator. Aufgaben werden in Pods ausgeführt, wodurch Aufgaben auf Pod-Ebene isoliert und das Ressourcenmanagement verbessert wird.

Es gibt jedoch einige wesentliche Unterschiede:

  • KubernetesExecutor führt Aufgaben nur im versionierten Cloud Composer-Namespace Ihrer Umgebung aus. Dieser Namespace kann in Cloud Composer nicht geändert werden. Sie können einen Namespace angeben, in dem der KubernetesPodOperator Pod-Aufgaben ausführt.
  • KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein vom Einstiegspunkt des Containers definiertes Skript aus.
  • KubernetesExecutor verwendet das standardmäßige Docker-Image von Cloud Composer mit den gleichen Python-, Airflow-Konfigurationsoptionen, Umgebungsvariablen und PyPI-Paketen, die in Ihrer Cloud Composer-Umgebung definiert sind.

Docker-Images

Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, das Cloud Composer für Celery-Worker verwendet. Dies ist das Cloud Composer-Image für die Umgebung mit allen Änderungen, die Sie für die Umgebung angegeben haben, z. B. benutzerdefinierte PyPI-Pakete oder Umgebungsvariablen.

Hinweise

  • Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.

  • In Cloud Composer 3 kann nur CeleryKubernetesExecutor verwendet werden. Das bedeutet, dass Sie Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen können. Es ist jedoch nicht möglich, Ihre Umgebung so zu konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor verwendet wird.

CeleryKubernetesExecutor konfigurieren

Möglicherweise möchten Sie vorhandene Airflow-Konfigurationsoptionen überschreiben, die sich auf KubernetesExecutor beziehen:

  • [kubernetes]worker_pods_creation_batch_size

    Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Planerschleife definiert. Der Standardwert ist 1, sodass nur ein einzelner Pod pro Planer-Heartbeat gestartet wird. Wenn Sie KubernetesExecutor intensiv verwenden, wird empfohlen, diesen Wert zu erhöhen.

  • [kubernetes]worker_pods_pending_timeout

    Mit dieser Option wird in Sekunden definiert, wie lange ein Worker im Status Pending bleiben kann (Pod wird erstellt), bevor er als fehlgeschlagen gilt. Der Standardwert beträgt 5 Minuten.

Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen

Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:

  • Geben Sie zum Ausführen einer Aufgabe mit KubernetesExecutor den Wert kubernetes im Parameter queue einer Aufgabe an.
  • Zum Ausführen einer Aufgabe mit CeleryExecutor lassen Sie den Parameter queue weg.

Im folgenden Beispiel wird die Aufgabe task-kubernetes mit KubernetesExecutor und die Aufgabe task-celery mit CeleryExecutor ausgeführt:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Befehle der Airflow-Befehlszeile für KubernetesExecutor ausführen

Mit gcloud können Sie mehrere Airflow-Kommandozeilenbefehle für KubernetesExecutor ausführen.

Spezifikation des Worker-Pods anpassen

Sie können die Spezifikation des Worker-Pods anpassen, indem Sie sie im Parameter executor_config einer Aufgabe übergeben. Damit können Sie benutzerdefinierte CPU- und Arbeitsspeicheranforderungen definieren.

Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Zum Abrufen der Pod-Spezifikation einer von KubernetesExecutor verwendeten Aufgabe können Sie den Airflow-Befehl kubernetes generate-dag-yaml ausführen.

Weitere Informationen zum Anpassen der Worker-Pod-Spezifikationen finden Sie in der Airflow-Dokumentation.

Das folgende Beispiel zeigt eine Aufgabe, die die Pod-Spezifikation für benutzerdefinierte Worker verwendet:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Aufgabenlogs ansehen

Logs der Aufgaben, die von KubernetesExecutor ausgeführt werden, sind auf dem Tab Logs verfügbar, zusammen mit Logs der Aufgaben, die von CeleryExecutor ausgeführt werden:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Logs auf.

  4. Rufen Sie Alle Logs > Airflow-Logs > Worker auf.

  5. Die Worker mit dem Namen airflow-k8s-worker führen KubernetesExecutor-Aufgaben aus. Wenn Sie nach Logs einer bestimmten Aufgabe suchen möchten, können Sie eine DAG-ID oder eine Aufgaben-ID als Suchbegriff in der Suche verwenden.

Nächste Schritte