Von Pub/Sub zu BigQuery streamen


In dieser Anleitung wird die Vorlage Pub/Sub-Abo zu BigQuery verwendet, um einen Dataflow-Vorlagenjob mithilfe der Google Cloud Console oder der Google Cloud CLI zu erstellen und auszuführen. Die Anleitung führt Sie durch ein Streaming-Pipeline-Beispiel, das JSON-codierte Nachrichten aus Pub/Sub liest, eine benutzerdefinierte Funktion (User-Defined Function, UDF) verwendet, um die von Google bereitgestellte Streamingvorlage zu erweitern, Nachrichtendaten mit dem Apache Beam SDK transformiert und die Ergebnisse in eine BigQuery-Tabelle schreibt.

Pipelines für Streaminganalyse und Datenintegration verwenden Pub/Sub zum Aufnehmen und Verteilen von Daten. Mit Pub/Sub können Sie Systeme für Ereignisersteller und -nutzer erstellen, die als Publisher und Abonnenten bezeichnet werden. Publisher senden Ereignisse asynchron an den Pub/Sub-Dienst und Pub/Sub liefert die Ereignisse dann an alle Dienste, die darauf reagieren müssen.

Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Streammodus (Echtzeit) und im Batchmodus. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung, die das Apache Beam SDK verwendet, um eingehende Daten zu transformieren und dann die transformierten Daten auszugeben.

Der Vorteil dieses Workflows besteht darin, dass Sie die Nachrichtendaten mithilfe von UDFs transformieren können, bevor sie in BigQuery geschrieben werden. Eine weitere Option ist die Verwendung eines BigQuery-Abos, das Pub/Sub-Nachrichten direkt ohne Nutzung von Dataflow in BigQuery schreibt. Diese Option unterstützt nur die Zustellung mindestens einmal, nicht die genau einmalige Verarbeitung.

Ziele

  • Erstellen Sie ein Pub/Sub-Thema.
  • Erstellen Sie ein BigQuery-Dataset mit einer Tabelle und einem Schema.
  • Verwenden Sie eine von Google bereitgestellte Streamingvorlage, um Daten mit Dataflow aus Ihrem Pub/Sub-Abo zu BigQuery zu streamen.
  • Benutzerdefinierte Funktion (UDF, User-Defined Function) erstellen, um die von Google bereitgestellte Streaming-Vorlage zu erweitern.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • BigQuery

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

In diesem Abschnitt erfahren Sie, wie Sie ein Projekt auswählen, APIs aktivieren und Ihrem Nutzerkonto und dem Arbeitskonto die entsprechenden Rollen zuweisen.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs.

    Enable the APIs

  8. Damit Sie die Schritte in dieser Anleitung ausführen können, muss Ihr Nutzerkonto die Rolle Dienstkontonutzer haben. Das Compute Engine-Standarddienstkonto muss die folgenden Rollen haben: Dataflow Worker, Dataflow Admin, Pub/Sub Editor, Storage Object Admin, and BigQuery Data Editor. So fügen Sie die erforderlichen Rollen in der Google Cloud Console hinzu:

    1. Öffnen Sie in der Google Cloud Console die Seite IAM.

      IAM aufrufen
    2. Wählen Sie Ihr Projekt aus.
    3. Klicken Sie in der Zeile mit Ihrem Nutzerkonto auf Hauptkonto bearbeiten und dann auf Weitere Rolle hinzufügen.
    4. Wählen Sie in der Drop-down-Liste die Rolle Service Account User aus.
    5. Klicken Sie in der Zeile mit dem Compute Engine-Standarddienstkonto auf Hauptkonto bearbeiten und dann auf Weitere Rolle hinzufügen.
    6. Wählen Sie aus der Drop-down-Liste die Rolle Dataflow-Worker aus.
    7. Wiederholen Sie den Vorgang für die Rollen Dataflow-Administrator, Pub/Sub-Bearbeiter, Storage-Objekt-Administrator und BigQuery-Dateneditor und klicken Sie dann auf Speichern.

      Weitere Informationen zum Zuweisen von Rollen finden Sie unter IAM-Rolle über die Konsole zuweisen.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com
  14. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Projekt-ID.
    • PROJECT_NUMBER: Ihre Projektnummer. Verwenden Sie den Befehl gcloud projects describe, um Ihre Projektnummer zu ermitteln.
    • SERVICE_ACCOUNT_ROLE: Jede einzelne Rolle.

