Apache Airflow-DAG in Cloud Composer 1 ausführen

Cloud Composer 1 Cloud Composer 2

Auf dieser Seite erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer ausführen.

Wenn Sie Airflow noch nicht kennen, finden Sie in dieser Anleitung weitere Informationen zu Airflow-Konzepten, -Objekten und ihrer Verwendung.

Hinweis

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

  4. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  5. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.

  6. Cloud Composer API aktivieren.

    Aktivieren Sie die API

Umgebung erstellen

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebung erstellen auf.

    Zur Seite „Umgebung erstellen“

  2. Geben Sie im Feld Name example-environment ein.

  3. Wählen Sie in der Drop-down-Liste Standort eine Region für die Cloud Composer-Umgebung aus. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.

  4. Übernehmen Sie für die anderen Optionen der Umgebungskonfiguration die angegebenen Standardeinstellungen.

  5. Klicken Sie zum Anlegen der Umgebung auf Erstellen.

  6. Warten Sie, bis die Umgebung erstellt ist. Danach wird neben dem Umgebungsnamen ein grünes Häkchen angezeigt.

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --image-version IMAGE_VERSION

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung. In dieser Kurzanleitung wird example-environment verwendet.
  • LOCATION durch eine Region für die Cloud Composer-Umgebung. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.
  • IMAGE_VERSION durch den Namen des Cloud Composer-Images. In dieser Anleitung wird composer-1.19.2-airflow-1.10.15 verwendet, um eine Umgebung mit dem neuesten Cloud Composer-Image zu erstellen.

Beispiel:

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.19.2-airflow-1.10.15

Terraform

Zur Konfiguration dieser Umgebung mit Terraform fügen Sie Ihrer Terraform-Konfiguration folgenden Ressourcenblock hinzu und führen terraform apply aus.

Weitere Informationen zur Erstellung einer Cloud Composer-Umgebung mit Terraform finden Sie in der Terraform-Dokumentation.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME durch den Namen der Umgebung. In dieser Kurzanleitung wird example-environment verwendet.
  • LOCATION durch eine Region für die Cloud Composer-Umgebung. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.
  • IMAGE_VERSION durch den Namen des Cloud Composer-Images. In dieser Anleitung wird composer-1.19.2-airflow-1.10.15 verwendet, um eine Umgebung mit dem neuesten Cloud Composer-Image zu erstellen.

Beispiel:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.19.2-airflow-1.10.15"
    }
  }

}

Umgebungsdetails ansehen

Wenn die Umgebung erstellt ist, können Sie Informationen zu Ihrer Umgebung aufrufen, z. B. die Cloud Composer-Version, die URL für die Airflow-Weboberfläche und den DAGs-Ordner in Cloud Storage.

So rufen Sie die Umgebungsinformationen auf:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie zum Aufrufen der Seite Umgebungsdetails auf den Namen Ihrer Umgebung: example-environment.

DAG erstellen

Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

Der Python-Code in quickstart.py führt Folgendes aus:

  1. Erstellt einen DAG, composer_sample_dag. Der DAG wird einmal täglich ausgeführt.
  2. Führt die Aufgabe print_dag_run_conf aus. Die Aufgabe gibt die DAG-Ausführung mit dem Bash-Operator aus.

Zum Erstellen eines DAG legen Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer an.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_quickstart',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

DAG in Cloud Storage hochladen

Cloud Composer plant nur die DAGs, die sich im /dags-Ordner des Cloud Storage-Buckets der Umgebung befinden.

Zum Planen Ihres DAG laden Sie quickstart.py von Ihrem lokalen Computer in den Ordner /dags Ihrer Umgebung hoch.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Zum Öffnen des Ordners /dags folgen Sie dem Link DAGs-Ordner für example-environment.

  3. Klicken Sie auf der Bucket-Detailseite auf Dateien hochladen und wählen Sie dann Ihre lokale Kopie von quickstart.py aus.

  4. Klicken Sie zum Hochladen der Datei auf Öffnen.

    Nachdem Sie Ihren DAG hochgeladen haben, fügt Cloud Composer den DAG zu Airflow hinzu und plant sofort eine DAG-Ausführung. Es kann einige Minuten dauern, bis der DAG in der Airflow-Weboberfläche angezeigt wird.

gcloud

Führen Sie den folgenden Befehl aus, um quickstart.py mit gcloud hochzuladen:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

DAG in der Airflow-UI ansehen

Jede Cloud Composer-Umgebung hat einen Webserver, auf dem die Airflow-Weboberfläche ausgeführt wird. Sie können DAGs über die Airflow-Weboberfläche verwalten.

So rufen Sie den DAG in der Airflow-Weboberfläche auf:

