Workflow mit Cloud Composer

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

Projekt einrichten

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Dataproc-Workflow-Vorlage erstellen

Kopieren Sie die unten aufgeführten Befehle und führen Sie sie in einem lokalen Terminalfenster oder in Cloud Shell aus, um eine Workflow-Vorlage zu erstellen und zu definieren.

  1. Erstelle die Workflow-Vorlage sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Fügen Sie der Workflow-Vorlage sparkpi den Spark-Job hinzu. Das „compute“-Flag step-id identifiziert den SparkPi-Job.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Führen Sie den Workflow mit einem verwalteten Cluster mit einem einzelnen Knoten aus. Dataproc erstellt den Cluster, führt darauf den Workflow aus und löscht den Cluster, wenn der Workflow abgeschlossen ist.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Erstellen der Workflow-Vorlage bestätigen.

    Console

    Klicken Sie in der Google Cloud Console auf der Dataproc-Seite Workflows auf den Namen sparkpi, um die Seite Workflow-Vorlagendetails zu öffnen. Klicken Sie auf den Namen Ihrer Workflow-Vorlage, um die sparkpi-Vorlagenattribute zu bestätigen.

    gcloud-Befehl

    Führen Sie dazu diesen Befehl aus:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG erstellen und in Cloud Storage hochladen

  1. Eine Cloud Composer-Umgebung erstellen oder eine vorhandene verwenden
  2. Umgebungsvariablen festlegen

    Airflow-UI

    1. Klicken Sie in der Symbolleiste auf Admin > Variables.
    2. Klicken Sie auf Erstellen.
    3. Geben Sie die folgenden Informationen ein:
      • Key: project_id
      • Val: PROJECT_ID – Ihre Google Cloud-Projekt-ID
    4. Klicken Sie auf Speichern.

    gcloud-Befehl

    Geben Sie die folgenden Befehle ein:

    • ENVIRONMENT ist der Name der Cloud Composer-Umgebung.
    • LOCATION ist die Region, in der sich die Cloud Composer-Umgebung befindet.
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. Kopieren Sie den folgenden DAG-Code lokal in eine Datei namens „composer-dataproc-dag.py“, die den DataprocInstantiateWorkflowTemplateOperator verwendet.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Führen Sie ein Upload des DAG in Ihren Umgebungsordner in Cloud Storage aus. Wenn der Upload erfolgreich abgeschlossen wurde, klicken Sie auf der Seite der Cloud Composer-Umgebung auf den Link DAGs-Ordner.

Status der Aufgabe ansehen

Airflow-UI

  1. Öffnen Sie die Airflow-Weboberfläche.
  2. Klicken Sie auf der Seite "DAGs" auf den DAG-Namen, z. B. dataproc_workflow_dag.
  3. Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
  4. Prüfen Sie den Status:
    • Fehlgeschlagen: Die Aufgabe ist rot umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Failed suchen. Die Aufgabe ist rot umrandet, was angibt, dass sie fehlgeschlagen ist.
    • Erfolgreich: Die Aufgabe ist grün umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Success suchen. Die Aufgabe ist grün umrandet, was angibt, dass sie erfolgreich ausgeführt wurde.

Console

Klicken Sie auf den Tab „Workflows“, um den Workflow-Status anzusehen.

gcloud-Befehl

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Bereinigen

Um zu vermeiden, dass Ihrem Google Cloud-Konto Gebühren in Rechnung gestellt werden, löschen Sie die in dieser Anleitung verwendeten Ressourcen.

  1. Löschen Sie die Cloud Composer-Umgebung.

  2. Löschen Sie die Workflow-Vorlage.

Weitere Informationen