Apache Airflow-DAG in Cloud Composer 3 ausführen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

In dieser Kurzanleitung erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 3 ausführen.

Hinweis

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Ausführen der Kurzanleitung benötigen:

    Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

    Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Dienstkonto einer Umgebung erstellen

Wenn Sie eine Umgebung erstellen, geben Sie ein Dienstkonto an. Dieses Dienstkonto wird als Dienstkonto der Umgebung bezeichnet. In Ihrer Umgebung wird dieses Dienstkonto für die meisten Vorgänge verwendet.

Das Dienstkonto für Ihre Umgebung ist kein Nutzerkonto. Ein Dienstkonto ist ein spezieller Kontotyp, der nicht von einer Person, sondern von einer Anwendung oder einer VM-Instanz verwendet wird.

So erstellen Sie ein Dienstkonto für Ihre Umgebung:

  1. Erstellen Sie ein neues Dienstkonto, wie in der Dokumentation zur Identitäts- und Zugriffsverwaltung beschrieben.

  2. Weisen Sie ihm eine Rolle zu, wie in der Identity and Access Management-Dokumentation beschrieben. Die erforderliche Rolle ist Composer-Worker (composer.worker).

Umgebung erstellen

  1. Rufen Sie in der Google Cloud Console die Seite Umgebung erstellen auf.

    Zur Seite „Umgebung erstellen“

  2. Geben Sie im Feld Name example-environment ein.

  3. Wählen Sie in der Drop-down-Liste Standort eine Region für die Cloud Composer-Umgebung aus. In diesem Leitfaden wird die Region us-central1 verwendet.

  4. Übernehmen Sie für die anderen Optionen der Umgebungskonfiguration die angegebenen Standardeinstellungen.

  5. Klicken Sie auf Erstellen und warten Sie, bis die Umgebung erstellt wurde.

  6. Danach wird neben dem Umgebungsnamen ein grünes Häkchen angezeigt.

DAG-Datei erstellen

Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

In dieser Anleitung wird ein Beispiel-Airflow-DAG verwendet, das in der Datei quickstart.py definiert ist. Der Python-Code in dieser Datei führt Folgendes aus:

  1. Der DAG composer_sample_dag wird erstellt. Dieser DAG wird jeden Tag ausgeführt.
  2. Die Aufgabe print_dag_run_conf wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.

Speichern Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG-Datei in den Bucket Ihrer Umgebung hochladen

Jede Cloud Composer-Umgebung ist mit einem Cloud Storage-Bucket verknüpft. Airflow in Cloud Composer plant nur DAGs, die sich in diesem Bucket im Ordner /dags befinden.

Zum Planen Ihres DAG laden Sie quickstart.py von Ihrem lokalen Computer in den Ordner /dags Ihrer Umgebung hoch:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung, example-environment. Die Seite Umgebungsdetails wird geöffnet.

  3. Klicken Sie auf DAGs-Ordner öffnen. Die Seite Bucket-Details wird geöffnet.

  4. Klicken Sie auf Dateien hochladen und wählen Sie dann Ihre Kopie von quickstart.py aus.

  5. Klicken Sie zum Hochladen der Datei auf Öffnen.

DAG aufrufen

Nachdem Sie die DAG-Datei hochgeladen haben, geschieht in Airflow Folgendes:

  1. Die hochgeladene DAG-Datei wird geparst. Es kann einige Minuten dauern, bis der DAG für Airflow verfügbar ist.
  2. Der DAG wird der Liste der verfügbaren DAGs hinzugefügt.
  3. Führt den DAG gemäß dem Zeitplan aus, den Sie in der DAG-Datei angegeben haben.

Prüfen Sie, ob Ihr DAG ohne Fehler verarbeitet wird und in Airflow verfügbar ist. Rufen Sie ihn dazu in der DAG-Benutzeroberfläche auf. Die DAG-Benutzeroberfläche ist die Cloud Composer-Oberfläche, über die Sie DAG-Informationen in der Google Cloud Console aufrufen können. Cloud Composer bietet auch Zugriff auf die Airflow-Benutzeroberfläche, eine native Airflow-Weboberfläche.

  1. Warten Sie etwa fünf Minuten, damit Airflow die zuvor hochgeladene DAG-Datei verarbeiten und die erste DAG-Ausführung abschließen kann (wird später erläutert).

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  3. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung, example-environment. Die Seite Umgebungsdetails wird geöffnet.

  4. Rufen Sie den Tab DAGs auf.

  5. Prüfen Sie, ob der DAG composer_quickstart in der Liste der DAGs enthalten ist.

    In der Liste der DAGs wird der DAG „composer_quickstart“ mit zusätzlichen Informationen wie Status und Zeitplan angezeigt.
    Abbildung 1: In der Liste der DAGs wird der DAG „composer_quickstart“ angezeigt (zum Vergrößern anklicken).

Details zur DAG-Ausführung ansehen

Eine einzelne Ausführung eines DAG wird als DAG-Ausführung bezeichnet. Airflow führt sofort eine DAG-Ausführung für den Beispiel-DAG aus, da das Startdatum in der DAG-Datei auf „gestern“ festgelegt ist. So gleicht Airflow den Zeitplan des angegebenen DAGs wieder an.

Der Beispiel-DAG enthält eine Aufgabe, print_dag_run_conf, die den Befehl echo in der Console ausführt. Mit diesem Befehl werden Metainformationen zum DAG ausgegeben (numerische Kennung der DAG-Ausführung).

  1. Klicken Sie auf dem Tab DAGs auf composer_quickstart. Der Tab Ausführungen für den DAG wird geöffnet.

  2. Klicken Sie in der Liste der DAG-Ausführungen auf den ersten Eintrag.

    Die Liste der DAG-Ausführungen enthält die letzte DAG-Ausführung (Datum der Ausführung und Status).
    Abbildung 2. Liste der DAG-Ausführungen für den DAG „composer_quickstart“ (zum Vergrößern klicken)
  3. Die Details zur DAG-Ausführung werden angezeigt, einschließlich Informationen zu den einzelnen Aufgaben der Beispiel-DAG.

    Die Liste der Aufgaben mit einem Eintrag „print_dag_run_conf“, Start- und Endzeit sowie Dauer
    Abbildung 3. Liste der Aufgaben, die bei der DAG-Ausführung ausgeführt wurden (zum Vergrößern klicken)
  4. Im Abschnitt Logs für DAG-Ausführung sind Logs für alle Aufgaben in der DAG-Ausführung aufgeführt. Die Ausgabe des Befehls echo finden Sie in den Protokollen.

    Logeinträge der Aufgabe, von denen einer „Output“ und der andere eine Kennung enthält
    Abbildung 4: Protokolle der Aufgabe „print_dag_run_conf“ (zum Vergrößern anklicken)

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud -Konto die in dieser Anleitung verwendeten Ressourcen in Rechnung gestellt werden:

Löschen Sie die Ressourcen, die in dieser Anleitung verwendet wurden:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console die Seite Speicher > Browser auf.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

Nächste Schritte