Beispielquelle und -senke erstellen

In diesem Abschnitt wird Folgendes erstellt:

  • Streaming-Datenquelle mit Pub/Sub
  • Dataset, um Daten in BigQuery zu laden

Cloud Storage-Bucket erstellen

Erstellen Sie zuerst einen Cloud Storage-Bucket mit der Cloud Console oder der Google Cloud CLI. Die Dataflow-Pipeline verwendet diesen Bucket als temporären Speicherort.

Console

  1. Wechseln Sie in der Google Cloud Console zur Cloud Storage-Seite Buckets.

    Buckets aufrufen

  2. Klicken Sie auf Bucket erstellen.

  3. Geben Sie auf der Seite Bucket erstellen bei Bucket benennen einen Namen ein, der die Anforderungen für Bucket-Namen erfüllt. Cloud Storage-Bucket-Namen müssen global einmalig sein. Wählen Sie die anderen Optionen nicht aus.

  4. Klicken Sie auf Erstellen.

gcloud

Führen Sie den Befehl gcloud storage buckets create aus:

gcloud storage buckets create gs://BUCKET_NAME

Ersetzen Sie BUCKET_NAME durch einen Namen für den Cloud Storage-Bucket, der den Anforderungen für Bucket-Namen entspricht. Cloud Storage-Bucket-Namen müssen global einmalig sein.

Pub/Sub-Thema und -Abo erstellen

Erstellen Sie ein Pub/Sub-Thema und dann ein Abo für dieses Thema.

Console

So erstellen Sie ein Thema:

  1. Öffnen Sie in der Google Cloud Console die Pub/Sub-Seite Themen.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

  3. Geben Sie im Feld Themen-ID eine ID für das Thema ein. Weitere Informationen zum Benennen eines Themas finden Sie unter Richtlinien für die Benennung eines Themas oder eines Abos.

  4. Behalten Sie die Option Standardabo hinzufügen bei. Wählen Sie die anderen Optionen nicht aus.

  5. Klicken Sie auf Thema erstellen.

gcloud

Führen Sie den Befehl gcloud pubsub topics create aus, um ein Thema zu erstellen. Informationen zum Benennen eines Abos finden Sie unter Richtlinien für die Benennung eines Themas oder Abos.

gcloud pubsub topics create TOPIC_ID

Ersetzen Sie TOPIC_ID durch einen Namen für Ihr Pub/Sub-Thema.

Führen Sie den Befehl gcloud pubsub subscriptions create aus, um ein Abo für Ihr Thema zu erstellen:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Ersetzen Sie SUBSCRIPTION_ID durch einen Namen für Ihr Pub/Sub-Abo.

Cloud Scheduler-Jobs erstellen und ausführen

Erstellen und führen Sie zwei Cloud Scheduler-Jobs aus: einen, der positive Bewertungen veröffentlicht, und einen zweiten, der negative Bewertungen in Ihrem Pub/Sub-Thema veröffentlicht.

Console

Erstellen Sie einen Cloud Scheduler-Job für positive Bewertungen.

  1. Gehen Sie in der Konsole zur Seite Cloud Scheduler:

    Zu Cloud Scheduler

  2. Klicken Sie auf die Schaltfläche Job erstellen.

  3. Geben Sie den Namen positive-ratings-publisher ein.

  4. Wählen Sie eine Dataflow-Region in der Nähe des Standorts, an dem die Befehle in dieser Anleitung ausgeführt werden. Der Wert der Variablen REGION muss ein gültiger Regionsname sein. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.

  5. Geben Sie die Häufigkeit der Ausführung des Jobs im Format unix-cron an: * * * * *.

    Weitere Informationen finden Sie unter Zeitpläne für Cronjobs konfigurieren.

  6. Wählen Sie Ihre Zeitzone aus.

  7. Klicken Sie auf Weiter.

  8. Wählen Sie in der Zielliste Pub/Sub aus.

  9. Wählen Sie den Namen Ihres Themas aus der Liste aus.

  10. Fügen Sie den folgenden Nachrichtenstring hinzu, der an Ihr Ziel gesendet werden soll:{"url": "https://beam.apache.org/", "review": "positive"}

  11. Klicken Sie auf Erstellen.

