Importa, esporta e modifica i dati utilizzando Dataflow

Dataflow è un servizio gestito per la trasformazione e l'arricchimento dei dati. Il connettore Dataflow per Spanner consente di leggere e scrivere dati in Spanner in una pipeline Dataflow, trasformando o modificando i dati facoltativamente. Puoi anche creare pipeline per il trasferimento dei dati tra Spanner e altri prodotti Google Cloud.

Il connettore Dataflow è il metodo consigliato per spostare in modo efficiente i dati da e verso Spanner in blocco e per eseguire trasformazioni di grandi dimensioni in un database non supportate da DML partizionato, ad esempio spostamento di tabelle, eliminazione in blocco che richiedono un JOIN e così via. Quando lavori con singoli database, esistono altri metodi che puoi utilizzare per importare ed esportare i dati:

  • Utilizza la console Google Cloud per esportare un singolo database da Spanner a Cloud Storage in formato Avro.
  • Utilizza la console Google Cloud per import di nuovo un database in Spanner dai file esportati in Cloud Storage.
  • Utilizza l'API REST o Google Cloud CLI per eseguire job di esportazione o import da Spanner a Cloud Storage e viceversa (anche utilizzando il formato Avro).

Il connettore Dataflow per Spanner fa parte dell'SDK Java Apache Beam e fornisce un'API per eseguire le azioni precedenti. Per ulteriori informazioni su alcuni dei concetti discussi di seguito, ad esempio oggetti e trasformazioni PCollection, consulta la guida alla programmazione Apache Beam.

Aggiungi il connettore al progetto Maven

Per aggiungere il connettore Google Cloud Dataflow a un progetto Maven, aggiungi l'artefatto Maven beam-sdks-java-io-google-cloud-platform al file pom.xml come dipendenza.

Ad esempio, supponendo che il file pom.xml imposti beam.version sul numero di versione appropriato, devi aggiungere la dipendenza seguente:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Lettura dati da Spanner

Per leggere da Spanner, applica la trasformazione SpannerIO.read(). Configura la lettura utilizzando i metodi della classe SpannerIO.Read. L'applicazione della trasformazione restituisce un valore PCollection<Struct>, in cui ogni elemento nella raccolta rappresenta una singola riga restituita dall'operazione di lettura. Puoi leggere da Spanner con e senza una query SQL specifica, a seconda dell'output desiderato.

L'applicazione della trasformazione SpannerIO.read() restituisce una visualizzazione coerente dei dati eseguendo una lettura efficace. Se non diversamente specificato, il risultato della lettura viene creato uno snapshot al momento dell'avvio della lettura. Per saperne di più sui diversi tipi di letture che Spanner può eseguire, consulta la sezione Letture.

Leggere i dati utilizzando una query

Per leggere un set specifico di dati da Spanner, configura la trasformazione utilizzando il metodo SpannerIO.Read.withQuery() per specificare una query SQL. Ad esempio:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Leggi i dati senza specificare una query

Per leggere da un database senza utilizzare una query, puoi specificare il nome di una tabella utilizzando il metodo SpannerIO.Read.withTable() e un elenco di colonne da leggere con il metodo SpannerIO.Read.withColumns(). Ad esempio:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Per limitare le righe lette, puoi specificare un set di chiavi primarie da leggere utilizzando il metodo SpannerIO.Read.withKeySet().

Puoi anche leggere una tabella utilizzando un indice secondario specificato. Come per la chiamata API readUsingIndex(), l'indice deve contenere tutti i dati che vengono visualizzati nei risultati della query.

A questo scopo, specifica la tabella come mostrato nell'esempio precedente e specifica l'indice che contiene i valori della colonna desiderati utilizzando il metodo SpannerIO.Read.withIndex(). L'indice deve archiviare tutte le colonne che la trasformazione deve leggere. La chiave primaria della tabella di base è archiviata in modo implicito. Ad esempio, per leggere la tabella Songs utilizzando l'indice SongsBySongName, usa il codice seguente:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controlla il mancato aggiornamento dei dati delle transazioni

È garantito che venga eseguita una trasformazione su uno snapshot di dati coerente. Per controllare l'inattività dei dati, utilizza il metodo SpannerIO.Read.withTimestampBound(). Per ulteriori informazioni, consulta le transazioni.

Lettura da più tabelle nella stessa transazione

Se vuoi leggere i dati di più tabelle nello stesso momento per garantire la coerenza dei dati, esegui tutte le letture in una singola transazione. A questo scopo, applica una trasformazione createTransaction(), creando un oggetto PCollectionView<Transaction> che a sua volta crea una transazione. La visualizzazione risultante può essere passata a un'operazione di lettura utilizzando SpannerIO.Read.withTransaction().

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Leggi i dati da tutte le tabelle disponibili

Puoi leggere i dati da tutte le tabelle disponibili in un database Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Risolvere i problemi relativi alle query non supportate

