CeleryKubernetesExecutor verwenden

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird erläutert, wie Sie CeleryKubernetesExecutor in Cloud Composer und die Verwendung von KubernetesExecutor in Ihren DAGs.

Über CeleryKubernetesExecutor

CeleryKubernetesExecutor ist ein Executor-Typ, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann . Airflow wählt den Executor basierend auf der Warteschlange aus, die Sie für den . In einem DAG können Sie einige Aufgaben mit CeleryExecutor und andere Aufgaben ausführen. mit KubernetesExecutor:

  • CeleryExecutor ist für die schnelle und skalierbare Ausführung von Aufgaben optimiert.
  • KubernetesExecutor wurde für die Ausführung ressourcenintensiver Aufgaben und Aufgaben isoliert auszuführen.

CeleryKubernetesExecutor in Cloud Composer

CeleryKubernetesExecutor in Cloud Composer bietet die Möglichkeit, KubernetesExecutor für Ihre Aufgaben verwenden. Es ist nicht möglich, KubernetesExecutor in Cloud Composer getrennt von CeleryKubernetesExecutor.

Cloud Composer führt Aufgaben aus, die Sie mit KubernetesExecutor ausführen im Cluster Ihrer Umgebung im selben Namespace mit Airflow-Workern. Ein solches Aufgaben haben dieselben bindings wie Airflow und auf Ressourcen in Ihrem Projekt zugreifen können.

Aufgaben, die Sie mit KubernetesExecutor ausführen, verwenden die Methode Cloud Composer-Preismodell, da Pods mit diesen die im Cluster Ihrer Umgebung ausgeführt werden. Compute-SKUs für Cloud Composer (für CPU, Arbeitsspeicher und Speicher) gelten für diese Pods.

Wir empfehlen, Aufgaben mit CeleryExecutor auszuführen, wenn Folgendes zutrifft:

  • Die Startzeit der Aufgabe ist wichtig.
  • Aufgaben erfordern keine Laufzeitisolierung und sind nicht ressourcenintensiv.

Wir empfehlen, Aufgaben mit KubernetesExecutor auszuführen, wenn:

  • Für Aufgaben ist eine Laufzeitisolierung erforderlich. Damit die Aufgaben beispielsweise nicht für Arbeitsspeicher und CPU, da sie in eigenen Pods ausgeführt werden.
  • Für Aufgaben sind zusätzliche Systembibliotheken (oder PyPI-Pakete) erforderlich.
  • Aufgaben sind ressourcenintensiv und Sie möchten steuern, welche CPU- und Arbeitsspeicherressourcen.

KubernetesExecutor im Vergleich zu KubernetesPodOperator

Das Ausführen von Aufgaben mit KubernetesExecutor ähnelt dem Aufgaben mit KubernetesPodOperator ausführen. Aufgaben werden ausgeführt in -Pods und ermöglichen so die Isolierung von Aufgaben auf Pod-Ebene und eine bessere Ressourcenverwaltung.

Es gibt jedoch einige wesentliche Unterschiede:

  • KubernetesExecutor führt Aufgaben nur in der versionierten Cloud Composer-Version aus -Namespace Ihrer Umgebung. Dieser Namespace kann nicht geändert werden in Cloud Composer. Sie können einen Namespace angeben, in dem der KubernetesPodOperator Pod-Aufgaben ausführt.
  • KubernetesExecutor kann jeden integrierten Airflow-Operator verwenden. KubernetesPodOperator führt nur ein vom Einstiegspunkt des Containers.
  • KubernetesExecutor verwendet das standardmäßige Docker-Image von Cloud Composer mit demselben Python-Typ, Überschreibungen von Airflow-Konfigurationsoptionen, Variablen und PyPI-Pakete, die in Ihrem Cloud Composer-Umgebung.

Docker-Images

