Apache Airflow-DAG in Cloud Composer 1 (Google Cloud CLI) ausführen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen. und führen Sie einen Apache Airflow-DAG in Cloud Composer 1 aus.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Installieren Sie die Google Cloud CLI.
  7. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  10. Aktivieren Sie die Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:

    Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

    Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Umgebung erstellen

Erstellen Sie im us-central1 eine neue Umgebung mit dem Namen example-environment. Region mit der neuesten Version von Cloud Composer 1.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

DAG-Datei erstellen

Ein Airflow-DAG ist eine Sammlung organisierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

In dieser Anleitung wird ein Beispiel für einen Airflow-DAG verwendet, der in der Datei quickstart.py definiert ist. Der Python-Code in dieser Datei führt Folgendes aus:

  1. Erstellt den DAG composer_sample_dag. Dieser DAG wird täglich ausgeführt.
  2. Die Aufgabe print_dag_run_conf wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.

Speichern Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG-Datei in den Bucket Ihrer Umgebung hochladen

Jede Cloud Composer-Umgebung hat ein Cloud Storage- Bucket verknüpft. Airflow nur in Cloud Composer-Zeitplänen DAGs, die sich in diesem Bucket im Ordner /dags befinden.

Laden Sie zum Planen des DAG quickstart.py von Ihrem lokalen Computer auf den Ordner /dags der Umgebung:

Führen Sie den folgenden Befehl aus, um quickstart.py mit der Google Cloud CLI hochzuladen: den Ordner, in dem sich die Datei quickstart.py befindet:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG aufrufen

Nach dem Hochladen der DAG-Datei führt Airflow die folgenden Schritte aus:

  1. Analysiert die von Ihnen hochgeladene DAG-Datei. Es kann einige Minuten dauern, DAG, der für Airflow verfügbar gemacht wird.
  2. Fügt den DAG der Liste der verfügbaren DAGs hinzu.
  3. Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.

Prüfen Sie, ob Ihr DAG fehlerfrei verarbeitet wird und in Airflow verfügbar ist, indem Sie in der DAG-UI ansehen. DAG-UI ist die Cloud Composer-Oberfläche zum Ansehen DAG-Informationen in der Google Cloud Console Cloud Composer bietet auch Zugriff auf die Airflow-UI, ein natives Airflow-Web .

  1. Warten Sie etwa fünf Minuten, damit Airflow die DAG-Datei verarbeitet hat die Sie zuvor hochgeladen haben, und um die erste DAG-Ausführung abzuschließen (Erläuterung später).

  2. Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Dieser Befehl führt die Methode dags list Befehl der Airflow-Befehlszeile zum Auflisten von DAGs in Ihren zu verbessern.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Prüfen Sie, ob der DAG composer_quickstart in der Ausgabe des Befehls aufgeführt ist.

    Beispielausgabe:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Details zur DAG-Ausführung ansehen

Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow sofort führt eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei gestern festgelegt. So greift Airflow den angegebenen DAGs auf ein.

Der Beispiel-DAG enthält die Aufgabe print_dag_run_conf, die den echo ausführt. in der Konsole. Dieser Befehl gibt Metainformationen zum DAG aus (Numerische Kennung der DAG-Ausführung).

Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Mit diesem Befehl werden DAG-Ausführungen aufgelistet für den DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Beispielausgabe:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Die Airflow-Befehlszeile bietet keinen Befehl zum Ansehen von Aufgabenlogs. Sie können andere Methoden zum Anzeigen von Airflow-Aufgabenlogs: Cloud Composer-DAG-UI, Airflow-UI oder Cloud Logging Dieser Leitfaden zeigt eine Möglichkeit zum Abfragen von Logs aus einer bestimmten DAG-Ausführung in Cloud Logging.

Führen Sie in der Google Cloud CLI den folgenden Befehl aus. Dieser Befehl liest Logs aus Cloud Logging für eine bestimmte DAG-Ausführung des DAG composer_quickstart. Die Das Argument --format formatiert die Ausgabe so, dass nur der Text der Lognachricht angezeigt wird.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Ersetzen Sie:

  • RUN_ID durch den Wert run_id aus der Ausgabe des tasks states-for-dag-run-Befehl, den Sie zuvor ausgeführt haben. Beispiel: 2024-02-17T15:38:38.969307+00:00.

Beispielausgabe:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Bereinigen

Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

Löschen Sie die in dieser Anleitung verwendeten Ressourcen:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console Speicher auf > Browser.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

  3. Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.

    1. Gehen Sie in der Google Cloud Console zu Compute Engine > Laufwerke:

      Zur Seite „Laufwerke“

    2. Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.

      Dieses Laufwerk kann z. B. benannt werden, gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee Laufwerke für Cloud Composer 1 haben immer den Standard persistent disk und die Größe 2 GB.

Nächste Schritte