Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Anleitung wird gezeigt, wie Sie mit Cloud Composer ein Apache Airflow-DAG (gerichteter azyklischer Graph) zum Ausführen eines Apache Hadoop-Wordcount-Jobs auf einem Dataproc-Server Cluster.
Lernziele
- Auf Ihre Cloud Composer-Umgebung zugreifen und die Airflow-Benutzeroberfläche verwenden
- Airflow-Umgebungsvariablen erstellen und aufrufen
- DAG mit nachfolgenden Aufgaben erstellen und ausführen:
- Erstellt ein Dataproc Cluster.
- Führt einen Apache Hadoop-Wordcount-Job im Cluster aus.
- Gibt die Wordcount-Ergebnisse aus Cloud Storage Bucket.
- Löscht den Cluster.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Cloud Composer
- Dataproc
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Hinweise
Achten Sie darauf, dass die folgenden APIs in Ihrem Projekt aktiviert sind:
Console
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com In Ihrem Projekt Cloud Storage-Bucket erstellen einer beliebigen Speicherklasse und Region, um die Ergebnisse des Hadoop-Speichers zu speichern. Word-count-Job.
Notieren Sie sich den Pfad des Buckets, den Sie erstellt haben, z. B.
gs://example-bucket
. Für diesen Pfad definieren Sie eine Airflow-Variable, die dann später in dieser Anleitung im Beispiel-DAG verwendet wird.Erstellen Sie eine Cloud Composer-Umgebung mit Standardeinstellung. Parameter. Warten Sie, bis die Erstellung der Umgebung abgeschlossen ist. Danach wird links neben dem Umgebungsnamen ein grünes Häkchen angezeigt.
Notieren Sie sich die Region, in der Sie Ihre Umgebung erstellt haben, z. B.
us-central
. Sie definieren eine Airflow-Variable für diese Region und verwenden sie im Beispiel-DAG, um einen Dataproc-Cluster in derselben Region auszuführen.
Airflow-Variablen festlegen
Airflow-Variablen festlegen die Sie später im Beispiel-DAG verwenden können. Sie können Airflow-Variablen beispielsweise in der Airflow-Benutzeroberfläche festlegen.
Airflow-Variable | Wert |
---|---|
gcp_project
|
Die Projekt-ID des Projekts, das Sie für diese Anleitung verwenden, z. B. example-project . |
gcs_bucket
|
Der URI des Cloud Storage-Buckets, den Sie für diese Anleitung erstellt haben, z. B. gs://example-bucket . |
gce_region
|
Die Region, in der Sie Ihre Umgebung erstellt haben, z. B. us-central1 .
Dies ist die Region, in der Ihr Dataproc-Cluster
erstellt. |
Beispielworkflow ansehen
Ein Airflow-DAG ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert. Der Code aus
hadoop_tutorial.py
ist der Workflowcode.
Operatoren
Zur Orchestrierung der drei Aufgaben im Beispielworkflow importiert der DAG die folgenden drei Airflow-Operatoren:
DataprocClusterCreateOperator
: Erstellt einen Dataproc-Cluster.DataProcHadoopOperator
: Sendet einen Hadoop-Wordcount-Job und schreibt Ergebnisse in einen Cloud Storage-Bucket.DataprocClusterDeleteOperator
: Löscht den Cluster, um fortlaufende Compute Engine-Gebühren zu vermeiden.
Abhängigkeiten
Sie strukturieren die Aufgaben, die Sie ausführen möchten, so, dass die Struktur deren Beziehungen und Abhängigkeiten wiedergibt. Die Aufgaben in diesem DAG werden nacheinander ausgeführt.
Planung
Der Name des DAG lautet composer_hadoop_tutorial
. Er wird einmal pro Tag ausgeführt. Weil die start_date
, die an default_dag_args
übergeben wird,
auf yesterday
gesetzt ist, plant Cloud Composer den Workflow.
sofort starten, nachdem der DAG in den Bucket der Umgebung hochgeladen wurde.
DAG in den Bucket der Umgebung hochladen
Cloud Composer speichert DAGs im Ordner /dags
des Buckets Ihrer Umgebung.
So laden Sie den DAG hoch:
Speichern Sie
hadoop_tutorial.py
auf Ihrem lokalen Computer.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
In der Liste der Umgebungen in der Spalte DAGs-Ordner für Ihr auf den Link DAGs.
Klicken Sie auf Dateien hochladen.
Wählen Sie
hadoop_tutorial.py
auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
Cloud Composer fügt den DAG zu Airflow hinzu und plant ihn automatisch. DAG-Änderungen werden innerhalb von 3 bis 5 Minuten wirksam.
DAG-Ausführungen analysieren
Aufgabenstatus ansehen
Wenn Sie Ihre DAG-Datei in den Ordner dags/
in Cloud Storage hochladen, parst Cloud Composer die Datei. Wenn der Vorgang erfolgreich abgeschlossen wurde, wird der Name des Workflows in der DAG-Liste angezeigt und der Workflow in die Warteschlange gestellt, um sofort ausgeführt zu werden.
Rufen Sie zur Anzeige des Aufgabenstatus die Airflow-Weboberfläche auf und klicken Sie in der Symbolleiste auf DAGs.
Klicken Sie zum Öffnen der DAG-Detailseite auf
composer_hadoop_tutorial
. Dieses enthält eine grafische Darstellung der Workflowaufgaben und Abhängigkeiten.Klicken Sie zur Anzeige des Status jeder Aufgabe auf Graph View und bewegen Sie den Mauszeiger auf die Grafik für jede Aufgabe.
Workflow wieder in die Warteschlange stellen
So führen Sie den Workflow über die Grafikansicht noch einmal aus:
- Klicken Sie in der Grafikansicht der Airflow-Benutzeroberfläche auf die Grafik
create_dataproc_cluster
. - Klicken Sie zum Zurücksetzen der drei Aufgaben auf Clear und anschließend auf OK.
- Klicken Sie in der Grafikansicht noch einmal auf
create_dataproc_cluster
. - Zum nochmaligen Übergeben des Workflows an die Warteschlange klicken Sie auf Run.
Aufgabenergebnisse ansehen
Sie können den Status und die Ergebnisse des Workflows composer_hadoop_tutorial
auch auf den folgenden Google Cloud Console-Seiten prüfen:
Dataproc-Cluster: zum Überwachen des Erstellens und Löschens von Clustern. Beachten Sie, dass der vom Workflow erstellte Cluster sitzungsspezifisch ist. Er ist nur für die Dauer des Workflows vorhanden und wird mit der letzten Workflowaufgabe gelöscht.
Dataproc-Jobs: zum Ansehen oder Überwachen des Apache Hadoop-Wordcount-Jobs. Klicken Sie auf die Job-ID, um die Ausgabe des Job-Logs aufzurufen.
Cloud Storage-Browser: zum Anzeigen der Ergebnisse der Wortzählung in Ordner
wordcount
im von Ihnen erstellten Cloud Storage-Bucket für diese Anleitung.
Bereinigen
Löschen Sie die Ressourcen, die in dieser Anleitung verwendet wurden:
Löschen Sie die Cloud Composer-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.
Löschen Sie den Cloud Storage-Bucket, der speichert die Ergebnisse des Hadoop-Wordcount-Jobs.