Sie haben jetzt einen Cronjob, der jede Minute eine Nachricht mit einer positiven Bewertung an Ihr Pub/Sub-Thema sendet. Ihre Cloud Functions-Funktion abonniert dieses Thema.

Erstellen Sie einen Cloud Scheduler-Job für negative Bewertungen.

  1. Klicken Sie in der Console auf der Seite Cloud Scheduler auf die Schaltfläche Job erstellen.

  2. Geben Sie den Namen negative-ratings-publisher ein.

  3. Wählen Sie eine Region für die Ausführung des Jobs aus.

  4. Geben Sie die Häufigkeit der Ausführung des Jobs im Format unix-cron an: */2 * * * *.

    Weitere Informationen finden Sie unter Zeitpläne für Cronjobs konfigurieren.

  5. Wählen Sie Ihre Zeitzone aus.

  6. Klicken Sie auf Weiter.

  7. Wählen Sie in der Zielliste Pub/Sub aus.

  8. Wählen Sie den Namen Ihres Themas aus der Liste aus.

  9. Fügen Sie den folgenden Nachrichtenstring hinzu, der an Ihr Ziel gesendet werden soll:{"url": "https://beam.apache.org/", "review": "negative"}

  10. Klicken Sie auf Erstellen.

Sie haben jetzt einen Cronjob, der alle zwei Minuten eine Nachricht mit einer negativen Bewertung an Ihr Pub/Sub-Thema sendet. Ihre Cloud Functions-Funktion abonniert dieses Thema.

gcloud

  1. Verwenden Sie den Befehl gcloud scheduler jobs create, um einen Cloud Scheduler-Job für diese Anleitung zu erstellen. Mit diesem Schritt wird ein Publisher für "positive Bewertungen" erstellt, der eine Nachricht pro Minute veröffentlicht.

    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --location=DATAFLOW_REGION \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    

    Ersetzen Sie DATAFLOW_REGION durch die Region, in der der Dataflow-Job bereitgestellt werden soll. Wählen Sie eine Dataflow-Region in der Nähe des Standorts, an dem die Befehle in dieser Anleitung ausgeführt werden. Der Wert der Variablen REGION muss ein gültiger Regionsname sein.

  2. Verwenden Sie den Befehl gcloud scheduler jobs run, um den Cloud Scheduler-Job zu starten.

    gcloud scheduler jobs run --location=DATAFLOW_REGION positive-ratings-publisher
    
  3. Erstellen und führen Sie einen weiteren ähnlichen Publisher für "negative Bewertungen" aus, der alle zwei Minuten eine Nachricht veröffentlicht. Mit diesem Schritt wird ein Publisher für "negative Bewertungen" erstellt, der alle zwei Minuten eine Nachricht veröffentlicht.

    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --location=DATAFLOW_REGION  \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
  4. Starten Sie den zweiten Cloud Scheduler-Job.

    gcloud scheduler jobs run --location=DATAFLOW_REGION negative-ratings-publisher
    

BigQuery-Dataset erstellen

Erstellen Sie ein BigQuery-Dataset und eine Tabelle mit dem entsprechenden Schema für Ihr Pub/Sub-Thema.

Console

BigQuery-Dataset erstellen

  1. Öffnen Sie in der Google Cloud Console die Seite "BigQuery".

    Zur Seite "BigQuery"

  2. Wählen Sie im Bereich Explorer das Projekt aus, in dem Sie das Dataset erstellen möchten.

  3. Maximieren Sie die Option Aktionen und klicken Sie auf Dataset erstellen.

  4. Führen Sie auf der Seite Dataset erstellen die folgenden Schritte aus:

    • Geben Sie unter Dataset-ID tutorial_dataset ein.
    • Unter Speicherort der Daten können Sie einen geografischen Standort für das Dataset auswählen. Nach der Erstellung des Datasets kann der Standort nicht mehr geändert werden.

    • Wählen Sie die anderen Optionen nicht aus.

    • Klicken Sie auf Dataset erstellen.

