Streamingdaten mit Dataflow SQL zusammenführen

In dieser Anleitung erfahren Sie, wie Sie mit Dataflow SQL einen Datenstrom aus Pub/Sub mit Daten aus einer BigQuery-Tabelle verknüpfen.

Ziele

In dieser Anleitung lernen Sie, wie Sie:

  • Eine Dataflow-SQL-Abfrage schreiben, die Pub/Sub-Streamingdaten mit BigQuery-Tabellendaten zusammenführt,.
  • Einen Dataflow-Job über die Dataflow-SQL-Benutzeroberfläche bereitstellen.

Kosten

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • Dataflow
  • Cloud Storage
  • Pub/Sub

Sie können mithilfe des Preisrechners die Kosten für Ihre voraussichtliche Nutzung kalkulieren. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub und Cloud Resource Manager APIs aktivieren.

    Aktivieren Sie die APIs

  5. Authentifizierung einrichten:
    1. Wechseln Sie in der Cloud Console zur Seite Dienstkontoschlüssel erstellen.

      Zur Seite „Dienstkontoschlüssel erstellen“
    2. Wählen Sie aus der Liste Dienstkonto die Option Neues Dienstkonto aus.
    3. Geben Sie im Feld Dienstkontoname einen Namen ein.
    4. Wählen Sie in der Liste Rolle die Option Projekt > Inhaber

    5. Klicken Sie auf Erstellen. Eine JSON-Datei mit Ihrem Schlüssel wird auf Ihren Computer heruntergeladen.
  6. Legen Sie für die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS den Pfad der JSON-Datei fest, die Ihren Dienstkontoschlüssel enthält. Diese Variable gilt nur für Ihre aktuelle Shellsitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable noch einmal festlegen.

  7. Cloud SDK installieren und initialisieren Wählen Sie eine der Installationsoptionen aus. Möglicherweise müssen Sie das Attribut project auf das Projekt festlegen, das Sie für diese Schritt-für-Schritt-Anleitung verwenden.
  8. Öffnen Sie in der Cloud Console die BigQuery-Web-UI. Diese wird mit dem zuletzt aufgerufenen Projekt geöffnet. Klicken Sie am oberen Rand der BigQuery-Web-UI auf diesen Projektnamen, wenn Sie ein anderes Projekt verwenden möchten. Suchen Sie dann nach dem gewünschten Projekt.
    Zur BigQuery-Web-UI

Zur Dataflow SQL-UI wechseln

Führen Sie in der BigQuery-Web-UI folgende Schritte aus, um zur Dataflow-UI zu wechseln.

  1. Klicken Sie auf das Drop-down-Menü Mehr und wählen Sie Abfrageeinstellungen aus.

  2. Wählen Sie im rechts angezeigten Menü Abfrageeinstellungen die Option Dataflow-Engine aus.

  3. Wenn die Dataflow und Data Catalog APIs nicht für Ihr Projekt aktiviert sind, werden Sie zu deren Aktivierung aufgefordert. Klicken Sie auf APIs aktivieren. Die Aktivierung der Dataflow- und Data Catalog APIs kann einige Minuten dauern.

  4. Wenn die APIs aktiviert sind, klicken Sie auf Speichern.

Beispielquellen erstellen

Wenn Sie das Beispiel in dieser Anleitung durcharbeiten möchten, erstellen Sie für die Schritte der Anleitung folgende Quellen.

  • Ein Pub/Sub-Thema namens transactions: ein Transaktionsdatenstrom, der über ein Abo beim Pub/Sub-Thema eingeht. Die Daten für die einzelnen Transaktionen umfassen Informationen wie die gekauften Produkte, den Verkaufspreis und die Stadt und den Bundesstaat, in denen der Kauf erfolgt ist. Nachdem Sie das Pub- bzw. Sub-Thema erstellt haben, erstellen Sie ein Skript, das Nachrichten zu Ihrem Thema veröffentlicht. Dieses Skript führen Sie weiter unten in dieser Anleitung aus.
  • Eine BigQuery-Tabelle namens us_state_salesregions: eine Tabelle, die eine Zuordnung von Bundesstaaten zu Verkaufsregionen enthält. Vor dem Erstellen dieser Tabelle müssen Sie ein BigQuery-Dataset erstellen.