Il connettore Dataflow supporta solo le query SQL di Spanner il cui primo operatore nel piano di esecuzione delle query è una Unione distribuita. Se provi a leggere i dati da Spanner utilizzando una query e ricevi un'eccezione che indica che la query does not have a DistributedUnion at the root, segui i passaggi descritti in Informazioni su come Spanner esegue le query per recuperare un piano di esecuzione per la query utilizzando la console Google Cloud.

Se la query SQL non è supportata, semplificala in una query che ha un'unione distribuita come primo operatore nel piano di esecuzione della query. Rimuovi le funzioni aggregate, i join di tabelle e gli operatori DISTINCT, GROUP BY e ORDER, in quanto sono gli operatori che molto probabilmente impediranno il funzionamento della query.

Creare mutazioni per una scrittura

Utilizza il metodo newInsertOrUpdateBuilder() della classe Mutation anziché il metodo newInsertBuilder(), a meno che non sia assolutamente necessario per le pipeline Java. Per le pipeline Python, usa SpannerInsertOrUpdate() anziché SpannerInsert(). Dataflow fornisce garanzie "almeno una volta", il che significa che la mutazione potrebbe essere scritta più volte. Di conseguenza, solo le mutazioni di INSERT potrebbero generare com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errori che causano un errore della pipeline. Per evitare questo errore, utilizza la modifica INSERT_OR_UPDATE, che aggiunge una nuova riga o aggiorna i valori della colonna se la riga esiste già. La mutazione INSERT_OR_UPDATE può essere applicata più di una volta.

Scrivi in Spanner e trasforma i dati

Puoi scrivere dati in Spanner con il connettore Dataflow utilizzando una trasformazione SpannerIO.write() per eseguire una raccolta di mutazioni delle righe di input. Il connettore Dataflow raggruppa le mutazioni in batch per migliorare l'efficienza.

L'esempio seguente mostra come applicare una trasformazione di scrittura a una PCollection di mutazioni:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se una trasformazione si arresta in modo imprevisto prima del completamento, le mutazioni già applicate non verranno sottoposte a rollback.

Applicare i gruppi di mutazioni a livello atomico

Puoi utilizzare la classe MutationGroup per garantire che un gruppo di mutazioni vengano applicate insieme a livello atomico. Le mutazioni in un elemento MutationGroup saranno sicuramente inviate nella stessa transazione, ma la transazione potrebbe essere riprovata.

I gruppi di mutazione funzionano meglio quando vengono utilizzati per raggruppare le mutazioni che interessano i dati archiviati vicini nello spazio delle chiavi. Poiché Spanner combina i dati della tabella padre e di quella figlio nella tabella padre, questi dati sono sempre vicini nello spazio delle chiavi. Ti consigliamo di strutturare il gruppo di mutazioni in modo che contenga una modifica applicata a una tabella padre e modifiche aggiuntive applicate alle tabelle figlio, oppure in modo che tutte le mutazioni modifichino i dati vicini nello spazio delle chiavi. Per saperne di più su come Spanner archivia i dati delle tabelle padre e figlio, consulta Schema e modello dei dati. Se non organizzi i gruppi di modifica in base alle gerarchie di tabelle consigliate o se i dati a cui si accede non sono vicini nello spazio delle chiavi, Spanner potrebbe dover eseguire commit in due fasi, il che comporterà un rallentamento delle prestazioni. Per ulteriori informazioni, consulta la sezione Scompensi per le località.

Per utilizzare MutationGroup, crea una trasformazione SpannerIO.write() e chiama il metodo SpannerIO.Write.grouped(), che restituisce una trasformazione che puoi applicare a PCollection di MutationGroup oggetti.

Quando crei un elemento MutationGroup, la prima mutazione elencata diventa la mutazione principale. Se il gruppo di mutazioni interessa una tabella padre e una tabella figlio, la mutazione principale deve essere una mutazione della tabella padre. Altrimenti, puoi usare qualsiasi mutazione come mutazione principale. Il connettore Dataflow utilizza la mutazione principale per determinare i limiti delle partizioni in modo da raggruppare in modo efficiente le mutazioni.

Ad esempio, immagina che la tua applicazione monitori il comportamento e segnali i comportamenti problematici degli utenti per sottoporli a revisione. Per ogni comportamento segnalato, devi aggiornare la tabella Users per bloccare l'accesso dell'utente all'applicazione e devi registrare l'incidente anche nella tabella PendingReviews. Per assicurarti che entrambe le tabelle vengano aggiornate a livello atomico, utilizza MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Quando crei un gruppo di mutazioni, la prima mutazione fornita come argomento diventa la mutazione principale. In questo caso, le due tabelle non sono correlate, quindi non c'è una chiara mutazione primaria. Abbiamo selezionato userMutation come principale posizionandolo per primo. Applicando le due mutazioni separatamente sarebbe più veloce, ma non garantirebbe l'atomicità, quindi il gruppo di mutazioni è la scelta migliore in questa situazione.

Passaggi successivi