Streaming-Pipeline mit einer Dataflow-Vorlage erstellen

In dieser Kurzanleitung erfahren Sie, wie Sie anhand einer von Google bereitgestellten Dataflow-Vorlage eine Streaming-Pipeline erstellen. Dabei wird speziell auf die Vorlage Pub/Sub für BigQuery Bezug genommen.

Die Vorlage „Pub/Sub für BigQuery“ ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema lesen und in eine BigQuery-Tabelle schreiben kann.


Klicken Sie auf Anleitung, um eine detaillierte Anleitung für diese Aufgabe direkt in der Google Cloud Console aufzurufen.

Anleitung


Vorbereitung

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. Erstellen Sie einen Cloud Storage-Bucket:
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  9. Kopieren Sie Folgendes, was Sie in einem späteren Abschnitt benötigen:
    • : Name Ihres Cloud Storage-Buckets
    • Ihre Google Cloud-Projekt-ID.

      Diese ID finden Sie über Projekte identifizieren.
  10. Damit Sie die Schritte in dieser Kurzanleitung ausführen können, muss Ihr Nutzerkonto die Rollen Dataflow-Administrator und Dienstkontonutzer haben. Das Compute Engine-Standarddienstkonto muss die Rolle "Dataflow-Worker" haben. So fügen Sie die erforderlichen Rollen in der Google Cloud Console hinzu:

    1. Rufen Sie die IAM-Seite auf.
      Zu IAM
    2. Wählen Sie Ihr Projekt aus.
    3. Klicken Sie in der Zeile mit Ihrem Nutzerkonto auf Hauptkonto bearbeiten und dann auf Weitere Rolle hinzufügen.
    4. Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie in der Drop-down-Liste Dataflow-Administrator aus.
    5. Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie in der Drop-down-Liste Dienstkontonutzer aus.
    6. Klicken Sie auf Speichern.
    7. Klicken Sie in der Zeile mit dem Compute Engine-Standarddienstkonto auf Hauptkonto bearbeiten.
    8. Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie in der Drop-down-Liste Dataflow-Worker aus.
    9. Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie in der Drop-down-Liste die Option Pub/Sub Editor aus.
    10. Klicken Sie auf Weitere Rolle hinzufügen und wählen Sie im Drop-down-Menü BigQuery-Datenbearbeiter aus.
    11. Klicken Sie auf Speichern.

      Weitere Informationen zum Zuweisen von Rollen finden Sie unter IAM-Rolle über die Konsole zuweisen.

  11. Standardmäßig beginnt jedes neue Projekt mit einem Standardnetzwerk. Wenn das Standardnetzwerk für Ihr Projekt deaktiviert oder gelöscht wurde, benötigen Sie in Ihrem Projekt ein Netzwerk, für das Ihr Nutzerkonto die Rolle Compute-Netzwerknutzer hat (roles/compute.networkUser).

BigQuery-Dataset und -Tabelle erstellen

Erstellen Sie in der Google Cloud Console ein BigQuery-Dataset und eine BigQuery-Tabelle mit dem entsprechenden Schema für das Pub/Sub-Thema.

In diesem Beispiel lautet der Name des Datasets taxirides und der Name der Tabelle realtime. So erstellen Sie dieses Dataset und diese Tabelle:

  1. Rufen Sie die Seite BigQuery auf.
    BigQuery aufrufen
  2. Klicken Sie im Steuerfeld Explorer neben dem Projekt, in dem Sie das Dataset erstellen möchten, auf Aktionen anzeigen und dann auf Dataset erstellen.
  3. Führen Sie im Bereich Dataset erstellen die folgenden Schritte aus:
    1. Geben Sie unter Dataset-ID taxirides ein. Dataset-IDs sind für jedes Google Cloud-Projekt eindeutig.
    2. Wählen Sie als Standorttyp die Option Mehrere Regionen und dann USA (mehrere Regionen in den USA) aus. Öffentliche Datasets sind am multiregionalen Standort US gespeichert. Der Einfachheit halber sollten Sie Ihr Dataset an diesem Speicherort ablegen.
    3. Übernehmen Sie die anderen Standardeinstellungen und klicken Sie auf Dataset erstellen.
  4. Maximieren Sie im Bereich Explorer Ihr Projekt.
  5. Klicken Sie neben dem Dataset taxirides auf Aktionen anzeigen und dann auf Tabelle erstellen.
  6. Führen Sie im Bereich Tabelle erstellen die folgenden Schritte aus:
    1. Wählen Sie im Abschnitt Quelle unter Tabelle erstellen aus die Option Leere Tabelle aus.
    2. Geben Sie im Abschnitt Ziel unter Tabelle den Wert realtime ein.
    3. Klicken Sie im Abschnitt Schema auf das Optionsfeld Als Text bearbeiten und fügen Sie die folgende Schemadefinition in das Feld ein:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Wählen Sie unter Partitionierung und Clustereinstellungen für Partitionierung das Feld Zeitstempel aus.
  7. Übernehmen Sie die anderen Standardeinstellungen und klicken Sie auf Tabelle erstellen.

