Hadoop-Wordcount-Job in einem Dataproc-Cluster ausführen

Cloud Composer 1 Cloud Composer 2

In dieser Anleitung wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG (Directed Acyclic Graph) erstellen, der einen Apache Hadoop-Wordcount-Job in einem Dataproc-Cluster ausführt.

Lernziele

  1. Rufen Sie Ihre Cloud Composer-Umgebung auf und verwenden Sie die Airflow-UI.
  2. Airflow-Umgebungsvariablen erstellen und aufrufen
  3. Erstellen Sie einen DAG mit den folgenden Aufgaben und führen Sie ihn aus:
    1. Ein Dataproc-Cluster wird erstellt.
    2. Führt einen Apache Hadoop-Job zum Zählen der Wörter im Cluster aus.
    3. Gibt die Ergebnisse der Wortanzahl in einem Cloud Storage-Bucket aus.
    4. Löscht den Cluster.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

  • Achten Sie darauf, dass die folgenden APIs in Ihrem Projekt aktiviert sind:

    Console

    Dataproc, Cloud Storage APIs aktivieren.

    Aktivieren Sie die APIs

    gcloud

    Aktivieren Sie die Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • Erstellen Sie einen Cloud Storage-Bucket einer beliebigen Speicherklasse und Region in Ihrem Projekt, um die Ergebnisse des Hadoop-Wordcount-Jobs zu speichern.

  • Notieren Sie sich den Pfad des Buckets, den Sie erstellt haben, z. B. gs://example-bucket. Sie definieren eine Airflow-Variable für diesen Pfad und verwenden sie später in dieser Anleitung im Beispiel-DAG.

  • Erstellen Sie eine Cloud Composer-Umgebung mit Standardparametern. Warten Sie, bis die Erstellung der Umgebung abgeschlossen ist. Wenn der Vorgang abgeschlossen ist, wird links neben dem Umgebungsnamen das grüne Häkchen angezeigt.

  • Notieren Sie sich die Region, in der Sie die Umgebung erstellt haben, z. B. us-central. Sie definieren eine Airflow-Variable für diese Region und verwenden sie im Beispiel-DAG, um einen Dataproc-Cluster in derselben Region auszuführen.

Airflow-Variablen festlegen

Legen Sie die Airflow-Variablen fest, die später im Beispiel-DAG verwendet werden. Sie können Airflow-Variablen beispielsweise in der Airflow-UI festlegen.

Airflow-Variable Wert
gcp_project Die Projekt-ID des Projekts, das Sie für diese Anleitung verwenden, z. B. example-project.
gcs_bucket Den Cloud Storage-URI-Bucket, den Sie für diese Anleitung erstellt haben, z. B. gs://example-bucket.
gce_region Die Region, in der Sie die Umgebung erstellt haben, z. B. us-central1. Dies ist die Region, in der Ihr Dataproc-Cluster erstellt wird.

Beispielworkflow ansehen

Ein Airflow-DAG ist eine Sammlung organisierter Aufgaben, die Sie planen und ausführen möchten. DAGs werden in Standard-Python-Dateien definiert. Der in hadoop_tutorial.py angezeigte Code ist der Workflowcode.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operatoren

Zur Orchestrierung der drei Aufgaben im Beispielworkflow importiert der DAG die folgenden drei Airflow-Operatoren:

  • DataprocClusterCreateOperator: Erstellt einen Dataproc-Cluster.

  • DataProcHadoopOperator: sendet einen Hadoop-Wordcount-Job und schreibt die Ergebnisse in einen Cloud Storage-Bucket.

  • DataprocClusterDeleteOperator: Löscht den Cluster, um laufende Compute Engine-Gebühren zu vermeiden.

Abhängigkeiten

Sie organisieren Aufgaben, die Sie ausführen möchten, so, dass ihre Beziehungen und Abhängigkeiten widergespiegelt werden. Die Aufgaben in diesem DAG werden nacheinander ausgeführt.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Wird geplant

