Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Dataflow-Connector verwenden

Dataflow ist ein verwalteter Dienst für die Transformation und Anreicherung von Daten. Mit dem Dataflow-Connector für Cloud Spanner können Sie über eine Dataflow-Pipeline Daten in Cloud Spanner lesen und schreiben. Optional können Sie die Daten dabei transformieren oder ändern. Sie können auch Pipelines erstellen, mit denen Daten zwischen Cloud Spanner und anderen Google Cloud Platform-Produkten übertragen werden.

Der Dataflow-Connector ist die empfohlene Methode zur effizienten Bulk-Verschiebung von Daten in und aus Cloud Spanner. Wenn Sie mit einzelnen Datenbanken arbeiten, können Sie die Daten mit anderen Methoden im- und exportieren:

  • Wenn Sie eine einzelne Datenbank im Avro-Format aus Cloud Spanner nach Cloud Storage exportieren möchten, verwenden Sie dafür am besten die Cloud Console.
  • Wenn Sie eine Datenbank aus Dateien, die Sie nach Cloud Storage exportiert haben, zurück in Cloud Spanner importieren möchten, verwenden Sie dafür ebenfalls am besten die Cloud Console.
  • Falls Sie Jobs aus Cloud Spanner nach Cloud Storage und zurück exportieren oder importieren möchten, dann verwenden Sie dafür am besten entweder die REST API oder das gcloud-Befehlszeilentool.

Der Dataflow-Connector für Cloud Spanner ist Teil des Apache Beam Java SDK und bietet eine API für die oben genannten Vorgänge. Im Apache Beam-Leitfaden finden Sie weitere Informationen zu einigen der unten erläuterten Konzepte wie PCollection-Objekte und -Transformationen.

Connector dem Maven-Projekt hinzufügen

Wenn Sie den Cloud Dataflow-Connector einem Maven-Projekt hinzufügen möchten, dann fügen Sie das Maven-Artefakt beam-sdks-java-io-google-cloud-platform als Abhängigkeit in die Datei pom.xml ein:

Angenommen, Ihre Datei pom.xml setzt die Version von beam.version auf die entsprechende Versionsnummer. In diesem Fall fügen Sie die folgende Abhängigkeit hinzu:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Daten aus Cloud Spanner lesen

Verwenden Sie die Transformation SpannerIO.read(), um Daten aus Cloud Spanner zu lesen. Konfigurieren Sie den Lesevorgang mit den Methoden aus der Klasse SpannerIO.Read. Durch Einsatz der Transformation wird ein Objekt PCollection<Struct> zurückgegeben. Dabei stellt jedes Element in der Sammlung eine einzelne Zeile dar, die vom Lesevorgang zurückgegeben wird. Je nach gewünschter Ausgabe können Sie Daten mit einer bestimmten SQL-Abfrage oder ohne SQL-Abfrage aus Cloud Spanner lesen.

Durch Einsatz der Transformation SpannerIO.read() wird mithilfe eines starken Lesevorgangs eine konsistente Datenansicht zurückgegeben. Sofern Sie dies nicht anders festlegen, beruht das Ergebnis des Lesevorgangs auf einem Snapshot der Daten zu dem Zeitpunkt, an dem Sie den Lesevorgang gestartet haben. Weitere Informationen zu den verschiedenen Arten von Lesevorgängen, die mit Cloud Spanner möglich sind, finden Sie unter Lesevorgänge.

Lesen mithilfe einer Abfrage

Wenn Sie bestimmte Daten aus Cloud Spanner lesen möchten, konfigurieren Sie die Transformation mit der Methode SpannerIO.Read.withQuery(), um eine SQL-Abfrage anzugeben. Beispiel:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Lesen ohne Festlegen einer Abfrage

Zum Lesen aus einer Datenbank ohne eine Abfrage können Sie einen Tabellennamen und eine Liste von Spalten angeben oder einen Lesevorgang mit einem Index ausführen. Geben Sie beim Erstellen der Transformation mit SpannerIO.read() einen Tabellennamen und eine Liste von Spalten an, damit Sie aus ausgewählten Spalten lesen können. Beispiel:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = p.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