Pub/Sub-Quellen finden

Die SQL-UI von Dataflow bietet eine Möglichkeit, Pub/Sub-Datenquellenobjekte für jedes Projekt zu finden, auf das Sie Zugriff haben, sodass Sie sich deren vollständige Namen nicht merken müssen.

Fügen Sie für das Beispiel in dieser Anleitung das von Ihnen erstellte Thema transactions Pub/Sub hinzu:

  1. Klicken Sie im linken Navigationsbereich auf die Drop-down-Liste Daten hinzufügen und wählen Sie Cloud Dataflow-Quellen aus.

  2. Wählen Sie im rechts angezeigten Bereich Cloud Dataflow-Quelle hinzufügen die Option Pub/Sub-Themen aus. Geben Sie im Suchfeld transactions ein und starten Sie die Suche. Wählen Sie das Thema aus und klicken Sie auf Hinzufügen.

Pub/Sub-Thema ein Schema zuweisen

Durch das Zuweisen eines Schemas können Sie SQL-Abfragen für die Daten Ihres Pub/Sub-Themas ausführen. Derzeit erwartet Dataflow SQL, dass Nachrichten in Pub-/Sub-Themen im JSON-Format serialisiert werden. Andere Formate wie Avro werden in Zukunft ebenfalls unterstützt.

Nach dem Hinzufügen des Beispiel-Pub/Sub-Themas als eine Dataflow-Quelle führen Sie die folgenden Schritte aus, um dem Thema in der Dataflow-SQL-UI ein Schema zuzuweisen:

  1. Wählen Sie das Thema im Feld Ressourcen aus.

  2. Klicken Sie auf dem Tab Schema auf Schema bearbeiten. Die Seitenleiste Schema wird rechts geöffnet.

  3. Wechseln Sie zwischen den Optionen Als Text bearbeiten und fügen Sie das folgende Inline-Schema in den Editor ein. Klicken Sie dann auf Senden.

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT64"
      }
    ]
    
  4. (Optional) Klicken Sie auf Vorschau des Themas, um den Inhalt Ihrer Nachrichten zu prüfen und zu bestätigen, dass sie mit dem von Ihnen definierten Schema übereinstimmen.

Schema ansehen

  1. Klicken Sie im linken Navigationsbereich der Dataflow SQL-UI auf Cloud Dataflow-Quellen.
  2. Klicken Sie auf Pub/Sub-Themen.
  3. Klicken Sie auf transactions.
  4. Unter Schema können Sie das Schema anzeigen, das Sie dem Pub/Sub-Thema transactions zugewiesen haben.
Dem Thema zugewiesenes Schema, einschließlich einer Liste der Feldnamen und ihrer Beschreibungen.

SQL-Abfrage erstellen

In der Dataflow SQL-UI können Sie SQL-Abfragen erstellen, um Ihre Dataflow-Jobs auszuführen.

Die folgende SQL-Abfrage dient der Datenanreicherung. Mithilfe einer BigQuery-Tabelle (us_state_salesregions), die eine Zuordnung von Bundesstaaten zu Verkaufsregionen vornimmt, erweitert sie den Stream von Pub/Sub-Ereignissen (transactions) um das Feld sales_region.

Kopieren Sie die folgende SQL-Abfrage und fügen Sie sie in den Abfrageeditor ein. Ersetzen Sie project-id durch Ihre Projekt-ID.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Wenn Sie in der Dataflow SQL-UI eine Abfrage eingeben, wird die Abfragesyntax vom Validator geprüft. Wenn die Abfrage gültig ist, wird ein grünes Häkchen angezeigt. Ist sie ungültig, ist ein rotes Ausrufezeichen zu sehen. Wenn die Abfragesyntax ungültig ist, erhalten Sie durch Klicken auf das Symbol des Validators Informationen darüber, was korrigiert werden muss.

Im folgenden Screenshot ist die gültige Abfrage im Abfrageeditor zu sehen. Das grüne Häkchen des Validators wird angezeigt.

Dataflow-Job zum Ausführen der SQL-Abfrage erstellen

