Daten mit Dataflow importieren, exportieren und ändern

Dataflow ist ein verwalteter Dienst für die Transformation und Anreicherung von Daten. Mit dem Dataflow-Connector für Spanner können Sie in einer Dataflow-Pipeline Daten in Spanner lesen und schreiben. Optional können Sie die Daten transformieren oder ändern. Sie können auch Pipelines erstellen, mit denen Daten zwischen Spanner und anderen Google Cloud-Produkten übertragen werden.

Der Dataflow-Connector ist die empfohlene Methode für die effiziente Bulk-Verschiebung von Daten in und aus Spanner sowie für die Durchführung großer Transformationen in eine Datenbank, die von partitionierter DML nicht unterstützt werden, z. B. Tabellenverschiebungen, Bulk-Löschvorgänge, die einen JOIN erfordern, usw. Bei der Arbeit mit einzelnen Datenbanken können Sie andere Methoden zum Importieren und Exportieren von Daten verwenden:

  • Verwenden Sie die Google Cloud Console, um eine einzelne Datenbank im Avro-Format von Spanner in Cloud Storage zu exportieren.
  • Mit der Google Cloud Console können Sie eine Datenbank aus Dateien, die Sie in Cloud Storage exportiert haben, wieder in Spanner import.
  • Verwenden Sie die REST API oder die Google Cloud CLI, um Export- oder import von Spanner nach Cloud Storage und zurück auszuführen (ebenfalls mit dem Avro-Format).

Der Dataflow-Connector für Spanner ist Teil des Apache Beam Java SDK und bietet eine API zum Ausführen der oben genannten Aktionen. Weitere Informationen zu einigen der unten beschriebenen Konzepte, wie z. B. PCollection-Objekten und -Transformationen, finden Sie im Programmierleitfaden für Apache Beam.

Connector dem Maven-Projekt hinzufügen

Wenn Sie den Cloud Dataflow-Connector einem Maven-Projekt hinzufügen möchten, dann fügen Sie das Maven-Artefakt beam-sdks-java-io-google-cloud-platform als Abhängigkeit in die Datei pom.xml ein:

Angenommen, Ihre Datei pom.xml setzt die Version von beam.version auf die entsprechende Versionsnummer. In diesem Fall fügen Sie die folgende Abhängigkeit hinzu:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Daten aus Spanner lesen

Wenden Sie die Transformation SpannerIO.read() an, um Daten aus Spanner zu lesen. Konfigurieren Sie den Lesevorgang mit den Methoden aus der Klasse SpannerIO.Read. Durch Einsatz der Transformation wird ein Objekt PCollection<Struct> zurückgegeben. Dabei stellt jedes Element in der Sammlung eine einzelne Zeile dar, die vom Lesevorgang zurückgegeben wird. Je nach gewünschter Ausgabe können Sie mit und ohne eine bestimmte SQL-Abfrage aus Spanner lesen.

Durch Einsatz der Transformation SpannerIO.read() wird mithilfe eines starken Lesevorgangs eine konsistente Datenansicht zurückgegeben. Sofern Sie dies nicht anders festlegen, beruht das Ergebnis des Lesevorgangs auf einem Snapshot der Daten zu dem Zeitpunkt, an dem Sie den Lesevorgang gestartet haben. Weitere Informationen zu den verschiedenen Arten von Lesevorgängen, die Spanner ausführen kann, finden Sie unter Lesevorgänge.

Daten mit einer Abfrage lesen

Wenn Sie einen bestimmten Datensatz aus Spanner lesen möchten, konfigurieren Sie die Transformation mit der Methode SpannerIO.Read.withQuery(), um eine SQL-Abfrage anzugeben. Beispiel:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Daten ohne Angabe einer Abfrage lesen

Wenn Sie ohne eine Abfrage aus einer Datenbank lesen möchten, können Sie mit der Methode SpannerIO.Read.withTable() einen Tabellennamen und mit der Methode SpannerIO.Read.withColumns() eine Liste der zu lesenden Spalten angeben. Beispiel:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Wenn Sie die Anzahl der gelesenen Zeilen begrenzen möchten, können Sie mit der Methode SpannerIO.Read.withKeySet() eine Reihe von zu lesenden Primärschlüsseln angeben.