Sie können auch aus der Tabelle lesen, indem Sie einen bestimmten Satz Schlüssel als Indexwerte verwenden. Erstellen Sie den Lesevorgang dazu mit der Methode SpannerIO.Read.withIndex() und verwenden Sie einen Index mit den gewünschten Schlüsselwerten.

Steuerung der Veralterung von Transaktionsdaten

Eine Transformation wird immer auf einen konsistenten Daten-Snapshot angewendet. Verwenden Sie die Methode SpannerIO.Read.withTimestampBound(), um die Veralterung der Daten zu steuern. Weitere Informationen finden Sie unter Transaktionen.

Lesen aus mehreren Tabellen mit der gleichen Transaktion

Wenn Sie Daten aus mehreren Tabellen zur gleichen Zeit lesen möchten, damit Sie so die Datenkonsistenz gewährleisten können, führen Sie alle Lesevorgänge mit einer einzigen Transaktion aus. Verwenden Sie dazu die Transformation createTransaction() und erstellen Sie ein Objekt PCollectionView<Transaction>, mit dem wiederum eine Transaktion erstellt wird. Die entstehende Ansicht kann mit SpannerIO.Read.withTransaction() an einen Lesevorgang übergeben werden.

SpannerConfig spannerConfig = SpannerConfig.create()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId);
PCollectionView<Transaction> tx = p.apply(
    SpannerIO.createTransaction()
        .withSpannerConfig(spannerConfig)
        .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
    .withTransaction(tx));
PCollection<Struct> albums = p.apply(SpannerIO.read().withSpannerConfig(spannerConfig)
    .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
    .withTransaction(tx));

Daten aus allen verfügbaren Tabellen lesen

Sie können die Daten aus allen verfügbaren Tabellen in einer Cloud Spanner-Datenbank lesen:

PCollection<Struct> allRecords = p.apply(SpannerIO.read()
    .withSpannerConfig(spannerConfig)
    .withBatching(false)
    .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
        + ".table_catalog = '' AND t.table_schema = ''")).apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via((SerializableFunction<Struct, ReadOperation>) input -> {
          String tableName = input.getString(0);
          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
        })).apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Fehlerbehebung bei nicht unterstützten Abfragen

Der Dataflow-Connector unterstützt nur SQL-Abfragen an Cloud Spanner, bei denen der erste Operator im Abfrageausführungsplan Distributed Union ist. Wenn Sie versuchen, Daten aus Cloud Spanner mit einer Abfrage zu lesen und die Ausnahmemeldung does not have a DistributedUnion at the root erhalten, führen Sie die unter So werden Abfragen von Cloud Spanner ausgeführt angegebenen Schritte aus, damit Sie mit der Cloud Console einen Ausführungsplan für die Abfrage abrufen können.

Wenn die SQL-Abfrage nicht unterstützt wird, vereinfachen Sie sie entsprechend, damit sie als ersten Operator im Abfrageausführungsplan den Operator "Distributed Union" enthält. Entfernen Sie Aggregatfunktionen, Tabellenverknüpfungen sowie die Operatoren DISTINCT, GROUP BY und ORDER, da diese Operatoren am ehesten verhindern, dass die Abfrage verhindert wird. arbeiten.

Mutationen für einen Schreibvorgang erstellen

Verwenden Sie die Methode newInsertOrUpdateBuilder() der Klasse Mutation anstelle der Methode newInsertBuilder(), sofern diese nicht unbedingt erforderlich ist. Dataflow garantiert die mindestens einmalige Ausführung. Dies bedeutet, dass die Mutation wahrscheinlich mehrmals geschrieben wird. Daher führen Insert-Mutationen wahrscheinlich zu Fehlern, die einen Ausfall der Pipeline verursachen. Erstellen Sie Insert- oder Update-Mutationen, die mehrfach ausgeführt werden können, um diese Fehler zu vermeiden.

Schreiben in Cloud Spanner und Transformieren von Daten

Mit dem Dataflow-Connector können Sie Daten in Cloud Spanner schreiben. Verwenden Sie hierfür die Transformation SpannerIO.write(), um eine Sammlung von Eingabezeilenmutationen auszuführen. Für eine höhere Effizienz gruppiert der Dataflow-Connector Mutationen in Batches.