Erstellen Sie eine BigQuery-Tabelle mit einem Schema.

  1. Maximieren Sie im Bereich Explorer Ihr Projekt und wählen Sie das Dataset tutorial_dataset aus.

  2. Maximieren Sie die Option Aktionen und klicken Sie auf Öffnen.

  3. Klicken Sie im Detailfeld auf Tabelle erstellen.

  4. Wählen Sie auf der Seite Create table (Tabelle erstellen) im Abschnitt Source (Quelle) die Option Empty table (Leere Tabelle) aus.

  5. Gehen Sie auf der Seite Tabelle erstellen im Abschnitt Ziel folgendermaßen vor:

    • Prüfen Sie, ob für Dataset-Name der Wert tutorial_dataset festgelegt ist.
    • Geben Sie im Feld Tabellenname tutorial ein.
    • Achten Sie darauf, dass der Tabellentyp auf Native Tabelle eingestellt ist.
  6. Geben Sie im Abschnitt Schema die Schemadefinition ein. Klicken Sie auf Als Text bearbeiten und geben Sie das folgende Tabellenschema als JSON-Array ein.

    [
      {
        "mode": "NULLABLE",
        "name": "url",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "review",
        "type": "STRING"
      }
    ]
    
  7. Behalten Sie für Partitions- und Clustereinstellungen den Standardwert No partitioning bei.

  8. Übernehmen Sie im Abschnitt Erweiterte Optionen ebenfalls den Standardwert für Verschlüsselung, und zwar Google-managed key. Dataflow verschlüsselt standardmäßig ruhende Kundendaten.

  9. Klicken Sie auf Tabelle erstellen.

gcloud

Erstellen Sie das Dataset mit dem Befehl bq mk.

bq --location=DATAFLOW_REGION mk \
PROJECT_ID:tutorial_dataset

Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID Ihres Zielprojekts.

Verwenden Sie den Befehl bq mk mit dem Flag --table oder -t, um eine Tabelle in Ihrem Dataset zu erstellen.

bq mk \
    --table \
    PROJECT_ID:tutorial_dataset.tutorial \
    url:STRING,review:STRING

Benutzerdefinierte Funktion (UDF) erstellen

Optional können Sie eine JavaScript-UDF erstellen, um die von Google bereitgestellte Vorlage „Pub/Sub-Abo für BigQuery“ zu erweitern. Mit UDFs können Sie Datentransformationen definieren, die nicht in der Vorlage vorhanden sind, und sie in die Vorlage einschleusen.

Die folgende UDF validiert die URLs der eingehenden Bewertungen. Bewertungen ohne URLs oder falschen URLs werden an eine andere Ausgabetabelle mit dem Suffix _error_records, auch als Tabelle für unzustellbare Nachrichten bezeichnet, im selben Projekt und Dataset weitergeleitet.

JavaScript

/**
 * User-defined function (UDF) to transform events
 * as part of a Dataflow template job.
 *
 * @param {string} inJson input Pub/Sub JSON message (stringified)
 */
 function process(inJson) {
    const obj = JSON.parse(inJson);
    const includePubsubMessage = obj.data && obj.attributes;
    const data = includePubsubMessage ? obj.data : obj;

    if (!data.hasOwnProperty('url')) {
      throw new Error("No url found");
    } else if (data.url !== "https://beam.apache.org/") {
      throw new Error("Unrecognized url");
    }

    return JSON.stringify(obj);
  }

Speichern Sie dieses JavaScript-Snippet in dem zuvor erstellten Cloud Storage-Bucket.

Pipeline ausführen

Führen Sie eine Streaming-Pipeline mit der von Google bereitgestellten Vorlage Pub/Sub-Abo zu BigQuery aus. Die Pipeline erhält eingehende Daten aus dem Pub/Sub-Thema und gibt die Daten in Ihr BigQuery-Dataset aus.

