Änderungen mit Dataflow streamen
Mit dem Bigtable Beam-Connector können Sie Bigtable-Datenänderungseinträge mit Dataflow lesen, ohne Partitionsänderungen in Ihrem Code erfassen oder verarbeiten zu müssen. Der Connector übernimmt diese Logik für Sie.
In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie den Artikel Überblick über die Änderungen Streams und machen Sie sich mit Dataflow vertraut.
Alternativen zum Erstellen einer eigenen Pipeline
Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, haben Sie folgende Möglichkeiten:
Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.
Sie können auch die Codebeispiele aus der Bigtable-Anleitung verwenden oder Schnellstart als Ausgangspunkt für Ihren Code.
Achten Sie darauf, dass für den generierten Code die Version 26.14.0 oder höher von google cloud libraries-bom
verwendet wird.
Connector-Details
Mit der Bigtable-Beam-Connector-Methode BigtableIO.readChangeStream
können Sie einen Datenstrom lesen
Änderungseinträge (ChangeStreamMutation
), die Sie verarbeiten können. Der Bigtable Beam-Connector ist eine Komponente des GitHub-Repositorys für Apache Beam. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter
BigtableIO.java
Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Prüfen Sie Apache Beam Laufzeitunterstützung, um sicherzustellen, Sie eine unterstützte Version von Java verwenden. Anschließend können Sie eine Pipeline bereitstellen, die den Connector für Dataflow verwendet. Dieser übernimmt die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit der Verarbeitung von Streaming-Daten.
Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Dokumentation zu Beam
Daten ohne Ereigniszeiten gruppieren
Datenänderungseinträge, die mit dem Bigtable Beam-Connector gestreamt werden, sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängen.
Wie unter Replikation und Markierungen erläutert, wird ein niedriger Wasserzeichenwert möglicherweise nicht erhöht, wenn die Replikation für die Partition noch nicht auf dem Stand des Rests der Instanz ist. Wenn ein niedriges Wasserzeichen nicht mehr weiterbewegt, wird der Änderungsstream angehalten.
Um ein Stottern des Streams zu verhindern, gibt der Bigtable Beam-Connector alle Daten mit einem Ausgabezeitstempel von null aus. Durch den Zeitstempel null berücksichtigt Dataflow alle Daten, Datensätze ändern in verspätete Daten. Daher sind Dataflow-Funktionen, die von Ereigniszeiten abhängen, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie nicht nutzen Fensterfunktionen Ereigniszeit-Trigger oder Ereigniszeit-Timer
Stattdessen können Sie GlobalWindows mit Triggern verwenden, die nicht auf die Ereigniszeit zurückzuführen sind, um diese verspäteten Daten in Ansichten zu gruppieren, wie im Beispiel aus der Anleitung gezeigt. Details zu Triggern und Bereichen finden Sie unter Trigger im Beam-Programmierhandbuch.
Autoscaling
Der Connector unterstützt
Dataflow-Autoscaling
der standardmäßig aktiviert ist, wenn Sie
Runner v2
(erforderlich). Der Dataflow-Autoscaling-Algorithmus berücksichtigt
den geschätzten Rückstand beim Änderungsstream, der in der
Dataflow-Monitoring
im Abschnitt Backlog
. Verwenden Sie das Flag --maxNumWorkers
beim Bereitstellen einer
die Anzahl der Worker begrenzen.
Informationen zum manuellen Skalieren der Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren
Beschränkungen
Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.
Dataflow Runner v2
Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden.
Geben Sie dazu --experiments=use_runner_v2
in Ihren Befehlszeilenargumenten an. Wenn Sie die Pipeline mit Runner v1 ausführen, schlägt sie mit der folgenden Ausnahme fehl:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
Snapshots
Der Connector unterstützt keine Dataflow-Snapshots.
Duplikate
Der Bigtable Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden dass der Cluster in der Commit-Zeitstempelreihenfolge neu startet, früheren Zeitpunkten im Stream, kann es zu Duplikaten kommen.
Hinweis
Bevor Sie den Connector verwenden können, müssen Sie die folgenden Voraussetzungen erfüllen.
Authentifizierung einrichten
Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Weitere Informationen unter Set up authentication for a local development environment.
Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.
Änderungsstream aktivieren
Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die Änderungsstreams aktiviert sind.
Tabelle mit Stream-Metadaten ändern
Wenn Sie Änderungen mit Dataflow streamen, erstellt der Bigtable Beam-Connector standardmäßig eine Metadatentabelle mit dem Namen __change_stream_md_table
. Die Metadatentabelle des Änderungsstreams verwaltet
den Betriebsstatus des Connectors und speichert Metadaten zu Datenänderungen
Datensätze.
Standardmäßig erstellt der Connector die Tabelle in derselben Instanz wie die gestreamte Tabelle. Damit die Tabelle richtig funktioniert, muss das Anwendungsprofil für die Metadatentabelle Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen aktiviert haben.
Weitere Informationen zum Streamen von Änderungen aus Bigtable mit dem Bigtable Beam-Connector finden Sie in der BigtableIO-Dokumentation.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mit Dataflow benötigen.
Um die Änderungen aus Bigtable zu lesen, benötigen Sie diese Rolle:
- Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten
Zum Ausführen des Dataflow-Jobs benötigen Sie folgende Rollen:
- Die Rolle „Dataflow-Entwickler“ (
roles/dataflow.developer
) für das Projekt mit Ihren Cloud-Ressourcen - Dataflow-Worker (roles/dataflow.worker) für das Projekt mit Ihren Cloud-Ressourcen
- Storage-Objekt-Administrator (roles/storage.objectAdmin) für die Cloud Storage-Buckets, die Sie verwenden möchten
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Bigtable Beam-Connector als Abhängigkeit hinzufügen
Fügen Sie Ihrer Maven-pom.xml-Datei Code ähnlich der folgenden Abhängigkeit hinzu. Die Version 2.48.0 oder höher sein.
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Änderungsstream lesen
Um eine Dataflow-Pipeline zum Lesen Ihrer Änderungsdatensätze zu erstellen,
konfigurieren Sie den Connector
und fügen dann Transformationen und Senken hinzu. Dann verwenden Sie den
Connector zum Lesen von ChangeStreamMutation
-Objekten in einer Beam-Pipeline.
Die in Java geschriebenen Codebeispiele in diesem Abschnitt zeigen, wie Sie eine Pipeline erstellen und damit Schlüssel/Wert-Paare in einen String konvertieren. Jedes Paar besteht aus
eines Zeilenschlüssels und eines ChangeStreamMutation
-Objekts. Die Pipeline konvertiert jeden
in einen durch Kommas getrennten String umwandeln.
Pipeline erstellen
Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:
Datensätze zu Datenänderungen verarbeiten
Dieses Beispiel zeigt, wie alle Einträge in einem Datensatz für Datenänderungen durchlaufen werden können. für eine Zeile und rufen Sie eine Methode zum Umwandeln in einen String basierend auf dem Eintragstyp auf.
Eine Liste der Eintragstypen, die ein Datenänderungseintrag enthalten kann, finden Sie unter Inhalt eines Datenänderungseintrags.
In diesem Beispiel wird ein write-Eintrag konvertiert:
In dieser Stichprobe wird ein Eintrag für das Löschen von Zellen konvertiert:
In diesem Beispiel wird ein Löschen eines Spaltenfamilieneintrags konvertiert:
Überwachen
Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud-Ressourcen während der Ausführung einer Dataflow-Pipeline, Bigtable-Änderungsstreams lesen:
Überprüfen Sie insbesondere die folgenden Messwerte:
- Prüfen Sie auf der Seite Bigtable-Monitoring die folgenden Messwerte:
- Daten zur CPU-Auslastung nach Änderungsstreams im Messwert
cpu_load_by_app_profile_by_method_by_table
Zeigt den Änderungsstream auf die CPU-Nutzung des Clusters auswirken. - Änderungsstream-Speicherauslastung (Byte)
(
change_stream_log_used_bytes
).
- Daten zur CPU-Auslastung nach Änderungsstreams im Messwert
- Prüfen Sie auf der Dataflow-Monitoringseite die Daten Aktualität: Hier sehen Sie den Unterschied zwischen der aktuellen Uhrzeit und der Wasserzeichen. Sie sollte etwa zwei Minuten dauern. Gelegentlich kann es zu Spitzen kommen, ein oder zwei Minuten länger. Wenn der Messwert für die Datenaktualität kontinuierlich höher ist, als dieser Grenzwert erreicht ist, verfügt Ihre Pipeline wahrscheinlich über unzureichende Ressourcen weitere Dataflow-Worker hinzufügen sollten. Die Datenaktualität um anzuzeigen, ob Datenänderungsdatensätze langsam verarbeitet werden.
- Die Dataflow-
processing_delay_from_commit_timestamp_MEAN
die durchschnittliche Verarbeitungszeit von Datenänderungsdatensätzen im Lebensdauer des Jobs.
Der Bigtable-Messwert server/latencies
ist nicht hilfreich, wenn Sie
Monitoring einer Dataflow-Pipeline, die eine
Bigtable-Änderungsstream, da er die Streaminganfrage widerspiegelt
und nicht die Verarbeitungslatenz
des Data-Change-Records. Eine hohe Latenz in einem Änderungsstream bedeutet nicht, dass die Anfragen langsam verarbeitet werden, sondern dass die Verbindung so lange offen war.
Nächste Schritte
- Daten aus Dataflow in Cloud Storage schreiben
- Vollständige Liste der von Bigtable bereitgestellten Monitoring-Messwerte