Dataflow è un servizio gestito per la trasformazione e l'arricchimento dei dati. Il connettore Dataflow per Spanner consente di leggere e scrivere dati in Spanner in una pipeline Dataflow, trasformando o modificando i dati facoltativamente. Puoi anche creare pipeline che trasferiscono dati tra Spanner e altri prodotti Google Cloud.
Il connettore Dataflow è il metodo consigliato per spostare in modo efficiente i dati dentro e fuori Spanner in blocco e per eseguire grandi trasformazioni in un database non supportato dal DML partizionato, come spostamento di tabelle, eliminazioni collettive che richiedono un JOIN e così via. Quando si lavora con singoli database, esistono altri metodi per importare ed esportare i dati:
- Utilizza la console Google Cloud per esportare un singolo database da Spanner a Cloud Storage in formato Avro.
- Utilizza la console Google Cloud per import nuovamente un database in Spanner dai file che hai esportato in Cloud Storage.
- Utilizza l'API REST o Google Cloud CLI per eseguire job di esportazione o import da Spanner a Cloud Storage e viceversa (anche in formato Avro).
Il connettore Dataflow per Spanner fa parte dell'SDK Java Apache Beam e fornisce un'API per eseguire le azioni indicate sopra. Per ulteriori informazioni su alcuni dei concetti discussi di seguito, come gli oggetti e le trasformazioni PCollection, consulta la guida alla programmazione di Apache Beam.
Aggiungi il connettore al progetto Maven
Per aggiungere il connettore Dataflow di Google Cloud a un progetto Maven, aggiungi l'artefatto Maven beam-sdks-java-io-google-cloud-platform
al file pom.xml
come dipendenza.
Ad esempio, supponendo che il file pom.xml
imposti beam.version
sul
numero di versione appropriato, devi aggiungere la seguente dipendenza:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Leggi i dati da Spanner
Per leggere da Spanner, applica la trasformazione SpannerIO.read().
Configura la lettura utilizzando i metodi della classe SpannerIO.Read
.
L'applicazione della trasformazione restituisce un valore PCollection<Struct>
, in cui ogni elemento della raccolta rappresenta una singola riga restituita dall'operazione di lettura. Puoi leggere da Spanner con e senza una query SQL
specifica, a seconda dell'output desiderato.
L'applicazione della trasformazione SpannerIO.read()
restituisce una visualizzazione coerente dei dati
eseguendo una lettura approfondita. Se non diversamente specificato, il risultato della lettura viene creato tramite snapshot nel momento in cui è stata avviata la lettura. Consulta le letture per ulteriori informazioni sui diversi tipi di letture che Spanner può eseguire.
Leggere i dati utilizzando una query
Per leggere un set di dati specifico da Spanner, configura la trasformazione utilizzando il metodo SpannerIO.Read.withQuery()
per specificare una query SQL. Ad esempio:
Leggi i dati senza specificare una query
Per leggere da un database senza utilizzare una query, puoi specificare il nome di una tabella utilizzando il metodo SpannerIO.Read.withTable() e specificare un elenco di colonne da leggere utilizzando il metodo SpannerIO.Read.withColumns(). Ad esempio:
GoogleSQL
PostgreSQL
Per limitare le righe lette, puoi specificare un set di chiavi primarie da leggere utilizzando il metodo SpannerIO.Read.withKeySet().
Puoi anche leggere una tabella utilizzando un indice secondario specificato. Come per la chiamata APIreadUsingIndex(), l'indice deve contenere tutti i dati visualizzati nei risultati della query.
Per farlo, specifica la tabella come mostrato nell'esempio precedente e specifica l'indice che contiene i valori della colonna desiderati utilizzando il metodo SpannerIO.Read.withIndex()
. L'indice deve archiviare tutte le colonne che la trasformazione deve leggere. La chiave primaria della tabella
di base è archiviata in modo implicito. Ad esempio, per leggere la tabella Songs
utilizzando l'indice SongsBySongName
, utilizza il seguente codice:
GoogleSQL
PostgreSQL
Controlla l'obsolescenza dei dati delle transazioni
È garantita l'esecuzione di una trasformazione su uno snapshot di dati coerente. Per controllare l'inattività dei dati, utilizza il metodo SpannerIO.Read.withTimestampBound()
. Vedi le transazioni per ulteriori informazioni.
Lettura da più tabelle nella stessa transazione
Se vuoi leggere i dati da più tabelle nello stesso momento per garantire la coerenza dei dati, esegui tutte le letture in un'unica transazione. Per farlo, applica una trasformazione createTransaction()
, creando un oggetto PCollectionView<Transaction>
e quindi una transazione. La vista risultante può essere passata a un'operazione di lettura utilizzando SpannerIO.Read.withTransaction()
.
GoogleSQL
PostgreSQL
Leggi i dati di tutte le tabelle disponibili
Puoi leggere i dati da tutte le tabelle disponibili in un database Spanner.
GoogleSQL
PostgreSQL
Risolvere i problemi relativi alle query non supportate
Il connettore Dataflow supporta solo le query SQL di Spanner il cui primo operatore nel piano di esecuzione delle query è Distributed Union. Se tenti di leggere i dati da Spanner utilizzando una query e ricevi un'eccezione che indica che la query does not have a DistributedUnion at
the root
, segui i passaggi descritti in Comprendere in che modo Spanner esegue le query per recuperare un piano di esecuzione per la tua query utilizzando la console Google Cloud.
Se la tua query SQL non è supportata, semplificala in modo che sia una query con unione distribuita come primo operatore nel piano di esecuzione della query. Rimuovi le funzioni aggregate, i join di tabelle e gli operatori DISTINCT
, GROUP BY
e ORDER
, in quanto sono gli operatori che hanno maggiori probabilità di impedire il funzionamento della query.
Crea mutazioni per una scrittura
Utilizza il metodo newInsertOrUpdateBuilder()
della classe Mutation
anziché il metodo newInsertBuilder()
, a meno che non sia assolutamente necessario per le pipeline Java. Per le pipeline Python, utilizza SpannerInsertOrUpdate()
anziché SpannerInsert()
. Dataflow fornisce garanzie "at-least-once", il che significa che la mutazione può essere scritta più volte. Di conseguenza, solo le mutazioni di INSERT
potrebbero generare
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
errori che causano
l'errore della pipeline. Per evitare questo errore, utilizza la mutazione INSERT_OR_UPDATE
, che aggiunge una nuova riga o aggiorna i valori della colonna se la riga esiste già. La mutazione INSERT_OR_UPDATE
può essere applicata più di una volta.
Scrivi in Spanner e trasforma i dati
Puoi scrivere dati in Spanner con il connettore Dataflow utilizzando una trasformazione SpannerIO.write()
per eseguire una raccolta di mutazioni delle righe di input. Il connettore Dataflow raggruppa le mutazioni
in batch per garantire l'efficienza.
L'esempio seguente mostra come applicare una trasformazione di scrittura a un PCollection
di
mutazioni:
GoogleSQL
PostgreSQL
Se una trasformazione si arresta in modo imprevisto prima del completamento, le mutazioni già applicate non verranno riportate.
Applicare gruppi di mutazioni a livello atomico
Puoi utilizzare la classe MutationGroup
per garantire che un
gruppo di mutazioni vengano applicate insieme atomicamente. È garantito che le modifiche in un
MutationGroup
vengano inviate nella stessa transazione, ma è possibile
tentare di nuovo la transazione.
I gruppi di mutazione hanno un rendimento migliore quando vengono utilizzati per raggruppare le mutazioni che interessano i dati archiviati vicini nello spazio delle chiavi. Poiché Spanner interlea i dati delle tabelle padre e figlio nella tabella padre, quei dati sono sempre vicini nello spazio delle chiavi. Ti consigliamo di strutturare il gruppo di mutazioni in modo che contenga una mutazione applicata a una tabella padre e altre mutazioni applicate alle tabelle figlio oppure in modo che tutte le sue mutazioni modifichino i dati vicini nello spazio delle chiavi. Per saperne di più su come Spanner archivia i dati delle tabelle padre e figlio, consulta Schema e modello dei dati. Se non organizzi i gruppi di mutazioni in base alle gerarchie di tabelle consigliate o se i dati a cui si accede non sono vicini nello spazio della chiave, Spanner potrebbe dover eseguire commit in due fasi, il che causerà prestazioni più lente. Per ulteriori informazioni, consulta la sezione Scontri a livello di località.
Per utilizzare MutationGroup
, crea una trasformazione SpannerIO.write()
e chiama il metodo SpannerIO.Write.grouped()
, che restituisce una trasformazione che puoi applicare a PCollection
di MutationGroup
oggetti.
Quando crei un MutationGroup
, la prima mutazione elencata diventa la
principale mutazione. Se il gruppo di mutazione interessa sia una tabella padre sia una tabella secondaria, la mutazione principale deve essere una mutazione nella tabella padre. Altrimenti,
puoi usare qualsiasi mutazione come mutazione principale. Il connettore Dataflow utilizza la mutazione principale per determinare i confini della partizione in modo da raggruppare le mutazioni in batch in modo efficiente.
Ad esempio, immagina che la tua applicazione monitori il comportamento e segnali
il comportamento problematico degli utenti per la revisione. Per ogni comportamento segnalato, vuoi aggiornare la tabella Users
per bloccare l'accesso dell'utente alla tua applicazione. Inoltre, devi registrare l'incidente nella tabella PendingReviews
. Per assicurarti che entrambe le tabelle siano aggiornate atomicamente, utilizza un attributo MutationGroup
:
GoogleSQL
PostgreSQL
Quando si crea un gruppo di mutazione, la prima mutazione fornita come argomento diventa la mutazione principale. In questo caso, le due tabelle non sono correlate, quindi non esiste una chiara mutazione primaria. Abbiamo selezionato userMutation
come principale
posizionandolo per primo. L'applicazione separata delle due mutazioni sarebbe più veloce, ma non ne garantisce l'atomicità, perciò il gruppo di mutazione è la scelta migliore in questa situazione.
Passaggi successivi
- Scopri di più sulla progettazione di una pipeline di dati Apache Beam.
- Esporta e import i database Spanner nella console Google Cloud utilizzando Dataflow.