Console

  1. Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.

    ZU JOBS

  2. Klicken Sie auf Job aus Vorlage erstellen.

  3. Geben Sie einen Jobnamen für Ihren Dataflow-Job ein.

  4. Wählen Sie unter Regionaler Endpunkt eine Region für Ihren Dataflow-Job aus.

  5. Wählen Sie für die Dataflow-Vorlage die Vorlage Pub/Sub-Abo für BigQuery aus.

  6. Geben Sie für BigQuery-Ausgabetabelle Folgendes ein:

    PROJECT_ID:tutorial_dataset.tutorial
    
  7. Geben Sie unter Pub/Sub-Eingabeabo Folgendes ein:

    projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
    

    Ersetzen Sie PROJECT_ID durch die Projekt-ID des Projekts, in dem Sie das BigQuery-Dataset erstellt haben, und SUBSCRIPTION_ID durch den Namen Ihres Pub/Sub-Abos.

  8. Geben Sie unter Temporärer Speicherort Folgendes ein:

    gs://BUCKET_NAME/temp/
    

    Ersetzen Sie BUCKET_NAME durch den Namen Ihres Cloud Storage-Buckets. Der Ordner temp speichert temporäre Dateien, z. B. den bereitgestellten Pipelinejob.

  9. Optional: Maximieren Sie Optionale Parameter, um eine UDF für den Job einzufügen.

    1. Geben Sie unter JavaScript-UDF-Pfad in Cloud Storage Folgendes ein:

      gs://BUCKET_NAME/dataflow_udf_transform.js
      
    2. Geben Sie für JavaScript-UDF-Name Folgendes ein:

      process
      
  10. Klicken Sie auf Job ausführen.

