Importa, esporta e modifica i dati con Dataflow

Dataflow è un servizio gestito per la trasformazione e l'arricchimento dei dati. Il connettore Dataflow per Spanner ti consente di leggere i dati e scrivere dati in Spanner in una pipeline Dataflow, trasformare o modificare i dati facoltativamente. Puoi anche creare pipeline che trasferiscono dati tra Spanner e altri prodotti Google Cloud.

Il connettore Dataflow è il metodo consigliato per gestire in modo e spostare dati da e fuori Spanner in blocco e per eseguire trasformazioni in un database non supportate dal DML partizionato, come spostamenti di tabelle, eliminazioni collettive che richiedono un JOIN e così via. Durante il lavoro con i singoli database, ci sono altri metodi che puoi usare per importare esporta i dati:

  • Utilizza la console Google Cloud per esportare un singolo database da da Spanner a Cloud Storage in formato Avro.
  • Utilizzare la console Google Cloud per importare nuovamente un database in Spanner dai file esportati in Cloud Storage.
  • Utilizza l'API REST o Google Cloud CLI per eseguire l'esportazione o Importare job da Spanner a Cloud Storage e viceversa (anche utilizzando il formato Avro).

Il connettore Dataflow per Spanner fa parte della SDK Apache Beam Java, che fornisce un'API per eseguire le operazioni descritte in precedenza azioni. Per ulteriori informazioni su alcuni dei concetti discussi di seguito, ad esempio come oggetti PCollection e trasforma, consulta la guida alla programmazione di Apache Beam.

Aggiungi il connettore al progetto Maven

Aggiungere il connettore Google Cloud Dataflow a un account Maven. progetto, aggiungi l'artefatto Maven beam-sdks-java-io-google-cloud-platform a il file pom.xml come dipendenza.

Ad esempio, supponendo che il file pom.xml imposti beam.version nel numero di versione appropriato, aggiungerai la seguente dipendenza:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Leggi i dati da Spanner

Per leggere da Spanner, applica la trasformazione SpannerIO.read(). Configura la lettura utilizzando i metodi della classe SpannerIO.Read. L'applicazione della trasformazione restituisce un valore PCollection<Struct>, dove ogni elemento della raccolta rappresenta una singola riga restituita dalla lettura operativa. Puoi leggere da Spanner con e senza un prompt SQL specifico a seconda dell'output desiderato.

L'applicazione della trasformazione SpannerIO.read() restituisce una visualizzazione coerente dei dati per durante una lettura impegnativa. Se non diversamente specificato, il risultato della lettura viene creato uno snapshot nel momento in cui hai iniziato la lettura. Per saperne di più, consulta le letture. e informazioni sui diversi tipi di letture che Spanner può eseguire.

Leggere i dati utilizzando una query

Per leggere un set di dati specifico da Spanner, configura il metodo utilizzando il metodo SpannerIO.Read.withQuery() per specificare una query query. Ad esempio:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Leggi i dati senza specificare una query

Per leggere da un database senza utilizzare una query, puoi specificare una tabella utilizzando il metodo SpannerIO.Read.withTable() e specifica un elenco di colonne da leggere utilizzando la funzione SpannerIO.Read.withColumns() . Ad esempio:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Per limitare il numero di righe lette, puoi specificare un set di chiavi primarie da leggere utilizzando Metodo SpannerIO.Read.withKeySet().

Puoi anche leggere una tabella utilizzando un indice secondario specificato. Come per la readUsingIndex(), l'indice deve contenere tutti i dati che vengono visualizzate nei risultati della query.

A questo scopo, specifica la tabella come mostrato nell'esempio precedente e indice che contenga i valori della colonna desiderati utilizzando la proprietà SpannerIO.Read.withIndex(). L'indice deve archiviare tutti le colonne che la trasformazione deve leggere. La chiave primaria della tabella di base è archiviati in modo implicito. Ad esempio, per leggere la tabella Songs utilizzando l'indice SongsBySongName, utilizzi codice seguente:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controlla l'obsolescenza dei dati delle transazioni

È garantita l'esecuzione di una trasformazione su uno snapshot di dati coerente. A controllare l'inattività dei dati, utilizza SpannerIO.Read.withTimestampBound(). Consulta transazioni per ulteriori informazioni.

Lettura da più tabelle nella stessa transazione

Se vuoi leggere i dati da più tabelle nello stesso momento per garantire la coerenza dei dati ed eseguire tutte le operazioni di lettura in un'unica transazione. Da fare quindi applica una trasformazione di createTransaction(), creando un oggetto PCollectionView<Transaction> che crea quindi una transazione. La la vista risultante può essere passata a un'operazione di lettura SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Leggi i dati di tutte le tabelle disponibili

Puoi leggere i dati da tutte le tabelle disponibili in un database Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Risolvere i problemi relativi alle query non supportate

