Klassische Vorlagen ausführen

Nachdem Sie Ihre Dataflow-Vorlage erstellt und implementiert haben, führen Sie die Vorlage über die Google Cloud Console, die REST API oder die Google Cloud CLI aus. Dataflow-Vorlagenjobs können in zahlreichen Umgebungen bereitgestellt werden, unter anderem in der App Engine-Standardumgebung, in Cloud Run-Funktionen und in anderen eingeschränkten Umgebungen.

Google Cloud Console verwenden

Sie können die Cloud Console verwenden, um von Google bereitgestellte und benutzerdefinierte Dataflow-Vorlagen auszuführen.

Von Google bereitgestellte Vorlagen

Eine von Google bereitgestellte Vorlage ausführen:

  1. Rufen Sie in der Google Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf JOB AUS VORLAGE ERSTELLEN.
  4. Schaltfläche „Job aus Vorlage erstellen” in der Google Cloud Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die von Google bereitgestellte Vorlage aus, die Sie ausführen möchten.
  6. Formular für die WordCount-Vorlagenausführung
  7. Geben Sie einen Jobnamen in das Feld Jobname ein.
  8. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein. Den Abschnitt Zusätzliche Parameter benötigen Sie nicht, wenn Sie eine von Google bereitgestellte Vorlage verwenden.
  9. Klicken Sie auf Job ausführen.

Benutzerdefinierte Vorlagen

Benutzerdefinierte Vorlage ausführen:

  1. Rufen Sie in der Google Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf JOB AUS VORLAGE ERSTELLEN.
  4. Schaltfläche „Job aus Vorlage erstellen” in der Google Cloud Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option Benutzerdefinierte Vorlage aus.
  6. Formular für die Ausführung benutzerdefinierter Vorlagen
  7. Geben Sie einen Jobnamen in das Feld Jobname ein.
  8. Geben Sie den Cloud Storage-Pfad zu Ihrer Vorlage in das Vorlagenfeld für den Cloud Storage-Pfad ein.
  9. Wenn für Ihre Vorlage Parameter erforderlich sind, klicken Sie im Abschnitt Zusätzliche Parameter auf PARAMETER HINZUFÜGEN. Geben Sie den Namen und den Wert des Parameters ein. Wiederholen Sie diesen Schritt für jeden benötigten Parameter.
  10. Klicken Sie auf Job ausführen.

Verwenden der REST API

Senden Sie zum Ausführen einer Vorlage mit einer REST API-Anfrage eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Weitere Informationen zu den verfügbaren Parametern finden Sie in der REST API-Referenz für projects.locations.templates.launch.

Batchjob mit benutzerdefinierter Vorlage erstellen

Durch die Beispielanfrage projects.locations.templates.launch wird aus einer Vorlage ein Batch-Job erstellt, der eine Textdatei liest und eine Ausgabetextdatei schreibt. Bei Erfolg enthält der Antworttext eine Instanz von LaunchTemplateResponse.

Ändern Sie die folgenden Werte:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie LOCATION durch die Dataflow-Region Ihrer Wahl.
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl.
  • Ersetzen Sie YOUR_BUCKET_NAME durch den Namen des Cloud Storage-Buckets.
  • Legen Sie für gcsPath den Cloud Storage-Speicherort der Vorlagendatei fest.
  • Stellen Sie das Flag parameters auf Ihre Liste der Schlüssel/Wert-Paare ein.
  • Legen Sie für tempLocation einen Speicherort fest, für den Sie die Schreibberechtigung haben. Dieser Wert ist erforderlich, um von Google bereitgestellte Vorlagen auszuführen.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Streamingjob mit benutzerdefinierter Vorlage erstellen

Durch die Beispielanfrage projects.locations.templates.launch wird ein Streaming-Job aus einer klassischen Vorlage erstellt, die aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Wenn Sie eine flexible Vorlage starten möchten, verwenden Sie stattdessen projects.locations.flexTemplates.launch. Die Beispielvorlage ist eine von Google bereitgestellte Vorlage. Sie können den Pfad in der Vorlage so ändern, dass er auf eine benutzerdefinierte Vorlage verweist. Die gleiche Logik wird verwendet, um von Google bereitgestellte und benutzerdefinierte Vorlagen zu starten. In diesem Beispiel muss die BigQuery-Tabelle bereits mit dem entsprechenden Schema vorhanden sein. Bei Erfolg enthält der Antworttext eine Instanz von LaunchTemplateResponse.

Ändern Sie die folgenden Werte:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie LOCATION durch die Dataflow-Region Ihrer Wahl.
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl.
  • Ersetzen Sie YOUR_BUCKET_NAME durch den Namen Ihres Cloud Storage-Buckets.
  • Ersetzen Sie GCS_PATH durch den Cloud Storage-Speicherort der Vorlagendatei. Der Speicherort sollte mit gs:// beginnen.
  • Stellen Sie das Flag parameters auf Ihre Liste der Schlüssel/Wert-Paare ein. Die aufgeführten Parameter gelten speziell für dieses Vorlagenbeispiel. Wenn Sie eine benutzerdefinierte Vorlage verwenden, ändern Sie die Parameter nach Bedarf. Wenn Sie die Beispielvorlage verwenden, ersetzen Sie die folgenden Variablen.
    • Ersetzen Sie YOUR_SUBSCRIPTION_NAME durch den Namen des Pub/Sub-Abos.
    • Ersetzen Sie YOUR_DATASET durch Ihr BigQuery-Dataset und ersetzen Sie YOUR_TABLE_NAME durch Ihren BigQuery-Tabellennamen.
  • Legen Sie für tempLocation einen Speicherort fest, für den Sie die Schreibberechtigung haben. Dieser Wert ist erforderlich, um von Google bereitgestellte Vorlagen auszuführen.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Streamingjob mit benutzerdefinierter Vorlage aktualisieren