Pipeline ausführen

Führen Sie eine Streamingpipeline mit der von Google bereitgestellten Vorlage „Cloud Pub/Sub für BigQuery“ aus. Die Pipeline erhält eingehende Daten aus dem Eingabethema.

  1. Rufen Sie die Dataflow-Seite Jobs auf.
    ZU JOBS
  2. Klicken Sie auf Job aus Vorlage erstellen.
  3. Geben Sie taxi-data als Jobname für Ihren Dataflow-Job ein.
  4. Wählen Sie für die Dataflow-Vorlage die Vorlage Pub/Sub für BigQuery aus.
  5. Geben Sie für BigQuery-Ausgabetabelle Folgendes ein:
    PROJECT_ID:taxirides.realtime

    Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID des Projekts, in dem Sie das BigQuery-Dataset erstellt haben.

  6. Maximieren Sie Optionale Parameter.
  7. Klicken Sie unter Pub/Sub-Eingabethema auf Thema manuell eingeben.
  8. Geben Sie im Dialog unter Themenname Folgendes ein und klicken Sie dann auf Speichern:
    projects/pubsub-public-data/topics/taxirides-realtime

    Dieses öffentlich verfügbare Pub/Sub-Thema basiert auf dem offenen Dataset der NYC Taxi & Limousine Commission. Im Folgenden finden Sie eine Beispielnachricht aus diesem Thema im JSON-Format:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  9. Geben Sie unter Temporärer Standort Folgendes ein:
    gs://BUCKET_NAME/temp/

    Ersetzen Sie BUCKET_NAME durch den Namen des Cloud Storage-Buckets. Der Ordner temp speichert temporäre Dateien, z. B. den bereitgestellten Pipelinejob.

  10. Geben Sie ein Netzwerk und ein Subnetzwerk ein, wenn Ihr Projekt kein Standardnetzwerk hat. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben.
  11. Klicken Sie auf Job ausführen.

Ergebnisse ansehen

So können Sie sich die in die Tabelle realtime geschriebenen Daten ansehen:

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Klicken Sie auf Neue Abfrage erstellen. Ein neuer Tab Editor wird geöffnet.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID des Projekts, in dem Sie das BigQuery-Dataset erstellt haben. Es kann bis zu einer Minute dauern, bis Daten in der Tabelle angezeigt werden.

  3. Klicken Sie auf Ausführen.

    Die Abfrage gibt Zeilen zurück, die in den letzten 24 Stunden zu Ihrer Tabelle hinzugefügt wurden. Sie können Abfragen auch mit Standard-SQL ausführen.

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen in Rechnung gestellt werden:

Projekt löschen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für den Schnellstart erstellt haben.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Einzelne Ressourcen löschen

Wenn Sie das in dieser Kurzanleitung verwendete Google Cloud-Projekt beibehalten möchten, löschen Sie die einzelnen Ressourcen:

  1. Rufen Sie die Dataflow-Seite Jobs auf.
    Gehe zu Jobs
  2. Wählen Sie den Streaming-Job aus der Jobliste aus.
  3. Klicken Sie im Navigationsbereich auf Beenden.
  4. Geben Sie im Dialogfeld Job anhalten entweder Pipeline abbrechen oder per Drain beenden ein und klicken Sie dann auf Job beenden.
  5. Rufen Sie die Seite BigQuery auf.
    BigQuery aufrufen
  6. Maximieren Sie im Bereich Explorer Ihr Projekt.
  7. Klicken Sie neben dem Dataset, das Sie löschen möchten, auf Aktionen ansehen und dann auf Öffnen.
  8. Klicken Sie im Detailbereich auf Dataset löschen und folgen Sie der Anleitung.
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

Nächste Schritte