Sie können eine Tabelle auch mit einem angegebenen sekundären Index lesen. Wie beim readUsingIndex() API-Aufruf muss der Index alle Daten enthalten, die in den Abfrageergebnissen angezeigt werden.

Geben Sie dazu die Tabelle wie im vorherigen Beispiel gezeigt und den Index an, der die gewünschten Spaltenwerte enthält, indem Sie die Methode SpannerIO.Read.withIndex() verwenden. Im Index müssen alle Spalten gespeichert werden, die die Transformation lesen muss. Der Primärschlüssel der Basistabelle wird implizit gespeichert. Wenn Sie beispielsweise die Tabelle Songs mit dem Index SongsBySongName lesen möchten, verwenden Sie den folgenden Code:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Veraltete Transaktionsdaten kontrollieren

Eine Transformation wird immer auf einen konsistenten Daten-Snapshot angewendet. Verwenden Sie die Methode SpannerIO.Read.withTimestampBound(), um die Veralterung der Daten zu steuern. Weitere Informationen finden Sie unter Transaktionen.

Aus mehreren Tabellen in derselben Transaktion lesen

Wenn Sie Daten aus mehreren Tabellen zur gleichen Zeit lesen möchten, damit Sie so die Datenkonsistenz gewährleisten können, führen Sie alle Lesevorgänge mit einer einzigen Transaktion aus. Verwenden Sie dazu die Transformation createTransaction() und erstellen Sie ein Objekt PCollectionView<Transaction>, mit dem wiederum eine Transaktion erstellt wird. Die entstehende Ansicht kann mit SpannerIO.Read.withTransaction() an einen Lesevorgang übergeben werden.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Daten aus allen verfügbaren Tabellen lesen

Sie können Daten aus allen verfügbaren Tabellen in einer Spanner-Datenbank lesen.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Fehlerbehebung bei nicht unterstützten Abfragen

Der Dataflow-Connector unterstützt nur Spanner SQL-Abfragen, bei denen der erste Operator im Abfrageausführungsplan Distributed Union ist. Wenn Sie versuchen, Daten aus Spanner mit einer Abfrage zu lesen und eine Ausnahme mit dem Hinweis erhalten, dass die Abfrage does not have a DistributedUnion at the root lautet, führen Sie die Schritte unter Informationen zur Ausführung von Abfragen durch Spanner aus, um über die Google Cloud Console einen Ausführungsplan für die Abfrage abzurufen.

Wenn die SQL-Abfrage nicht unterstützt wird, vereinfachen Sie sie entsprechend, damit sie als ersten Operator im Abfrageausführungsplan den Operator "Distributed Union" enthält. Entfernen Sie Aggregatfunktionen, Tabellen-Joins sowie die Operatoren DISTINCT, GROUP BY und ORDER, da diese Operatoren am ehesten verhindern, dass die Abfrage funktioniert.

Mutationen für einen Schreibvorgang erstellen

Verwenden Sie die Methode newInsertOrUpdateBuilder() der Klasse Mutation anstelle der Methode newInsertBuilder(), sofern dies für Java-Pipelines nicht unbedingt erforderlich ist. Verwenden Sie für Python-Pipelines SpannerInsertOrUpdate() anstelle von SpannerInsert(). Dataflow bietet mindestens einmalige Garantien, was bedeutet, dass die Mutation mehrmals geschrieben werden kann. Daher können nur Mutationen vom Typ INSERT com.google.cloud.spanner.SpannerException: ALREADY_EXISTS-Fehler generieren, die zum Ausfall der Pipeline führen. Verwenden Sie stattdessen die Mutation INSERT_OR_UPDATE, um diesen Fehler zu vermeiden. Damit wird eine neue Zeile hinzugefügt oder Spaltenwerte aktualisiert, wenn die Zeile bereits vorhanden ist. Die Mutation INSERT_OR_UPDATE kann mehrmals angewendet werden.

In Spanner schreiben und Daten transformieren