Diese Beispielanfrage projects.locations.templates.launch zeigt Ihnen, wie Sie eine Vorlage für einen Streamingjob aktualisieren. Wenn Sie eine flexible Vorlage aktualisieren möchten, verwenden Sie stattdessen projects.locations.flexTemplates.launch.

  1. Führen Sie Beispiel 2: Streamingjob aus benutzerdefinierter Vorlage erstellen aus, um einen Streaming-Vorlagenjob zu starten.
  2. Senden Sie die folgende HTTP-POST-Anfrage mit den folgenden geänderten Werten:
    • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie LOCATION durch die Dataflow-Region des Jobs, den Sie aktualisieren.
    • Ersetzen Sie JOB_NAME durch den genauen Namen des Jobs, den Sie aktualisieren möchten.
    • Ersetzen Sie GCS_PATH durch den Cloud Storage-Speicherort der Vorlagendatei. Der Speicherort sollte mit gs:// beginnen.
    • Stellen Sie das Flag parameters auf Ihre Liste der Schlüssel/Wert-Paare ein. Die aufgeführten Parameter gelten speziell für dieses Vorlagenbeispiel. Wenn Sie eine benutzerdefinierte Vorlage verwenden, ändern Sie die Parameter nach Bedarf. Wenn Sie die Beispielvorlage verwenden, ersetzen Sie die folgenden Variablen.
      • Ersetzen Sie YOUR_SUBSCRIPTION_NAME durch den Namen des Pub/Sub-Abos.
      • Ersetzen Sie YOUR_DATASET durch Ihr BigQuery-Dataset und ersetzen Sie YOUR_TABLE_NAME durch Ihren BigQuery-Tabellennamen.
    • Mit dem Parameter environment können Sie Umgebungseinstellungen wie den Maschinentyp ändern. In diesem Beispiel wird der Maschinentyp n2-highmem-2 verwendet, der mehr Arbeitsspeicher und CPU pro Worker als der Standardmaschinentyp hat.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Rufen Sie die Dataflow-Monitoring-Oberfläche auf und prüfen Sie, ob ein neuer Job mit demselben Namen erstellt wurde. Dieser Job hat den Status Aktualisiert.

Google API-Clientbibliotheken verwenden

Mit Google API-Clientbibliotheken können Sie auf einfache Weise Dataflow REST APIs aufrufen. In diesem Beispielskript wird die Google API-Clientbibliothek für Python verwendet.

Geben Sie für dieses Beispiel die folgenden Variablen an:

  • project: Auf Ihre Projekt-ID festlegen.
  • job: Auf einen eindeutigen Jobnamen Ihrer Wahl festlegen.
  • template: Auf den Cloud Storage-Speicherort der Vorlagendatei festlegen.
  • parameters: Ein Wörterbuch mit den Vorlagenparametern festlegen.

Wenn Sie die Region festlegen möchten, fügen Sie den Parameter location ein.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Weitere Informationen zu den verfügbaren Optionen finden Sie in der Referenz zur Dataflow REST API unter der Methode projects.locations.templates.launch.

gcloud CLI verwenden

Mit der gcloud CLI kann eine benutzerdefinierte oder eine von Google bereitgestellte Vorlage mit dem Befehl gcloud dataflow jobs run ausgeführt werden. Beispiele für die Ausführung von Vorlagen, die von Google bereitgestellt wurden, sind auf der Seite Von Google bereitgestellte Vorlagen dokumentiert.

Legen Sie für die folgenden Beispiele zu benutzerdefinierten Vorlagen die Werte so fest:

  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl.
  • Ersetzen Sie YOUR_BUCKET_NAME durch den Namen Ihres Cloud Storage-Buckets.
  • Legen Sie für --gcs-location den Cloud Storage-Speicherort der Vorlagendatei fest.
  • Legen Sie für --parameters die durch Kommas abgetrennte Liste der Parameter fest, die an den Job übergeben werden sollen. Leerzeichen zwischen Kommas und Werten sind nicht zugelassen.
  • Wenn Sie verhindern möchten, dass VMs SSH-Schlüssel akzeptieren, die in Projektmetadaten gespeichert sind, verwenden Sie das Flag additional-experiments mit der Dienstoption block_project_ssh_keys: --additional-experiments=block_project_ssh_keys

Batchjob mit benutzerdefinierter Vorlage erstellen

In diesem Beispiel wird aus einer Vorlage ein Batchjob erstellt, der eine Textdatei einliest und eine Ausgabetextdatei schreibt.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

Die Anfrage gibt eine Antwort im folgenden Format zurück.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Streamingjob mit benutzerdefinierter Vorlage erstellen

In diesem Beispiel wird aus einer Vorlage ein Streamingjob erstellt, der aus einem Pub/Sub-Thema liest und in eine BigQuery-Tabelle schreibt. Die BigQuery-Tabelle muss bereits mit einem geeigneten Schema vorhanden sein.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

Die Anfrage gibt eine Antwort im folgenden Format zurück.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Eine vollständige Liste der Flags für den Befehl gcloud dataflow jobs run finden Sie in der Referenz für die gcloud CLI.

Überwachung und Fehlerbehebung

Mit der Dataflow-Monitoring-Oberfläche können Sie Ihre Dataflow-Jobs überwachen. Schlägt ein Job fehl, finden Sie im Leitfaden Pipelinefehler beheben Tipps und Strategien zur Fehlerbehebung sowie einen Katalog mit häufig auftretenden Fehlern.