Das folgende Beispiel zeigt, wie eine Schreibtransformation auf eine PCollection von Mutationen angewendet wird:

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

Wenn eine Transformation vor dem Abschluss unerwartet abgebrochen wird, werden die bereits ausgeführten Mutationen nicht zurückgesetzt.

Gruppen von Mutationen untrennbar ausführen

Sie können mit der Klasse MutationGroup eine Gruppe von Mutationen untrennbar zusammen ausführen. Mutationen in einer MutationGroup werden garantiert in der gleichen Transaktion übertragen, aber die Transaktion wird möglicherweise wiederholt.

Mutationsgruppen funktionieren am besten, wenn sie zur Zusammenfassung von Mutationen verwendet werden, die Auswirkungen auf im Schlüsselbereich nahe beieinander gespeicherte Daten haben. Da Cloud Spanner Daten aus übergeordneten und untergeordneten Tabellen in der übergeordneten Tabelle miteinander verschachtelt, befinden sich diese Daten im Schlüsselbereich immer nahe beieinander. Wir empfehlen, die Mutationsgruppe entweder so zu strukturieren, dass sie eine auf eine übergeordnete Tabelle angewendete Mutation sowie zusätzliche, auf untergeordnete Tabellen angewendete Mutationen enthält oder dass mit sämtlichen Mutationen im Schlüsselbereich nahe beieinander liegende Daten geändert werden. Weitere Informationen dazu, wie Cloud Spanner Daten von über- und untergeordneten Tabellen speichert, finden Sie unter Schema und Datenmodell. Wenn Sie die Mutationsgruppen nicht anhand der empfohlenen Tabellenhierarchien strukturieren oder wenn sich die Daten, auf die zugegriffen wird, im Schlüsselbereich nicht nahe beieinander befinden, muss Cloud Spanner möglicherweise Zwei-Phasen-Commits durchführen. Das führt zu einer geringeren Leistung. Weitere Informationen finden Sie unter Kompromisse bei der Lokalität.

Wenn Sie eine MutationGroup verwenden möchten, erstellen Sie eine SpannerIO.write()-Transformation und rufen Sie die Methode SpannerIO.Write.grouped() auf. Diese gibt eine Transformation zurück, die Sie dann auf eine PCollection von Objekten des Typs MutationGroup anwenden können.

Beim Erstellen einer MutationGroup wird die erste aufgelistete Mutation zur primären Mutation. Wenn Ihre Mutationsgruppe sowohl eine übergeordnete als auch eine untergeordnete Tabelle betrifft, sollte die primäre Mutation eine Mutation der übergeordneten Tabelle sein. Andernfalls können Sie jede Mutation als primäre Mutation verwenden. Der Cloud Dataflow-Connector verwendet die primäre Mutation, um Partitionsgrenzen zu bestimmen, damit Mutationen effizient zusammengefasst werden können.

Stellen Sie sich beispielsweise vor, dass Ihre Anwendung das Verhalten der Nutzer überwacht und problematische Fälle kennzeichnet, damit diese überprüft werden. Für jeden gekennzeichneten Fall des Nutzerverhaltens möchten Sie die Tabelle Users aktualisieren, damit der Nutzer nicht mehr auf Ihre Anwendung zugreifen kann. Außerdem wollen Sie den Vorfall in der Tabelle PendingReviews aufzeichnen. Verwenden Sie eine MutationGroup, um dafür zu sorgen, dass beide Tabellen untrennbar voneinander aktualisiert werden:

PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

Beim Erstellen einer Mutationsgruppe wird die erste Mutation, die als Argument angegeben ist, zur primären Mutation. In diesem Fall haben die beiden Tabellen keinen Bezug zueinander, also gibt es keine eindeutige primäre Mutation. Wir haben uns für userMutation als primäre Mutation entschieden und haben sie an die erste Stelle gesetzt. Beide Mutationen separat auszuführen wäre zwar schneller, würde aber die Untrennbarkeit nicht garantieren, weshalb die Mutationsgruppe in dieser Situation die beste Wahl darstellt.

Weitere Informationen