Der Name des DAG lautet composer_hadoop_tutorial. Er wird einmal täglich ausgeführt. Da die an default_dag_args übergebene start_date auf yesterday gesetzt ist, plant Cloud Composer den Workflow so, dass er sofort nach dem Hochladen des DAG in den Bucket der Umgebung startet.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

DAG in den Bucket der Umgebung hochladen

Cloud Composer speichert DAGs im Ordner /dags im Bucket Ihrer Umgebung.

So laden Sie den DAG hoch:

  1. Speichern Sie hadoop_tutorial.py auf Ihrem lokalen Computer.

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  3. Klicken Sie in der Liste der Umgebungen in der Spalte DAGs-Ordner für Ihre Umgebung auf den Link DAGs.

  4. Klicken Sie auf Dateien hochladen.

  5. Wählen Sie hadoop_tutorial.py auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.

Cloud Composer fügt den DAG zu Airflow hinzu und plant ihn automatisch. DAG-Änderungen werden innerhalb von 3 bis 5 Minuten wirksam.

DAG-Ausführungen kennenlernen

Aufgabenstatus ansehen

Wenn Sie die DAG-Datei in den Ordner dags/ in Cloud Storage hochladen, parst Cloud Composer die Datei. Wenn der Workflow erfolgreich abgeschlossen wurde, wird der Name des Workflows in der DAG-Liste angezeigt und der Workflow wird zur sofortigen Ausführung in die Warteschlange gestellt.

  1. Rufen Sie zur Anzeige des Aufgabenstatus die Airflow-Weboberfläche auf und klicken Sie in der Symbolleiste auf DAGs.

  2. Klicken Sie zum Öffnen der DAG-Detailseite auf composer_hadoop_tutorial. Diese Seite enthält eine grafische Darstellung der Workflowaufgaben und -abhängigkeiten.

  3. Klicken Sie zur Anzeige des Status jeder Aufgabe auf Graph View und bewegen Sie den Mauszeiger auf die Grafik für jede Aufgabe.

Workflow wieder in die Warteschlange stellen

So führen Sie den Workflow über die Grafikansicht noch einmal aus:

  1. Klicken Sie in der Grafikansicht der Airflow-Benutzeroberfläche auf die Grafik create_dataproc_cluster.
  2. Klicken Sie zum Zurücksetzen der drei Aufgaben auf Clear und anschließend auf OK.
  3. Klicken Sie in der Grafikansicht noch einmal auf create_dataproc_cluster.
  4. Zum nochmaligen Übergeben des Workflows an die Warteschlange klicken Sie auf Run.

Aufgabenergebnisse ansehen

Sie können den Status und die Ergebnisse des Workflows composer_hadoop_tutorial auch auf den folgenden Seiten der Google Cloud Console prüfen:

  • Dataproc-Cluster: zum Überwachen der Erstellung und des Löschens von Clustern. Der vom Workflow erstellte Cluster ist sitzungsspezifisch. Er existiert nur für die Dauer des Workflows und wird mit der letzten Workflowaufgabe gelöscht.

    Zu Dataproc-Clustern

  • Dataproc-Jobs: zum Aufrufen oder Überwachen des Apache Hadoop-Wordcount-Jobs. Klicken Sie auf die Job-ID, um die Ausgabe des Job-Logs aufzurufen.

    Zu Dataproc-Jobs

  • Cloud Storage-Browser: Zeigt die Ergebnisse der Wortanzahl im Ordner wordcount des Cloud Storage-Bucket an, den Sie für diese Anleitung erstellt haben.

    Zum Cloud Storage-Browser

Bereinigen

Löschen Sie die in dieser Anleitung verwendeten Ressourcen:

  1. Löschen Sie die Cloud Composer-Umgebung, einschließlich des Buckets der Umgebung.

  2. Löschen Sie den Cloud Storage-Bucket, in dem die Ergebnisse des Hadoop-Wordcount-Jobs gespeichert sind.