Mit dem Dataflow-Connector können Sie Daten in Spanner schreiben. Dazu verwenden Sie eine SpannerIO.write()-Transformation, um eine Sammlung von Eingabezeilenmutationen auszuführen. Zur Steigerung der Effizienz gruppiert der Dataflow-Connector Mutationen in Batches.

Das folgende Beispiel zeigt, wie eine Schreibtransformation auf eine PCollection von Mutationen angewendet wird:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Wenn eine Transformation vor dem Abschluss unerwartet beendet wird, werden bereits angewendete Mutationen nicht zurückgesetzt.

Gruppen von Mutationen untrennbar ausführen

Sie können mit der Klasse MutationGroup eine Gruppe von Mutationen untrennbar zusammen ausführen. Mutationen in einer MutationGroup werden garantiert in der gleichen Transaktion übertragen, aber die Transaktion wird möglicherweise wiederholt.

Mutationsgruppen funktionieren am besten, wenn sie zur Zusammenfassung von Mutationen verwendet werden, die Auswirkungen auf im Schlüsselbereich nahe beieinander gespeicherte Daten haben. Da Spanner Daten aus übergeordneten und untergeordneten Tabellen in der übergeordneten Tabelle verschränkt, befinden sich diese Daten im Schlüsselbereich immer nahe beieinander. Wir empfehlen, die Mutationsgruppe entweder so zu strukturieren, dass sie eine auf eine übergeordnete Tabelle angewendete Mutation sowie zusätzliche, auf untergeordnete Tabellen angewendete Mutationen enthält oder dass mit sämtlichen Mutationen im Schlüsselbereich nahe beieinander liegende Daten geändert werden. Weitere Informationen dazu, wie Spanner Daten von über- und untergeordneten Tabellen speichert, finden Sie unter Schema und Datenmodell. Wenn Sie Ihre Mutationsgruppen nicht anhand der empfohlenen Tabellenhierarchien organisieren oder die Daten, auf die zugegriffen wird, im Schlüsselbereich nicht nah beieinander liegen, muss Spanner möglicherweise zweiphasige Commits ausführen, was zu einer geringeren Leistung führt. Weitere Informationen finden Sie unter Kompromisse bei der Lokalität.

Wenn Sie eine MutationGroup verwenden möchten, erstellen Sie eine SpannerIO.write()-Transformation und rufen Sie die Methode SpannerIO.Write.grouped() auf. Diese gibt eine Transformation zurück, die Sie dann auf eine PCollection von Objekten des Typs MutationGroup anwenden können.

Beim Erstellen einer MutationGroup wird die erste aufgelistete Mutation zur primären Mutation. Wenn Ihre Mutationsgruppe sowohl eine übergeordnete als auch eine untergeordnete Tabelle betrifft, sollte die primäre Mutation eine Mutation der übergeordneten Tabelle sein. Andernfalls können Sie jede Mutation als primäre Mutation verwenden. Der Cloud Dataflow-Connector verwendet die primäre Mutation, um Partitionsgrenzen zu bestimmen, damit Mutationen effizient zusammengefasst werden können.

Stellen Sie sich beispielsweise vor, dass Ihre Anwendung das Verhalten der Nutzer überwacht und problematische Fälle kennzeichnet, damit diese überprüft werden. Für jeden gekennzeichneten Fall des Nutzerverhaltens möchten Sie die Tabelle Users aktualisieren, damit der Nutzer nicht mehr auf Ihre Anwendung zugreifen kann. Außerdem wollen Sie den Vorfall in der Tabelle PendingReviews aufzeichnen. Verwenden Sie eine MutationGroup, um dafür zu sorgen, dass beide Tabellen untrennbar voneinander aktualisiert werden:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Beim Erstellen einer Mutationsgruppe wird die erste Mutation, die als Argument angegeben ist, zur primären Mutation. In diesem Fall haben die beiden Tabellen keinen Bezug zueinander, also gibt es keine eindeutige primäre Mutation. Wir haben uns für userMutation als primäre Mutation entschieden und haben sie an die erste Stelle gesetzt. Beide Mutationen separat auszuführen wäre zwar schneller, würde aber die Untrennbarkeit nicht garantieren, weshalb die Mutationsgruppe in dieser Situation die beste Wahl darstellt.

Nächste Schritte