CeleryKubernetesExecutor verwenden

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer aktivieren und KubernetesExecutor in Ihren DAGs verwenden.

CeleryKubernetesExecutor

CeleryKubernetesExecutor ist ein Executortyp, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann. Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für die Aufgabe definieren. In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben mit KubernetesExecutor ausführen:

  • CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
  • KubernetesExecutor ist für die Ausführung ressourcenintensiver Aufgaben und die isolierte Ausführung von Aufgaben konzipiert.

CeleryKubernetesExecutor in Cloud Composer

Mit CeleryKubernetesExecutor in Cloud Composer können Sie KubernetesExecutor für Ihre Aufgaben verwenden. Es ist nicht möglich, KubernetesExecutor in Cloud Composer separat von CeleryKubernetesExecutor zu verwenden.

Cloud Composer führt Aufgaben, die Sie mit KubernetesExecutor ausführen, im Cluster Ihrer Umgebung im selben Namespace wie Airflow-Worker aus. Solche Aufgaben haben dieselben Bindungen wie Airflow-Worker und können auf Ressourcen in Ihrem Projekt zugreifen.

Für Aufgaben, die Sie mit KubernetesExecutor ausführen, gilt das Cloud Composer-Preismodell, da Pods mit diesen Aufgaben im Cluster Ihrer Umgebung ausgeführt werden. Für diese Pods gelten die Cloud Composer Compute-Artikelnummern (für CPU, Arbeitsspeicher und Speicher).

Wir empfehlen, Aufgaben mit dem CeleryExecutor auszuführen, wenn:

  • Die Startzeit von Aufgaben ist wichtig.
  • Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.

Wir empfehlen, Aufgaben mit dem KubernetesExecutor auszuführen, wenn:

  • Für Aufgaben ist eine Laufzeitisolierung erforderlich. So konkurrieren Aufgaben beispielsweise nicht um Arbeitsspeicher und CPU, da sie in eigenen Pods ausgeführt werden.
  • Aufgaben sind ressourcenintensiv und Sie möchten die verfügbaren CPU- und Arbeitsspeicherressourcen steuern.

KubernetesExecutor im Vergleich zu KubernetesPodOperator

Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Ausführen von Aufgaben mit KubernetesPodOperator. Aufgaben werden in Pods ausgeführt. So wird die Aufgabenisolation auf Pod-Ebene ermöglicht und die Ressourcenverwaltung verbessert.

Es gibt jedoch einige wichtige Unterschiede:

  • Der KubernetesExecutor führt Aufgaben nur im versionierten Cloud Composer-Namespace Ihrer Umgebung aus. Dieser Namespace kann in Cloud Composer nicht geändert werden. Sie können einen Namespace angeben, in dem KubernetesPodOperator Pod-Aufgaben ausführt.
  • KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein bereitgestelltes Skript aus, das durch den Einstiegspunkt des Containers definiert wird.
  • KubernetesExecutor verwendet das Standard-Cloud Composer-Docker-Image mit derselben Python-, Airflow-Konfigurationsoptionsüberschreibungen, Umgebungsvariablen und PyPI-Paketen, die in Ihrer Cloud Composer-Umgebung definiert sind.

Docker-Images

Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, das Cloud Composer für Celery-Worker verwendet. Dies ist das Cloud Composer-Image für Ihre Umgebung mit allen Änderungen, die Sie für Ihre Umgebung angegeben haben, z. B. benutzerdefinierte PyPI-Pakete oder Umgebungsvariablen.

Hinweise

  • Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.

  • In Cloud Composer 3 ist es nicht möglich, einen anderen Executor als CeleryKubernetesExecutor zu verwenden. Das bedeutet, dass Sie Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen können. Es ist jedoch nicht möglich, Ihre Umgebung so zu konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor verwendet wird.

CeleryKubernetesExecutor konfigurieren

Möglicherweise möchten Sie vorhandene Airflow-Konfigurationsoptionen, die mit KubernetesExecutor zusammenhängen, überschreiben:

  • [kubernetes]worker_pods_creation_batch_size

    Diese Option definiert die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Scheduler-Schleife. Der Standardwert ist 1. Daher wird pro Scheduler-Heartbeat nur ein Pod gestartet. Wenn Sie KubernetesExecutor häufig verwenden, empfehlen wir, diesen Wert zu erhöhen.

  • [kubernetes]worker_pods_pending_timeout

    Mit dieser Option wird in Sekunden festgelegt, wie lange sich ein Worker im Status Pending (Pod wird erstellt) befinden darf, bevor er als fehlgeschlagen gilt. Der Standardwert ist 5 Minuten.

Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen

Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:

  • Wenn Sie eine Aufgabe mit KubernetesExecutor ausführen möchten, geben Sie den Wert kubernetes im Parameter queue einer Aufgabe an.
  • Wenn Sie eine Aufgabe mit CeleryExecutor ausführen möchten, lassen Sie den Parameter queue weg.

Im folgenden Beispiel wird der Task task-kubernetes mit KubernetesExecutor und der Task task-celery mit CeleryExecutor ausgeführt:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Befehle der Airflow-Befehlszeile für KubernetesExecutor ausführen

Sie können mehrere Airflow-Befehlszeilenbefehle für KubernetesExecutor mit gcloud ausführen.

Worker-Pod-Spezifikation anpassen

Sie können die Spezifikation des Worker-Pods anpassen, indem Sie sie im Parameter executor_config einer Aufgabe übergeben. Damit können Sie benutzerdefinierte CPU- und Speicheranforderungen definieren.

Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Wenn Sie die Pod-Spezifikation eines Tasks abrufen möchten, der von KubernetesExecutor verwendet wird, können Sie den Airflow-CLI-Befehl kubernetes generate-dag-yaml ausführen.

Weitere Informationen zum Anpassen der Worker-Pod-Spezifikation finden Sie in der Airflow-Dokumentation.

Cloud Composer 3 unterstützt die folgenden Werte für Ressourcenanforderungen:

Ressource Minimum Maximum Schritt
CPU 0,25 32 Schrittwerte: 0,25, 0,5, 1, 2, 4, 6, 8, 10, …, 32. Angefragte Werte werden auf den nächstgelegenen unterstützten Schrittwert aufgerundet (z. B. 5 auf 6).
Arbeitsspeicher 2G (GB) 128 GB Schrittwerte: 2, 3, 4, 5, …, 128. Angefragte Werte werden auf den nächstgelegenen unterstützten Schrittwert aufgerundet (z. B. 3, 5G auf 4G).
Speicher - 100 GB Beliebiger Wert. Wenn mehr als 100 GB angefordert werden, werden nur 100 GB bereitgestellt.

Weitere Informationen zu Ressourceneinheiten in Kubernetes finden Sie unter Ressourceneinheiten in Kubernetes.

Das folgende Beispiel zeigt eine Aufgabe, die eine benutzerdefinierte Worker-Pod-Spezifikation verwendet:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Aufgabenlogs ansehen

Logs von Aufgaben, die von KubernetesExecutor ausgeführt werden, sind auf dem Tab Logs verfügbar, zusammen mit Logs von Aufgaben, die von CeleryExecutor ausgeführt werden:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Logs auf.

  4. Rufen Sie Alle Logs > Airflow-Logs > Worker auf.

  5. Worker mit dem Namen airflow-k8s-worker führen KubernetesExecutor-Aufgaben aus. Wenn Sie nach Logs einer bestimmten Aufgabe suchen möchten, können Sie eine DAG-ID oder eine Aufgaben-ID als Keyword in der Suche verwenden.

Nächste Schritte