Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Workflow mit Cloud Composer

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

Projekt einrichten

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Dataproc, Compute Engine, and Cloud Composer APIs aktivieren.

    Aktivieren Sie die APIs

  5. Installieren und initialisieren Sie das Cloud SDK.

Dataproc-Workflow-Vorlage erstellen

Kopieren Sie die unten aufgeführten Befehle und führen Sie sie in einem lokalen Terminalfenster oder in Cloud Shell aus, um eine Workflow-Vorlage zu erstellen und zu definieren.

  1. Erstellen Sie die sparkpi-Workflow-Vorlage.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Fügen Sie der Workflow-Vorlage sparkpi den Spark-Job hinzu. Das „compute“-Flag step-id identifiziert den SparkPi-Job.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Führen Sie den Workflow mit einem verwalteten Cluster mit einem einzelnen Knoten aus. Dataproc erstellt den Cluster, führt darauf den Workflow aus und löscht den Cluster, wenn der Workflow abgeschlossen ist.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Bestätigen Sie das Erstellen der Workflow-Vorlage.

    Console

    Klicken Sie in der Cloud Console auf der Dataproc-Seite Workflows auf den Namen sparkpi, um die Seite Workflow-Vorlagendetails zu öffnen. Klicken Sie auf den Namen Ihrer Workflow-Vorlage, um die sparkpi-Vorlagenattribute zu bestätigen.

    gcloud-Befehl

    Führen Sie diesen Befehl aus:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG in Cloud Storage erstellen und hochladen

  1. Eine Cloud Composer-Umgebung erstellen oder eine vorhandene verwenden
  2. Umgebungsvariablen festlegen

    Airflow-UI

    1. Klicken Sie in der Symbolleiste auf Admin > Variables.
    2. Klicken Sie auf Erstellen.
    3. Geben Sie die folgenden Informationen ein:
      • Key: project_id
      • Val: PROJECT_ID – Ihre Google Cloud-Projekt-ID
    4. Klicken Sie auf Speichern.

    gcloud-Befehl

    Geben Sie die folgenden Befehle ein:

    • ENVIRONMENT ist der Name der Cloud Composer-Umgebung.
    • LOCATION ist die Region, in der sich die Cloud Composer-Umgebung befindet.
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. Kopieren Sie den folgenden DAG mit lokalem Code in eine Datei namens "composer-dataproc-dag.py", die den DataprocInstantiateWorkflowTemplateOperator verwendet.

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/concepts.html#variables
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    
  4. Laden Sie Ihren DAG in Ihren Umgebungsordner in Cloud Storage. Wenn der Upload erfolgreich abgeschlossen wurde, klicken Sie auf der Seite "Cloud Composer-Umgebung" auf den Link DAGs-Ordner.

Status der Aufgabe ansehen

Airflow-UI

  1. Öffnen Sie die Airflow-Weboberfläche.
  2. Klicken Sie auf der Seite "DAGs" auf den DAG-Namen, z. B. dataproc_workflow_dag.
  3. Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
  4. Status prüfen:
    • Fehlgeschlagen: Die Aufgabe ist rot umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Failed suchen. Die Aufgabe ist rot umrandet, was angibt, dass sie fehlgeschlagen ist.
    • Erfolgreich: Die Aufgabe ist grün umrandet. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Success suchen. Die Aufgabe ist grün umrandet, was angibt, dass sie erfolgreich ausgeführt wurde.

Console

Klicken Sie auf den Tab „Workflows“, um den Workflow-Status anzusehen.

gcloud-Befehl

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Bereinigen

Um zu vermeiden, dass Ihrem Google Cloud-Konto Gebühren in Rechnung gestellt werden, löschen Sie die in dieser Anleitung verwendeten Ressourcen.

  1. Löschen Sie die Cloud Composer-Umgebung.

  2. Löschen Sie die Workflow-Vorlage.

Nächste Schritte