Airflow 1

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Zum Öffnen der Airflow-Weboberfläche klicken Sie auf den Link Airflow für example-environment. Die Airflow-UI wird in einem neuen Browserfenster geöffnet.

  3. Rufen Sie in der Airflow-Symbolleiste die Seite DAGs auf.

  4. Klicken Sie zum Öffnen der DAG-Detailseite auf composer_sample_dag.

    DAGs-Seite in der Airflow-UI
    Abbildung 1. DAGs-Seite in der Airflow-UI (zum Vergrößern klicken)

    Die Seite für den DAG enthält die Baumansicht, eine grafische Darstellung der Aufgaben und Abhängigkeiten des Workflows.

    Baumansicht für den DAG für Composer_sample_dags
    Abbildung 2. Baumansicht für den DAG für creator_sample_dags

Airflow 2

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Zum Öffnen der Airflow-Weboberfläche klicken Sie auf den Link Airflow für example-environment. Die Airflow-UI wird in einem neuen Browserfenster geöffnet.

  3. Rufen Sie in der Airflow-Symbolleiste die Seite DAGs auf.

  4. Klicken Sie zum Öffnen der DAG-Detailseite auf composer_sample_dag.

    DAGs-Seite in der Airflow-UI
    Abbildung 1. DAGs-Seite in der Airflow-UI (zum Vergrößern klicken)

    Die Seite für den DAG enthält die Baumansicht, eine grafische Darstellung der Aufgaben und Abhängigkeiten des Workflows.

    Baumansicht für den DAG für Composer_sample_dags
    Abbildung 2. Baumansicht für den DAG für creator_sample_dags

Details zur Aufgabeninstanz in den Airflow-Logs aufrufen

Der von Ihnen geplante DAG enthält die Aufgabe print_dag_run_conf. Diese Aufgabe gibt die Konfiguration für die DAG-Ausführung aus, die Sie in den Airflow-Logs für die Aufgabeninstanz aufrufen können.

So rufen Sie die Details der Aufgabeninstanz auf:

Airflow 1

  1. Klicken Sie in der Baumansicht des DAG in der Airflow-Weboberfläche auf Grafikansicht.

    Wenn Sie den Mauszeiger über die Aufgabe print_dag_run_conf halten, wird deren Status angezeigt.

    Baumansicht für den DAG für Composer_sample_dags
    Abbildung 3: Status der Aufgabe „print_dag_run_conf“
  2. Klicken Sie auf die Aufgabe print_dag_run_conf.

    Im Kontextmenü der Aufgabeninstanz können Sie Metadaten abrufen und einige Aktionen ausführen.

    Kontextmenü der Aufgabeninstanz für die Aufgabe „ Composer_sample_dags“
    Abbildung 4. Kontextmenü der Aufgabeninstanz für die Aufgabe „creator_sample_dags“
  3. Klicken Sie im Kontextmenü der Aufgabeninstanz auf Log anzeigen.

  4. Suchen Sie im Log nach Running: ['bash', um die Ausgabe des bash-Operators zu sehen.

    Logausgabe des Bash-Operators
    Abbildung 5. Logausgabe des Bash-Operators

Airflow 2

  1. Klicken Sie in der Baumansicht des DAG in der Airflow-Weboberfläche auf Grafikansicht.

    Wenn Sie den Mauszeiger über die Aufgabe print_dag_run_conf halten, wird deren Status angezeigt.

    Baumansicht für den DAG für Composer_sample_dags
    Abbildung 3: Status der Aufgabe „print_dag_run_conf“
  2. Klicken Sie auf die Aufgabe print_dag_run_conf.

    Im Kontextmenü der Aufgabeninstanz können Sie Metadaten abrufen und einige Aktionen ausführen.

    Kontextmenü der Aufgabeninstanz für die Aufgabe „ Composer_sample_dags“
    Abbildung 4. Kontextmenü der Aufgabeninstanz für die Aufgabe „creator_sample_dags“
  3. Klicken Sie im Kontextmenü der Aufgabeninstanz auf Log anzeigen.

  4. Suchen Sie im Log nach Running command: ['bash' für die Ausgabe des bash-Operators.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Bereinigen

So vermeiden Sie, dass Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen in Rechnung gestellt werden.

Löschen Sie die in dieser Anleitung verwendeten Ressourcen:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen“

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console die Seite Speicher auf. Browser.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

  3. Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.

    1. Wechseln Sie in der Google Cloud Console zu Compute Engine und Disks.

      Zur Seite „Laufwerke“

    2. Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen.

      Dieses Laufwerk kann beispielsweise den Namen gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee haben. Laufwerke für Cloud Composer 1 haben immer den Typ Standard persistent disk und eine Größe von 2 GB.

Weitere Informationen