Apache Airflow-DAG in Cloud Composer 2 ausführen

Cloud Composer 1 Cloud Composer 2

In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 2 ausführen.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Cloud Composer API aktivieren.

    Aktivieren Sie die API

  7. Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zu gewähren, damit Sie die Berechtigungen erhalten, die Sie für diese Kurzanleitung benötigen:

    Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

    Möglicherweise können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Umgebung erstellen

  1. Rufen Sie in der Google Cloud Console die Seite Umgebung erstellen auf.

    Zur Seite „Umgebung erstellen“

  1. Wenn dies die erste Umgebung in Ihrem Projekt ist, wird der Abschnitt Cloud Composer-Dienstkonto die erforderlichen Berechtigungen erteilen angezeigt.

    Sie fügen dem Dienstkonto Ihrer Umgebung das Konto des Cloud Composer-Dienst-Agents als neues Hauptkonto hinzu und weisen ihm die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API zu.

    Bestätigen Sie, dass Sie das gewünschte Dienstkonto für Ihre Umgebung verwenden, und klicken Sie auf Erteilen.

  2. Geben Sie im Feld Name example-environment ein.

  3. Wählen Sie in der Drop-down-Liste Standort eine Region für die Cloud Composer-Umgebung aus. In diesem Leitfaden wird die Region us-central1 verwendet.

  4. Übernehmen Sie für die anderen Optionen der Umgebungskonfiguration die angegebenen Standardeinstellungen.

  5. Klicken Sie auf Erstellen und warten Sie, bis die Umgebung erstellt ist.

  6. Wenn der Vorgang abgeschlossen ist, wird neben dem Namen der Umgebung ein grünes Häkchen angezeigt.

DAG-Datei erstellen

Ein Airflow-DAG ist eine Sammlung organisierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

In dieser Anleitung wird ein Beispiel für einen Airflow-DAG verwendet, der in der Datei quickstart.py definiert ist. Der Python-Code in dieser Datei führt Folgendes aus:

  1. Erstellt den DAG composer_sample_dag. Dieser DAG wird täglich ausgeführt.
  2. Führt die Aufgabe print_dag_run_conf aus. Die Aufgabe gibt die Konfiguration der DAG-Ausführung mit dem bash-Operator aus.

Speichern Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG-Datei in den Bucket Ihrer Umgebung hochladen

Jeder Cloud Composer-Umgebung ist ein Cloud Storage-Bucket zugeordnet. Airflow in Cloud Composer plant nur DAGs, die sich in diesem Bucket im Ordner /dags befinden.

Zum Planen Ihres DAG laden Sie quickstart.py von Ihrem lokalen Computer in den Ordner /dags Ihrer Umgebung hoch:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen der Umgebung: example-environment. Die Seite Umgebungsdetails wird geöffnet.

  3. Klicken Sie auf DAGs-Ordner öffnen. Die Seite Bucket-Details wird geöffnet.

  4. Klicken Sie auf Dateien hochladen und wählen Sie dann Ihre Kopie von quickstart.py aus.

  5. Klicken Sie zum Hochladen der Datei auf Öffnen.

DAG ansehen

Nachdem Sie die DAG-Datei hochgeladen haben, führt Airflow folgende Schritte aus:

  1. Parst die von Ihnen hochgeladene DAG-Datei. Es kann einige Minuten dauern, bis der DAG für Airflow verfügbar ist.
  2. Fügt den DAG der Liste der verfügbaren DAGs hinzu.
  3. Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.

Prüfen Sie in der DAG-UI, ob der DAG fehlerfrei verarbeitet wird und in Airflow verfügbar ist. Die DAG-UI ist die Cloud Composer-Schnittstelle zum Ansehen von DAG-Informationen in der Google Cloud Console. Cloud Composer bietet auch Zugriff auf die Airflow-UI, eine native Airflow-Weboberfläche.

  1. Warten Sie etwa fünf Minuten, bis Airflow Zeit hat, die zuvor hochgeladene DAG-Datei zu verarbeiten und die erste DAG-Ausführung abzuschließen (wird später erläutert).

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  3. Klicken Sie in der Liste der Umgebungen auf den Namen der Umgebung: example-environment. Die Seite Umgebungsdetails wird geöffnet.

  4. Rufen Sie den Tab DAGs auf.

  5. Prüfen Sie, ob der DAG composer_quickstart in der Liste der DAGs vorhanden ist.

    Die Liste der DAGs zeigt den DAG composer_Schnellstart mit zusätzlichen Informationen wie Status und Zeitplan an.
    Abbildung 1. In der Liste der DAGs wird der DAG composer_Schnellstart angezeigt (zum Vergrößern klicken).

Details zur DAG-Ausführung ansehen

Die einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf das gestrige Datum festgelegt ist. Auf diese Weise schließt Airflow den Zeitplan des angegebenen DAG an.

Der Beispiel-DAG enthält die Aufgabe print_dag_run_conf, die den Befehl echo in der Console ausführt. Mit diesem Befehl werden Metainformationen zum DAG ausgegeben (numerische Kennung der DAG-Ausführung).

  1. Klicken Sie auf dem Tab DAGs auf composer_quickstart. Der Tab Ausführungen für den DAG wird geöffnet.

  2. Klicken Sie in der Liste der DAG-Ausführungen auf den ersten Eintrag.

    In der Liste der DAG-Ausführungen werden die letzten DAG-Ausführungen (Ausführungsdatum und -status) angezeigt.
    Abbildung 2. Liste der DAG-Ausführungen für den DAG composer_Schnellstart (zum Vergrößern anklicken)
  3. Es werden Details der DAG-Ausführung mit den Informationen zu den einzelnen Aufgaben des Beispiel-DAG angezeigt.

    Die Liste der Aufgaben mit dem Eintrag „print_dag_run_conf“, der Startzeit, der Endzeit und der Dauer
    Abbildung 3: Die Liste der Aufgaben, die bei der DAG-Ausführung ausgeführt wurden (zum Vergrößern klicken)
  4. Im Abschnitt Logs für DAG-Ausführung werden Logs für alle Aufgaben in der DAG-Ausführung aufgeführt. Die Ausgabe des Befehls echo finden Sie in den Logs.

    Logeinträge der Aufgabe, einer davon ist „Ausgabe“ und der andere enthält eine Kennung
    Abbildung 4. Logs der Aufgabe „print_dag_run_conf“ (zum Vergrößern klicken)

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen in Rechnung gestellt werden.

Löschen Sie die in dieser Anleitung verwendeten Ressourcen:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen“

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console die Seite Speicher > Browser auf.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

  3. Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console Compute Engine > Laufwerke auf.

      Zur Seite „Laufwerke“

    2. Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.

      Dieses Laufwerk kann beispielsweise den Namen pvc-02bc4842-2312-4347-8519-d87bdcd31115 haben. Laufwerke für Cloud Composer 2 haben immer den Typ Balanced persistent disk und eine Größe von 2 GB.

Nächste Schritte