Erstellen Sie zum Ausführen Ihrer SQL-Abfrage einen Dataflow-Job über die Dataflow-SQL-UI.

  1. Klicken Sie unter dem Abfrageeditor auf Dataflow-Job erstellen.

  2. Ändern Sie im rechts angezeigten Bereich Dataflow-Job erstellen den Standardwert von Tabellenname in dfsqltable_sales.

  3. (Optional) Dataflow wählt automatisch die Einstellungen aus, die für Ihren Dataflow-SQL-Job optimal sind. Sie können jedoch das Menü Optionale Parameter erweitern, um die folgenden Pipelineoptionen manuell festzulegen:

    • Maximale Anzahl der Worker
    • Zone
    • E-Mail-Adresse des Dienstkontos
    • Maschinentyp
    • Zusätzliche Tests
    • Konfiguration der Worker-IP-Adresse
    • Netzwerk
    • Subnetzwerk
  4. Klicken Sie auf Erstellen. Bis zum Beginn der Dataflow-Jobausführung vergehen einige Minuten.

  5. Der Bereich Abfrageergebnisse wird in der UI angezeigt. Wenn Sie später zum Bereich Abfrageergebnisse eines Jobs zurückkehren möchten, suchen Sie im Bereich Jobverlauf nach dem Job und klicken dann auf die Schaltfläche Abfrage im Editor öffnen, wie unter Dataflow-Job und -Ausgabe ansehen gezeigt.

  6. Klicken Sie unter Jobinformationen auf den Link mit der Job-ID. Daraufhin wird in der Dataflow-Web-UI ein neuer Browsertab mit der Dataflow-Seite Jobdetails geöffnet.

Dataflow-Job und -Ausgabe ansehen

Dataflow wandelt die SQL-Abfrage in eine Apache Beam-Pipeline um. In der Dataflow-Web-UI, die in einem neuen Browsertab geöffnet wurde, sehen Sie eine grafische Darstellung der Pipeline.

Pipeline aus SQL-Abfrage, die in der Dataflow-Web-UI angezeigt wird

Sie können auf die Felder klicken, um eine Aufschlüsselung der Transformationen aufzurufen, die in der Pipeline auftreten. Wenn Sie beispielsweise in der grafischen Darstellung auf das oberste Feld namens SQL-Abfrage ausführen klicken, wird eine Grafik mit den Vorgängen angezeigt, die im Hintergrund ablaufen.

Die oberen zwei Felder stellen die beiden Eingaben dar, die Sie verknüpft haben: das Pub/Sub-Thema transactions und die BigQuery-Tabelle us_state_salesregions.

Die Ausgabe eines Joins mit zwei Eingaben wird in 25 Sekunden abgeschlossen.

Wenn Sie die Ausgabetabelle mit den Jobergebnissen ansehen möchten, wechseln Sie zurück zum Browsertab mit der Dataflow SQL-UI. Klicken Sie dann im linken Navigationsbereich unter Ihrem Projekt auf das von Ihnen erstellte dataflow_sql_dataset-Dataset. Klicken Sie anschließend auf die Ausgabetabelle dfsqltable_sales. Auf dem Tab Vorschau wird der Inhalt der Ausgabetabelle angezeigt.

Die Vorschautabelle "dfsqltable_sales" enthält Spalten für "tr_time_str", "first_name", "last_name", Stadt, Land, Produkt, Menge und "sales_region".

Ältere Jobs ansehen und Abfragen bearbeiten

In der Cloud Dataflow SQL-UI werden ältere Jobs und Abfragen im Bereich Jobverlauf gespeichert. Jobs werden nach dem Tag aufgeführt, an dem sie gestartet wurden. In der Jobliste werden zuerst Tage mit laufenden Jobs angezeigt. Dann folgen Tage ohne laufende Jobs.