Standardmäßig startet KubernetesExecutor Aufgaben mit demselben Docker-Image, Cloud Composer für Celery-Worker. Dies ist die Cloud Composer-Image für Ihre Umgebung mit alle Änderungen, die Sie für Ihre Umgebung angegeben haben, z. B. benutzerdefiniertes PyPI Paketen oder Umgebungsvariablen.

Hinweise

  • Sie können CeleryKubernetesExecutor in Cloud Composer 3 verwenden.

  • Es kann kein anderer Executor als CeleryKubernetesExecutor verwendet werden in Cloud Composer 3. Sie können Aufgaben also mit CeleryExecutor, KubernetesExecutor oder beide in einem DAG, aber nicht können Sie Ihre Umgebung so konfigurieren, dass nur KubernetesExecutor oder CeleryExecutor.

CeleryKubernetesExecutor konfigurieren

Möglicherweise möchten Sie eine vorhandene Airflow-Konfiguration überschreiben. Optionen für KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Mit dieser Option wird die Anzahl der Aufrufe zum Erstellen von Kubernetes-Worker-Pods pro Planerschleife. Der Standardwert ist 1, sodass nur ein einziger Pod gestartet wird. pro Planer-Heartbeat. Wenn Sie KubernetesExecutor intensiv verwenden, wird empfohlen, diesen Wert zu erhöhen.

  • [kubernetes]worker_pods_pending_timeout

    Mit dieser Option wird in Sekunden definiert, wie lange ein Worker im Pending bleiben darf (Pod wird erstellt), bevor er als fehlgeschlagen gilt. Standardeinstellung Wert 5 Minuten beträgt.

Aufgaben mit KubernetesExecutor oder CeleryExecutor ausführen

Sie können Aufgaben mit CeleryExecutor, KubernetesExecutor oder beiden in einem DAG ausführen:

  • Geben Sie zum Ausführen einer Aufgabe mit KubernetesExecutor den Wert kubernetes in der queue-Parameter einer Aufgabe.
  • Zum Ausführen einer Aufgabe mit CeleryExecutor lassen Sie den Parameter queue weg.

Im folgenden Beispiel wird die Aufgabe task-kubernetes mit KubernetesExecutor und die Aufgabe task-celery mit CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Befehle der Airflow-Befehlszeile für KubernetesExecutor ausführen

Sie können mehrere Befehle der Airflow-Befehlszeile für KubernetesExecutor mit gcloud.

Spezifikation des Worker-Pods anpassen

Sie können die Worker-Pod-Spezifikation anpassen, indem Sie sie im executor_config übergeben. -Parameter einer Aufgabe. Hiermit können Sie eine benutzerdefinierte CPU und einen benutzerdefinierten Arbeitsspeicher definieren Anforderungen.

Sie können die gesamte Worker-Pod-Spezifikation überschreiben, die zum Ausführen einer Aufgabe verwendet wird. Bis die Pod-Spezifikation einer von KubernetesExecutor verwendeten Aufgabe abrufen, können Sie kubernetes generate-dag-yaml Airflow-Befehlszeile ausführen .

Weitere Informationen zum Anpassen der Worker-Pod-Spezifikationen finden Sie unter Airflow-Dokumentation

Das folgende Beispiel zeigt eine Aufgabe, die die Pod-Spezifikation für benutzerdefinierte Worker verwendet:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Aufgabenlogs ansehen

Logs von Aufgaben, die von KubernetesExecutor ausgeführt werden, sind im Tab Logs verfügbar. zusammen mit Logs der Aufgaben, die von CeleryExecutor ausgeführt werden:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Logs auf.

  4. Gehen Sie zu Alle Logs > Airflow-Logs. > Worker.

  5. Worker mit dem Namen airflow-k8s-worker werden ausgeführt KubernetesExecutor-Aufgaben. Um nach Logs einer bestimmten Aufgabe zu suchen, können Sie DAG-ID oder Aufgaben-ID als Suchbegriff in der Suche verwenden

Nächste Schritte