Flexible Vorlagen verwenden

In dieser Anleitung erfahren Sie, wie Sie einen flexiblen Dataflow-Vorlagenjob mit einem benutzerdefinierten Docker-Image mit dem gcloud-Befehlszeilentool erstellen und ausführen. Diese Anleitung behandelt ein Streaming-Pipeline-Beispiel, das JSON-codierte Nachrichten aus Pub/Sub liest, Nachrichtendaten mit Beam SQL umwandelt und die Ergebnisse in eine BigQuery-Tabelle schreibt.

Weitere Informationen zu Flex-Vorlagen finden Sie unter Dataflow-Vorlagen.

Ziele

  • Docker-Container-Image erstellen
  • Dataflow-Flex-Vorlage erstellen und ausführen

Kosten

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren.

Hinweis

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build APIs aktivieren.

    Aktivieren Sie die APIs

  5. Erstellen Sie ein Dienstkonto:

    1. Wechseln Sie in der Cloud Console zur Seite Dienstkonto erstellen.

      Zur Seite „Dienstkonto erstellen“
    2. Wählen Sie ein Projekt aus.
    3. Geben Sie im Feld Dienstkontoname einen Namen ein. In der Cloud Console wird das Feld Dienstkonto-ID anhand dieses Namens ausgefüllt.

      Geben Sie im Feld Dienstkontobeschreibung eine Beschreibung ein. Beispiel: Service account for quickstart.

    4. Klicken Sie auf Erstellen.
    5. Klicken Sie auf das Feld Rolle auswählen.

      Klicken Sie unter Schnellzugriff auf Einfach und dann auf Inhaber.

    6. Klicken Sie auf Weiter.
    7. Klicken Sie auf Fertig, um das Erstellen des Dienstkontos abzuschließen.

      Schließen Sie das Browserfenster nicht. Sie verwenden es in der nächsten Aufgabe.

  6. Dienstkontoschlüssel erstellen

    1. Klicken Sie in der Cloud Console auf die E-Mail-Adresse des von Ihnen erstellten Dienstkontos.
    2. Klicken Sie auf Schlüssel.
    3. Klicken Sie auf Schlüssel hinzufügen und dann auf Neuen Schlüssel erstellen.
    4. Klicken Sie auf Erstellen. Daraufhin wird eine JSON-Schlüsseldatei auf Ihren Computer heruntergeladen.
    5. Klicken Sie auf Schließen.
  7. Legen Sie für die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS den Pfad der JSON-Datei fest, die Ihren Dienstkontoschlüssel enthält. Diese Variable gilt nur für Ihre aktuelle Shellsitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable noch einmal festlegen.

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie im Abschnitt Bereinigen.

Beispielquelle und -senke erstellen

In diesem Abschnitt wird gezeigt, wie Sie Folgendes erstellen:

  • Streaming-Datenquelle mit Pub/Sub
  • Dataset, um Daten in BigQuery zu laden

Cloud Storage-Bucket erstellen

Verwenden Sie den Befehl gsutil mb:

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Pub/Sub-Thema und ein Abo für dieses Thema erstellen

Verwenden Sie das gcloud-Befehlszeilentool:

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Cloud Scheduler-Job erstellen

In diesem Schritt erstellen und führen wir mit dem gcloud-Befehlszeilentool einen Cloud Scheduler-Job aus, der "positive Bewertungen" und "negative Bewertungen" veröffentlicht.

  1. Erstellen Sie einen Cloud Scheduler-Job für dieses Google Cloud-Projekt.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Dies erstellt und führt einen Publisher für "positive Bewertungen" aus, der eine Nachricht pro Minute veröffentlicht.
  3. Starten Sie den Cloud Scheduler-Job.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Erstellen und führen Sie einen anderen ähnlichen Publisher für "negative Bewertungen" aus, der alle zwei Minuten eine Nachricht veröffentlicht.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

BigQuery-Dataset erstellen

Verwenden Sie den Befehl bq mk:

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Codebeispiel herunterladen

  1. Laden Sie das Codebeispiel herunter.

    Java

    Klonen Sie das Repository java-docs-samples und gehen Sie zum Codebeispiel für diese Anleitung.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Klonen Sie das Repository python-docs-samples und gehen Sie zum Codebeispiel für diese Anleitung.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exportieren Sie TEMPLATE_IMAGE für diese Anleitung.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Entwicklungsumgebung einrichten