Wenn Sie prüfen möchten, ob die Vorlage Nachrichten an eine Tabelle für unzustellbare Nachrichten weiterleiten kann, veröffentlichen Sie einige Bewertungen ohne URLs oder falsche URLs.

  1. Zur Seite Pub/Sub-Themen

  2. Klicken Sie auf Ihren TOPIC_ID.

  3. Rufen Sie den Bereich Nachrichten auf.

  4. Klicken Sie auf Nachricht veröffentlichen.

  5. Geben Sie im Nachrichtentext einige Bewertungen ohne URLs oder falsche URLs ein. Beispiel:

    {"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}
    
  6. Klicken Sie auf Veröffentlichen.

gcloud

Führen Sie die Vorlage in Ihrer Shell oder Ihrem Terminal mit dem Befehl gcloud dataflow jobs run aus.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial

Ersetzen Sie JOB_NAME durch einen eindeutigen Namen Ihrer Wahl.

Optional können Sie die Vorlage mit der UDF mit dem folgenden Befehl ausführen:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial,\
javascriptTextTransformGcsPath=gs://BUCKET_NAME/dataflow_udf_transform.js,\
javascriptTextTransformFunctionName=process

Wenn Sie prüfen möchten, ob die Vorlage Nachrichten an eine Tabelle für unzustellbare Nachrichten weiterleiten kann, veröffentlichen Sie einige Bewertungen ohne URLs oder falsche URLs. Zum Beispiel:

gcloud pubsub topics publish TOPIC_ID \
  --message='{"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}'

Ergebnisse ansehen

Sehen Sie sich die Daten an, die in Ihren BigQuery-Tabellen geschrieben wurden.

Console

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.
    Zur Seite „BigQuery“

  2. Führen Sie im Abfrageeditor folgende Abfrage aus:

    SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial`
    LIMIT 1000
    

    Es kann bis zu einer Minute dauern, bis Daten in der Tabelle angezeigt werden.

    Die Abfrage gibt Zeilen zurück, die in den letzten 24 Stunden zu Ihrer Tabelle hinzugefügt wurden. Sie können Abfragen auch mit Standard-SQL ausführen.

    Wenn Sie davon ausgehen, dass einige Fehlerdatensätze in die Tabelle für unzustellbare Nachrichten geschrieben werden, verwenden Sie in der Abfrage den Tabellennamen tutorial_error_records. Beispiele:

    SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records`
    LIMIT 1000
    

gcloud

Prüfen Sie die Ergebnisse in BigQuery mit der folgenden Abfrage:

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.tutorial_dataset.tutorial"'`'

Während diese Pipeline ausgeführt wird, können Sie jede Minute neue Zeilen an die BigQuery-Tabelle anhängen.

Wenn Sie davon ausgehen, dass einige Fehlerdatensätze in die Tabelle für unzustellbare Nachrichten geschrieben werden, verwenden Sie in der Abfrage den Tabellennamen tutorial_error_records. Beispiele:

SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records`
LIMIT 1000

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben.

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Einzelne Ressourcen löschen

Wenn Sie das Projekt später wiederverwenden möchten, können Sie es behalten, aber die während der Anleitung erstellten Ressourcen löschen.

Dataflow-Pipeline anhalten

Console

  1. Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.

    ZU JOBS

  2. Klicken Sie auf den Job, den Sie beenden möchten.

    Zum Beeinden eines Jobs muss der Status des Jobs Wird ausgeführt sein.

  3. Klicken Sie auf der Seite mit den Jobdetails auf Beenden.

  4. Klicken Sie auf Abbrechen.

  5. Klicken Sie auf Job anhalten, um die Auswahl zu bestätigen.

gcloud

Verwenden Sie den Befehl gcloud dataflow jobs, um Ihren Dataflow-Job abzubrechen.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Google Cloud-Projektressourcen bereinigen

Console

  1. Löschen Sie die Cloud Scheduler-Jobs.

    1. Rufen Sie in der Google Cloud Console die Seite Cloud Scheduler auf.

      Zu Cloud Scheduler

    2. Wählen Sie Ihre Jobs aus.

    3. Klicken Sie oben auf der Seite auf Löschen und bestätigen Sie den Löschvorgang.

  2. Löschen Sie das Pub/Sub-Thema und -Abo.

    1. Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.

      Themen aufrufen

    2. Wählen Sie das von Ihnen erstellte Thema aus.

    3. Klicken Sie auf Löschen, um das Thema endgültig zu löschen.

    4. Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Abos auf.

      Zu den Abos

    5. Wählen Sie das mit Ihrem Thema erstellte Abo aus.

    6. Klicken Sie auf Löschen, um das Abo endgültig zu löschen.

  3. Löschen Sie die BigQuery-Tabelle und das Dataset.

    1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

      BigQuery aufrufen

    2. Maximieren Sie im Bereich Explorer Ihr Projekt.

    3. Klicken Sie neben dem Dataset, das Sie löschen möchten, auf  Aktionen ansehen und dann auf Löschen.

  4. Löschen Sie den Cloud Storage-Bucket.

    1. Wechseln Sie in der Google Cloud Console zur Cloud Storage-Seite Buckets.

      Buckets aufrufen

    2. Wählen Sie den Bucket aus, den Sie löschen möchten, klicken Sie auf Löschen und folgen Sie der Anleitung.

gcloud

  1. Verwenden Sie den Befehl gcloud scheduler jobs delete, um die Cloud Scheduler-Jobs zu löschen.

    gcloud scheduler jobs delete negative-ratings-publisher --location=DATAFLOW_REGION
    
    gcloud scheduler jobs delete positive-ratings-publisher --location=DATAFLOW_REGION
    
  2. Verwenden Sie die Befehle gcloud pubsub subscriptions delete und gcloud pubsub topics delete, um das Pub/Sub-Abo und -Thema zu löschen.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  3. Verwenden Sie zum Löschen der BigQuery-Tabelle den Befehl bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  4. Löschen Sie das BigQuery-Dataset. Das Dataset allein verursacht keine Gebühren.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  5. Verwenden Sie den Befehl gcloud storage rm, um den Cloud Storage-Bucket zu löschen. Für den Bucket fallen keine Gebühren an.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Anmeldedaten entfernen

Console

Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben.

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

IAM aufrufen

  1. Wählen Sie ein Projekt, einen Ordner oder eine Organisation aus.

  2. Suchen Sie die Zeile mit dem Hauptkonto, dessen Zugriff Sie widerrufen möchten. Klicken Sie in dieser Zeile auf Hauptkonto bearbeiten.

  3. Klicken Sie für alle Rollen, die entzogen werden sollen, auf Löschen  und dann auf Speichern.

gcloud

  • Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Nächste Schritte