Il connettore Dataflow supporta solo le query SQL di Spanner in cui il primo operatore nel piano di esecuzione della query è un dell'Unione Europea. Se tenti di leggere i dati da Spanner utilizzando una query ricevi un'eccezione che indica che la query does not have a DistributedUnion at the root, segui i passaggi descritti in Comprendere l'esecuzione di Spanner query per recuperare un piano di esecuzione per la query utilizzando nella console Google Cloud.

Se la tua query SQL non è supportata, semplificala in modo che sia una query con un come primo operatore nel piano di esecuzione delle query. Rimuovi aggregato le funzioni, i join di tabelle e gli operatori DISTINCT, GROUP BY e ORDER, in quanto sono gli operatori che hanno maggiori probabilità di impedire la query dall'attività lavorativa.

Crea mutazioni per una scrittura

Utilizza la classe Mutation newInsertOrUpdateBuilder() anziché Metodo newInsertBuilder() a meno che non siano assolutamente necessari per le pipeline Java. Per le pipeline Python, utilizza SpannerInsertOrUpdate() anziché SpannerInsert(). Dataflow fornisce garanzie "at-least-once", il che significa che la mutazione potrebbe essere scritta più volte. Di conseguenza, potrebbero essere generate solo mutazioni per INSERT com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errori che causano l'errore della pipeline. Per evitare questo errore, utilizza INSERT_OR_UPDATE una modifica, che aggiunge una nuova riga o aggiorna i valori della colonna se la riga esiste già. La mutazione INSERT_OR_UPDATE può essere applicata più di una volta.

Scrivi in Spanner e trasforma i dati

Puoi scrivere dati in Spanner con Dataflow utilizzando una trasformazione di SpannerIO.write() per eseguire un raccolta di mutazioni delle righe di input. I gruppi di connettori Dataflow le mutazioni in batch per una maggiore efficienza.

L'esempio seguente mostra come applicare una trasformazione di scrittura a un elemento PCollection di mutazioni:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se una trasformazione si interrompe inaspettatamente prima del completamento, le mutazioni che hanno già applicato non verrà eseguito il rollback.

Applicare gruppi di mutazioni a livello atomico

Puoi utilizzare la classe MutationGroup per assicurarti che un un gruppo di mutazioni applicati a livello atomico. Mutazioni in un È garantito che MutationGroup vengano inviati nella stessa transazione, ma la transazione potrebbe essere riprovata.

I gruppi di mutazione hanno un rendimento migliore quando vengono utilizzati per raggruppare le mutazioni che incide sui dati archiviati vicini nello spazio delle chiavi. Poiché Spanner alterna i dati delle tabelle padre e figlio nella tabella padre, quei dati sono sempre vicini nello spazio delle chiavi. Ti consigliamo di: strutturare il gruppo di mutazione in modo che contenga una mutazione applicata a una tabella padre e altre mutazioni applicate alle tabelle figlio oppure in modo che tutte le sue mutazioni modifichino i dati vicini spazio dei tasti. Per saperne di più su come Spanner archivia i nodi padre dati delle tabelle secondarie, consulta Schema e modello dei dati. Se non organizzi i tuoi gruppi di mutazioni attorno alle gerarchie di tabelle consigliate o se i dati a cui si accede non sono vicini nello spazio delle chiavi, Spanner potrebbe devi eseguire commit in due fasi, con un conseguente rallentamento delle prestazioni. Per Per ulteriori informazioni, consulta la sezione Scontri a livello di località.

Per utilizzare MutationGroup, crea una trasformazione SpannerIO.write() e chiama il metodo SpannerIO.Write.grouped(), che restituisce un una trasformazione che puoi applicare a PCollection di MutationGroup oggetti.

Quando crei un MutationGroup, la prima mutazione elencata diventa la una mutazione primaria. Se il gruppo di mutazione colpisce sia un genitore che un bambino , la mutazione principale deve essere una mutazione della tabella padre. Altrimenti, puoi usare qualsiasi mutazione come mutazione principale. Dataflow utilizza la mutazione primaria per determinare i confini della partizione in ordine per raggruppare le mutazioni in modo efficiente.

Ad esempio, immagina che la tua applicazione monitori il comportamento e i flag il comportamento problematico dell'utente da esaminare. Per ogni comportamento segnalato, aggiornare la tabella Users per bloccare l'accesso dell'utente alla tua applicazione. e registrare l'incidente nella tabella PendingReviews. Per assicurarti per aggiornare entrambe le tabelle a livello atomico, utilizza un valore MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Quando si crea un gruppo di mutazione, viene fornita la prima mutazione come argomento diventa la mutazione principale. In questo caso le due tabelle non sono correlate, quindi non esiste una chiara mutazione primaria. Abbiamo selezionato userMutation come principale da posizionandolo per primo. Applicare le due mutazioni separatamente sarebbe più veloce, ma non garantisce l'atomicità, perciò il gruppo di mutazione è la scelta migliore la situazione.

Passaggi successivi