Java

  1. Laden Sie das Java Development Kit (JDK) Version 11 herunter und installieren Sie es. Prüfen Sie, ob die Umgebungsvariable JAVA_HOME festgelegt ist und auf Ihre JDK-Installation verweist.
  2. Laden Sie Apache Maven herunter und installieren Sie es entsprechend der Maven-Installationsanleitung für Ihr Betriebssystem.
  3. (Optional) Führen Sie die Apache Beam-Pipeline für die Entwicklung lokal aus.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Erstellen Sie das Java-Projekt in einer Uber-JAR-Datei.
      mvn clean package
  5. (Optional) Vergleichen Sie die Größe der Uber-JAR-Datei mit der Originaldatei.
      ls -lh target/*.jar
    In dieser Uber-JAR-Datei sind alle Abhängigkeiten eingebettet. Sie können diese Datei als eigenständige Anwendung ausführen, in der andere Bibliotheken keine externen Abhängigkeiten haben.

Python

Verwenden Sie das Apache Beam SDK für Python mit pip und Python-Version 2.7, 3.5, 3.6 oder 3.7. Prüfen Sie mit folgendem Befehl, ob Python und pip funktionieren:

    python --version
    python -m pip --version

Wenn Ihnen Python nicht vorliegt, finden Sie die Installationsschritte für Ihr Betriebssystem auf der Seite Python installieren.

Nur Python: Container-Image erstellen

Dieser Abschnitt enthält Schritte für Python-Nutzer. Wenn Sie Java verwenden, überspringen Sie sie.

  1. (Optional) Kaniko-Cache-Nutzung standardmäßig aktivieren.
    gcloud config set builds/use_kaniko True
    
    Kaniko speichert Container-Build-Artefakte im Cache, sodass diese Option nachfolgende Builds beschleunigt.
  2. (Optional) Erstellen Sie das Dockerfile. Sie können das Dockerfile aus dieser Anleitung anpassen. Die Startdatei sieht so aus:

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Dieses Dockerfile enthält die Befehle FROM, ENV und COPY. Informationen dazu finden Sie in der Dockerfile-Referenz.

    Images, die mit gcr.io/PROJECT/ beginnen, werden in der Container Registry Ihres Projekts gespeichert, wo von anderen Google Cloud-Produkten auf das Image zugegriffen werden kann.
  3. Erstellen Sie mit Cloud Build das Docker-Image mithilfe eines Dockerfile
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Flex-Vorlage erstellen

Um eine Vorlage auszuführen, müssen Sie eine Vorlagenspezifikationsdatei in einem Cloud Storage erstellen, die alle für die Ausführung des Jobs erforderlichen Informationen enthält, z. B. die SDK-Informationen und Metadaten.

Die Datei metadata.json in diesem Beispiel enthält zusätzliche Informationen für die Vorlage, z. B. den name, die description und die Eingabefelder für parameters.

  1. Erstellen Sie eine Vorlagenspezifikationsdatei, die alle für die Ausführung des Jobs erforderlichen Informationen enthält, z. B. die SDK-Informationen und -Metadaten.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Erstellen Sie die flexible Vorlage.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Die Vorlage ist jetzt über die Vorlagendatei am angegebenen Cloud Storage-Speicherort verfügbar.

Flex-Vorlagenpipeline ausführen

Sie können die Apache Beam-Pipeline nun in Dataflow ausführen. Verweisen Sie dazu auf die Vorlagendatei und übergeben Sie die für die Pipeline erforderlichen Vorlagen-Parameter.

  1. Führen Sie die Vorlage aus.

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Alternativ können Sie die Vorlage mit einer REST API-Anfrage ausführen.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. Nachdem Sie den Befehl zum Ausführen der Flex-Vorlage ausgeführt haben, gibt Dataflow eine Job-ID mit dem Auftragsstatus In der Warteschlange zurück. Es kann einige Minuten dauern, bis der Jobstatus Läuft erreicht und Sie auf die Jobgrafik zugreifen können.
  3. Prüfen Sie die Ergebnisse in BigQuery mit der folgenden Abfrage:
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Während diese Pipeline ausgeführt wird, können Sie jede Minute neue Zeilen an die BigQuery-Tabelle anhängen.

Bereinigen

Nach Abschluss der Anleitung können Sie die von Ihnen in Google Cloud erstellten Ressourcen bereinigen, damit Ihnen diese nicht weiter in Rechnung gestellt werden. In den folgenden Abschnitten wird erläutert, wie Sie diese Ressourcen löschen oder deaktivieren.

Flex-Vorlagenressourcen bereinigen

  1. Halten Sie die Dataflow-Pipeline an.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Löschen Sie die Vorlagenspezifikationsdatei aus Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Löschen Sie das Container-Image der Flex-Vorlage aus Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Google Cloud-Projektressourcen bereinigen

  1. Cloud Scheduler-Jobs löschen.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Löschen Sie das Pub/Sub-Abo und -Thema.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Löschen Sie die BigQuery-Tabelle.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Löschen Sie das BigQuery-Dataset. Hierdurch entstehen keine Kosten.

    Mit dem folgenden Befehl werden auch alle Tabellen im Dataset gelöscht. Die Tabellen und Daten können nicht wiederhergestellt werden.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Löschen Sie den Cloud Storage-Bucket. Hierdurch entstehen keine Kosten.

    Mit dem folgenden Befehl werden auch alle Objekte im Bucket gelöscht. Diese Objekte können nicht wiederhergestellt werden.

    gsutil rm -r gs://$BUCKET
    

Beschränkungen

Die folgenden Einschränkungen gelten für flexible Vorlagenjobs:

  • Sie müssen ein von Google bereitgestelltes Basis-Image verwenden, um Ihre Container mit Docker zu packen.
  • waitUntilFinish (Java) und wait_until_finish (Python) werden nicht unterstützt.

Nächste Schritte