Dataflow ist ein verwalteter Dienst für die Transformation und Anreicherung von Daten. Mit dem Dataflow-Connector für Spanner können Sie Daten lesen Daten in Spanner über eine Dataflow-Pipeline schreiben wahlweise transformieren oder ändern. Sie können auch Pipelines erstellen, mit denen Daten zwischen Spanner und anderen Google Cloud-Produkten übertragen werden.
Der Dataflow-Connector ist die empfohlene Methode zur effizienten Bulk-Verschiebung von Daten in und aus Spanner und zum Ausführen großer Transformationen an einer Datenbank, die von partitionierter DML nicht unterstützt werden, z. B. Tabellenverschiebungen und Bulk-Löschungen, die einen JOIN erfordern. Bei der Arbeit einzelnen Datenbanken gibt es andere Methoden, die Sie verwenden können, Daten exportieren:
- Verwenden Sie die Google Cloud Console, um eine einzelne Datenbank aus einer Datenbank zu exportieren. Spanner für Cloud Storage im Avro.
- Wenn Sie eine Datenbank aus Dateien, die Sie nach Cloud Storage exportiert haben, zurück in Spanner importieren möchten, verwenden Sie dafür ebenfalls am besten die Google Cloud Console.
- Verwenden Sie die REST API oder die Google Cloud CLI, um export oder import-Jobs von Spanner in Cloud Storage und umgekehrt mit dem Avro-Format).
Der Dataflow-Connector für Spanner ist Teil des Apache Beam Java SDK, das eine API zum Ausführen der oben genannten Aktionen. Weitere Informationen zu einigen der unten behandelten Konzepte, wie z. B. finden Sie im Programmierleitfaden für Apache Beam.
Connector dem Maven-Projekt hinzufügen
Wenn Sie den Cloud Dataflow-Connector einem Maven-Projekt hinzufügen möchten, dann fügen Sie das Maven-Artefakt beam-sdks-java-io-google-cloud-platform
als Abhängigkeit in die Datei pom.xml
ein:
Angenommen, Ihre Datei pom.xml
setzt die Version von beam.version
auf die entsprechende Versionsnummer. In diesem Fall fügen Sie die folgende Abhängigkeit hinzu:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Daten aus Spanner lesen
Wenden Sie die Transformation SpannerIO.read() an, um Daten aus Spanner zu lesen.
Konfigurieren Sie den Lesevorgang mit den Methoden aus der Klasse SpannerIO.Read
.
Durch Einsatz der Transformation wird ein Objekt PCollection<Struct>
zurückgegeben. Dabei stellt jedes Element in der Sammlung eine einzelne Zeile dar, die vom Lesevorgang zurückgegeben wird. Sie können mit und ohne bestimmten SQL-Code aus Spanner lesen
abhängig von der gewünschten Ausgabe.
Durch Einsatz der Transformation SpannerIO.read()
wird mithilfe eines starken Lesevorgangs eine konsistente Datenansicht zurückgegeben. Sofern Sie dies nicht anders festlegen, beruht das Ergebnis des Lesevorgangs auf einem Snapshot der Daten zu dem Zeitpunkt, an dem Sie den Lesevorgang gestartet haben. Weitere Informationen finden Sie unter Lesevorgänge.
Informationen zu den verschiedenen Arten von Lesevorgängen, die Spanner ausführen kann.
Daten mit einer Abfrage lesen
Konfigurieren Sie die Transformation, um einen bestimmten Datensatz aus Spanner zu lesen
Verwenden Sie die Methode SpannerIO.Read.withQuery()
, um einen SQL-Code anzugeben.
Abfrage. Beispiel:
Daten ohne Angabe einer Abfrage lesen
Wenn Sie ohne eine Abfrage aus einer Datenbank lesen möchten, können Sie eine Tabelle angeben SpannerIO.Read.withTable() verwenden und ein Liste der Spalten, die mit SpannerIO.Read.withColumns() gelesen werden sollen . Beispiel:
GoogleSQL
PostgreSQL
Um die Anzahl der gelesenen Zeilen zu begrenzen, können Sie mit der Methode SpannerIO.Read.withKeySet().
Sie können eine Tabelle auch mit einem angegebenen sekundären Index lesen. Wie bei der readUsingIndex() API aufrufen, muss der Index alle Daten enthalten, die in den Abfrageergebnissen angezeigt wird.
Geben Sie dazu die Tabelle wie im vorherigen Beispiel gezeigt an und legen Sie den Parameter
Index, der die gewünschten Spaltenwerte enthält.
SpannerIO.Read.withIndex()
-Methode. Im Index müssen alle
Spalten, die die Transformation lesen muss. Der Primärschlüssel der Basistabelle ist
implizit gespeichert wurden. Um beispielsweise die Tabelle Songs
mit dem Index zu lesen,
SongsBySongName
, Sie verwenden das
folgenden Code:
GoogleSQL
PostgreSQL
Veraltete Transaktionsdaten kontrollieren
Eine Transformation wird immer auf einen konsistenten Daten-Snapshot angewendet. Verwenden Sie die Methode SpannerIO.Read.withTimestampBound()
, um die Veralterung der Daten zu steuern. Weitere Informationen finden Sie unter Transaktionen.
Aus mehreren Tabellen in derselben Transaktion lesen
Wenn Sie Daten aus mehreren Tabellen zur gleichen Zeit lesen möchten, damit Sie so die Datenkonsistenz gewährleisten können, führen Sie alle Lesevorgänge mit einer einzigen Transaktion aus. Verwenden Sie dazu die Transformation createTransaction()
und erstellen Sie ein Objekt PCollectionView<Transaction>
, mit dem wiederum eine Transaktion erstellt wird. Die entstehende Ansicht kann mit SpannerIO.Read.withTransaction()
an einen Lesevorgang übergeben werden.
GoogleSQL
PostgreSQL
Daten aus allen verfügbaren Tabellen lesen
Sie können Daten aus allen verfügbaren Tabellen in einer Spanner-Datenbank lesen.
GoogleSQL
PostgreSQL
Fehlerbehebung bei nicht unterstützten Abfragen
Der Dataflow-Connector unterstützt nur Spanner SQL-Abfragen
wobei der erste Operator im Abfrageausführungsplan ein Verteilte
Union Wenn Sie versuchen,
Daten aus Spanner mit einer Abfrage
erhalten Sie eine Ausnahme, die besagt, dass die Abfrage does not have a DistributedUnion at
the root
die Schritte unter Informationen zur Ausführung von Spanner
Abfragen, um einen Ausführungsplan für Ihre Abfrage mithilfe der Methode
Google Cloud Console
Wenn die SQL-Abfrage nicht unterstützt wird, vereinfachen Sie sie entsprechend, damit sie als ersten Operator im Abfrageausführungsplan den Operator "Distributed Union" enthält. Entfernen Sie Aggregatfunktionen, Tabellen-Joins sowie die Operatoren DISTINCT
, GROUP BY
und ORDER
, da diese Operatoren am ehesten verhindern, dass die Abfrage funktioniert.
Mutationen für einen Schreibvorgang erstellen
Verwenden Sie die Klasse Mutation
newInsertOrUpdateBuilder()
anstelle der
newInsertBuilder()
-Methode
es sei denn, es ist
für Java-Pipelines unbedingt erforderlich. Verwenden Sie für Python-Pipelines
SpannerInsertOrUpdate()
statt
SpannerInsert()
Dataflow bietet
mindestens einmal garantiert, was bedeutet, dass die Mutation
mehrere Male. Daher können nur INSERT
-Mutationen generieren
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
Fehler, die dazu führen,
damit die Pipeline fehlschlägt. Verwenden Sie INSERT_OR_UPDATE
, um diesen Fehler zu vermeiden.
Mutation erstellt, bei der eine neue Zeile hinzugefügt oder Spaltenwerte aktualisiert werden,
existiert bereits. Die Mutation INSERT_OR_UPDATE
kann mehrmals angewendet werden.
In Spanner schreiben und Daten transformieren
Mit dem Dataflow können Sie Daten in Spanner schreiben
Connector mithilfe einer SpannerIO.write()
-Transformation, um eine
Sammlung von Eingabezeilenmutationen. Dataflow-Connector-Gruppen
Mutationen in Batches zusammenfassen.
Das folgende Beispiel zeigt, wie eine Schreibtransformation auf eine PCollection
von Mutationen angewendet wird:
GoogleSQL
PostgreSQL
Wenn eine Transformation vor dem Abschluss unerwartet stoppt, werden Mutationen, die bereits angewendet wurde, wird kein Rollback durchgeführt.
Gruppen von Mutationen untrennbar ausführen
Sie können mit der Klasse MutationGroup
eine Gruppe von Mutationen untrennbar zusammen ausführen. Mutationen in einer MutationGroup
werden garantiert in der gleichen Transaktion übertragen, aber die Transaktion wird möglicherweise wiederholt.
Mutationsgruppen funktionieren am besten, wenn sie zur Zusammenfassung von Mutationen verwendet werden, die Auswirkungen auf im Schlüsselbereich nahe beieinander gespeicherte Daten haben. Weil Spanner verschränkt Daten der übergeordneten und untergeordneten Tabelle in der übergeordneten Tabelle, sodass diese Daten im Schlüsselbereich immer dicht beieinander liegen. Wir empfehlen, die Mutationsgruppe entweder so zu strukturieren, dass sie eine auf eine übergeordnete Tabelle angewendete Mutation sowie zusätzliche, auf untergeordnete Tabellen angewendete Mutationen enthält oder dass mit sämtlichen Mutationen im Schlüsselbereich nahe beieinander liegende Daten geändert werden. Weitere Informationen dazu, wie Spanner übergeordnete und Untergeordnete Tabellendaten finden Sie unter Schema und Datenmodell. Wenn Sie keine Mutationsgruppen an die empfohlenen Tabellenhierarchien anpassen nicht nah beieinander im Schlüsselbereich ist, Zwei-Phasen-Commits durchführen müssen, was die Leistung verlangsamt. Weitere Informationen finden Sie unter Kompromisse bei der Lokalität.
Wenn Sie eine MutationGroup
verwenden möchten, erstellen Sie eine SpannerIO.write()
-Transformation und rufen Sie die Methode SpannerIO.Write.grouped()
auf. Diese gibt eine Transformation zurück, die Sie dann auf eine PCollection
von Objekten des Typs MutationGroup
anwenden können.
Beim Erstellen einer MutationGroup
wird die erste aufgelistete Mutation zur primären Mutation. Wenn Ihre Mutationsgruppe sowohl eine übergeordnete als auch eine untergeordnete Tabelle betrifft, sollte die primäre Mutation eine Mutation der übergeordneten Tabelle sein. Andernfalls können Sie jede Mutation als primäre Mutation verwenden. Der Cloud Dataflow-Connector verwendet die primäre Mutation, um Partitionsgrenzen zu bestimmen, damit Mutationen effizient zusammengefasst werden können.
Stellen Sie sich beispielsweise vor, dass Ihre Anwendung das Verhalten der Nutzer überwacht und problematische Fälle kennzeichnet, damit diese überprüft werden. Für jeden gekennzeichneten Fall des Nutzerverhaltens möchten Sie die Tabelle Users
aktualisieren, damit der Nutzer nicht mehr auf Ihre Anwendung zugreifen kann. Außerdem wollen Sie den Vorfall in der Tabelle PendingReviews
aufzeichnen. Verwenden Sie eine MutationGroup
, um dafür zu sorgen, dass beide Tabellen untrennbar voneinander aktualisiert werden:
GoogleSQL
PostgreSQL
Beim Erstellen einer Mutationsgruppe wird die erste Mutation, die als Argument angegeben ist, zur primären Mutation. In diesem Fall haben die beiden Tabellen keinen Bezug zueinander, also gibt es keine eindeutige primäre Mutation. Wir haben uns für userMutation
als primäre Mutation entschieden und haben sie an die erste Stelle gesetzt. Beide Mutationen separat auszuführen wäre zwar schneller, würde aber die Untrennbarkeit nicht garantieren, weshalb die Mutationsgruppe in dieser Situation die beste Wahl darstellt.
Weitere Informationen
- Apache Beam-Datenpipeline entwerfen
- Spanner-Datenbanken mit Dataflow exportieren und importieren