Sie können die Jobverlaufsliste verwenden, um vorherige SQL-Abfragen zu bearbeiten und neue Dataflow-Jobs auszuführen. Beispiel: Sie möchten die Abfrage ändern, mit der alle 15 Sekunden die Verkaufszahlen nach Verkaufsregion zusammengefasst werden. Greifen Sie in diesem Fall über den Bereich Job history (Jobverlauf) auf den laufenden Job zu, den Sie zuvor in dieser Anleitung gestartet haben. Ändern Sie dann die SQL-Abfrage und führen Sie einen weiteren Job mit der geänderten Abfrage aus.

  1. Klicken Sie im linken Navigationsbereich auf Job history (Jobverlauf).

  2. Klicken Sie unter Job history (Jobverlauf) auf Cloud Dataflow. Sie sehen alle älteren Jobs für Ihr Projekt.

    Jobverlauf mit Datum und Uhrzeit der Ausführung und Statussymbol für den Job.
  3. Klicken Sie auf den Job, den Sie bearbeiten möchten, und dann auf Abfrage im Editor öffnen.

  4. Bearbeiten Sie die SQL-Abfrage im Abfrageeditor, um rollierende Fenster hinzuzufügen. Ersetzen Sie project-id durch Ihre Projekt-ID, wenn Sie die folgende Abfrage kopieren.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. Klicken Sie unter dem Abfrageeditor auf Create Cloud Dataflow job (Cloud Dataflow-Job erstellen), um einen neuen Job mit der geänderten Abfrage zu erstellen.

Bereinigen

So vermeiden Sie, dass Ihrem Cloud-Rechnungskonto die in diesem Tutorial verwendeten Ressourcen in Rechnung gestellt werden:

  1. Beenden Sie das Veröffentlichungsskript transactions_injector.py, sofern es noch ausgeführt wird.

  2. Beenden Sie Ihre laufenden Dataflow-Jobs. Öffnen Sie in der Cloud Console die Dataflow-Web-UI.

    Zur Dataflow-Web-UI

    Führen Sie für jeden Job, den Sie in dieser Schritt-für-Schritt-Anleitung erstellt haben, folgende Schritte aus:

    1. Klicken Sie auf den Namen des Jobs.

    2. Klicken Sie im Bereich Jobzusammenfassung des Jobs auf Job beenden. Das Dialogfeld Job beenden mit den Optionen zum Beenden des Jobs wird angezeigt.

    3. Klicken Sie auf Abbrechen.

    4. Klicken Sie auf Job beenden. Der Dienst hält jegliche Datenaufnahme und -verarbeitung so schnell wie möglich an. Da durch Abbrechen die Verarbeitung sofort gestoppt wird, können alle In-Flight-Daten verloren gehen. Das Beenden eines Jobs kann einige Minuten dauern.

  3. Löschen Sie Ihr BigQuery-Dataset. Öffnen Sie in der Cloud Console die BigQuery-Web-UI.

    Zur BigQuery-Web-UI

    1. Klicken Sie im Navigationsbereich im Abschnitt Ressourcen auf das von Ihnen erstellte Dataset dataflow_sql_dataset.

    2. Klicken Sie rechts im Detailbereich auf Dataset löschen. Das Dataset, die Tabelle und alle Daten werden gelöscht.

    3. Bestätigen Sie im Dialogfeld Dataset löschen den Löschbefehl. Geben Sie dazu den Namen des Datasets (dataflow_sql_dataset) ein und klicken Sie auf Löschen.

  4. Löschen Sie Ihr Pub/Sub-Thema. Rufen Sie in der Cloud Console die Seite "Pub/Sub-Themen" auf.

    Zur Seite "Pub/Sub-Themen"

    1. Klicken Sie auf das Kästchen neben dem Thema transactions.

    2. Klicken Sie auf Löschen, um das Thema endgültig zu löschen.

    3. Rufen Sie die Seite Pub/Sub-Abonnements auf.

    4. Klicken Sie auf das Kästchen neben allen verbliebenen Abos von transactions. Falls Ihre Jobs nicht mehr ausgeführt werden, gibt es unter Umständen keine Abos mehr.

    5. Klicken Sie auf Löschen, um die Abos endgültig zu löschen.

  5. Löschen Sie den Dataflow-Staging-Bucket in Cloud Storage. Rufen Sie den Cloud Storage-Browser in der Cloud Console auf.

    Zum Cloud Storage-Browser

    1. Klicken Sie auf das Kästchen neben dem Dataflow-Staging-Bucket.

    2. Klicken Sie auf Löschen, um den Bucket endgültig zu löschen.

Nächste Schritte