Schnellstart

Cloud Composer 1 Cloud Composer 2

Auf dieser Seite erfahren Sie, wie Sie eine Cloud Composer-Umgebung erstellen und einen Apache Airflow-DAG in Cloud Composer 1 ausführen.

Hinweis

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  5. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  6. Aktivieren Sie die Cloud Composer API.

    Aktivieren Sie die API

Umgebung erstellen

Console

  1. Rufen Sie in der Google Cloud Console die Seite Load-Balancer erstellen auf.

    Zur Seite „Umgebung erstellen“

  2. Geben Sie im Feld Name example-environment ein.

  3. Wählen Sie in der Drop-down-Liste Standort eine Region für die Cloud Composer-Umgebung aus. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.

  4. Übernehmen Sie für die anderen Optionen der Umgebungskonfiguration die angegebenen Standardeinstellungen.

  5. Klicken Sie zum Anlegen der Umgebung auf Erstellen.

  6. Warten Sie, bis die Umgebung erstellt ist. Danach wird neben dem Umgebungsnamen ein grünes Häkchen angezeigt.

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung. In dieser Kurzanleitung wird example-environment verwendet.

  • LOCATION durch eine Region für die Cloud Composer-Umgebung. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.

Beispiel:

gcloud composer environments create example-environment \
    --location us-central1

Terraform

Zur Konfiguration dieser Umgebung mit Terraform fügen Sie Ihrer Terraform-Konfiguration folgenden Ressourcenblock hinzu und führen terraform apply aus.

Weitere Informationen zur Erstellung einer Cloud Composer-Umgebung mit Terraform finden Sie in der Terraform-Dokumentation.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"
}

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.

  • LOCATION durch die Region für die Umgebung.

    Ein Standort ist eine Region für die Cloud Composer-Umgebung. Weitere Informationen zur Auswahl einer Region finden Sie unter Cloudstandorte.

Beispiel:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"
}

Umgebungsdetails ansehen

Wenn die Umgebung erstellt ist, können Sie Informationen zu Ihrer Umgebung aufrufen, z. B. die Cloud Composer-Version, die URL für die Airflow-Weboberfläche und den DAGs-Ordner in Cloud Storage.

So rufen Sie die Umgebungsinformationen auf:

  1. Öffnen Sie in der Google Cloud Console die Seite Umgebungen.

    Zur Seite Umgebungen

  2. Klicken Sie zum Aufrufen der Seite Umgebungsdetails auf den Namen Ihrer Umgebung: example-environment.

DAG erstellen

Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert.

Der Python-Code in quickstart.py führt Folgendes aus:

  1. Der DAG composer_sample_dag wird erstellt und anschließen einmal pro Tag ausgeführt.
  2. Die Aufgabe print_dag_run_conf wird ausgeführt. Sie gibt mithilfe des bash-Operators die Konfiguration der DAG-Ausführung aus.

Zum Erstellen eines DAG legen Sie eine Kopie der Datei quickstart.py auf Ihrem lokalen Computer an.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Airflow 2

import datetime

import airflow
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

DAG in Cloud Storage hochladen

Cloud Composer plant nur die DAGs, die sich im /dags-Ordner des Cloud Storage-Buckets der Umgebung befinden.

Zum Planen Ihres DAG laden Sie quickstart.py von Ihrem lokalen Computer in den Ordner /dags Ihrer Umgebung hoch.

Console

  1. Öffnen Sie in der Google Cloud Console die Seite Umgebungen.

    Zur Seite Umgebungen

  2. Zum Öffnen des Ordners /dags folgen Sie dem Link DAGs-Ordner für example-environment.

  3. Klicken Sie auf der Bucket-Detailseite auf Dateien hochladen und wählen Sie dann Ihre lokale Kopie von quickstart.py aus.

  4. Klicken Sie zum Hochladen der Datei auf Öffnen.

    Nachdem Sie Ihren DAG hochgeladen haben, fügt Cloud Composer den DAG zu Airflow hinzu und plant sofort eine DAG-Ausführung. Es kann einige Minuten dauern, bis der DAG in der Airflow-Weboberfläche angezeigt wird.

gcloud

Führen Sie den folgenden Befehl aus, um quickstart.py mit gcloud hochzuladen:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Rufen Sie den DAG in der Airflow-Weboberfläche auf

Jede Cloud Composer-Umgebung hat einen Webserver, auf dem die Airflow-Weboberfläche ausgeführt wird. Sie können DAGs über die Airflow-Weboberfläche verwalten.

So rufen Sie den DAG in der Airflow-Weboberfläche auf:

Airflow 1

  1. Öffnen Sie in der Google Cloud Console die Seite Umgebungen.

    Zur Seite Umgebungen

  2. Zum Öffnen der Airflow-Weboberfläche klicken Sie auf den Link Airflow für example-environment. Die Airflow-UI wird in einem neuen Browserfenster geöffnet.

  3. Rufen Sie in der Airflow-Symbolleiste die Seite DAGs auf.

  4. Klicken Sie zum Öffnen der DAG-Detailseite auf composer_sample_dag.

    Die Seite für den DAG enthält die Baumansicht, eine grafische Darstellung der Aufgaben und Abhängigkeiten des Workflows.

Airflow 2

  1. Öffnen Sie in der Google Cloud Console die Seite Umgebungen.

    Zur Seite Umgebungen

  2. Zum Öffnen der Airflow-Weboberfläche klicken Sie auf den Link Airflow für example-environment. Die Airflow-UI wird in einem neuen Browserfenster geöffnet.

  3. Rufen Sie in der Airflow-Symbolleiste die Seite DAGs auf.

  4. Klicken Sie zum Öffnen der DAG-Detailseite auf composer_sample_dag.

    Die Seite für den DAG enthält die Baumansicht, eine grafische Darstellung der Aufgaben und Abhängigkeiten des Workflows.

Details zur Aufgabeninstanz in den Airflow-Logs aufrufen

Der von Ihnen geplante DAG enthält die Aufgabe print_dag_run_conf. Diese Aufgabe gibt die Konfiguration für die DAG-Ausführung aus, die Sie in den Airflow-Logs für die Aufgabeninstanz aufrufen können.

So rufen Sie die Details der Aufgabeninstanz auf:

Airflow 1

  1. Klicken Sie in der Baumansicht des DAG in der Airflow-Weboberfläche auf Grafikansicht.

    Wenn Sie den Mauszeiger über die Aufgabe print_dag_run_conf halten, wird deren Status angezeigt.

  2. Klicken Sie auf die Aufgabe print_dag_run_conf.

    Im Kontextmenü der Aufgabeninstanz können Sie Metadaten abrufen und einige Aktionen ausführen.

  3. Klicken Sie im Kontextmenü der Aufgabeninstanz auf Log anzeigen.

  4. Suchen Sie im Log nach Running: ['bash' für die Ausgabe des bash-Operators.

Airflow 2

  1. Klicken Sie in der Baumansicht des DAG in der Airflow-Weboberfläche auf Grafikansicht.

    Wenn Sie den Mauszeiger über die Aufgabe print_dag_run_conf halten, wird deren Status angezeigt.

  2. Klicken Sie auf die Aufgabe print_dag_run_conf.

    Im Kontextmenü der Aufgabeninstanz können Sie Metadaten abrufen und einige Aktionen ausführen.

  3. Klicken Sie im Kontextmenü der Aufgabeninstanz auf Log anzeigen.

  4. Suchen Sie im Log nach Running command: ['bash' für die Ausgabe des bash-Operators.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen in Rechnung gestellt werden:

Wählen Sie eine der folgenden Optionen aus:

  • Am einfachsten bereinigen können Sie, indem Sie das Projekt löschen, das Sie für den Schnellstart erstellt haben.
  • Alternativ können Sie die einzelnen Ressourcen löschen.

Projekt löschen

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wenn das Projekt, das Sie löschen möchten, an eine Organisation geknüpft ist, maximieren Sie in der Spalte Name die Liste Organisation.
  3. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  4. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Anstatt das Projekt zu löschen, können Sie die in dieser Anleitung verwendeten Ressourcen löschen:

  1. Löschen Sie die Cloud Composer-Umgebung.

    1. Öffnen Sie in der Google Cloud Console die Seite Umgebungen.

      Zur Seite Umgebungen

    2. Wählen Sie example-environment aus und klicken Sie auf Löschen.

    3. Warten Sie, bis die Umgebung gelöscht ist.

  2. Löschen Sie den Bucket Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird dessen Bucket nicht gelöscht.

    1. Rufen Sie in der Google Cloud Console die Seite Storage > Browser auf.

      Öffnen Sie "Storage" > "Browser".

    2. Wählen Sie den Bucket der Umgebung aus und klicken Sie auf Löschen. Dieser Bucket kann beispielsweise us-central1-example-environ-c1616fe8-bucket heißen.

  3. Löschen Sie den nichtflüchtigen Speicher der Redis-Warteschlange Ihrer Umgebung. Durch das Löschen der Cloud Composer-Umgebung wird der nichtflüchtige Speicher nicht gelöscht.

    1. Wechseln Sie in der Google Cloud Console zu Compute Engine > Laufwerke.

      Zur Seite „Laufwerke“

    2. Wählen Sie den nichtflüchtigen Speicher der Redis-Warteschlange der Umgebung aus und klicken Sie auf Löschen. Das Laufwerk kann beispielsweise gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee heißen. Solche Laufwerke haben immer den Typ Standard persistent disk und die Größe 2 GB.

Nächste Schritte