Questa pagina mostra come creare pipeline Dataflow che consumano e inoltrano i dati delle modifiche di Spanner utilizzando le modifiche in tempo reale. Puoi utilizzare il codice di esempio in questa pagina per creare pipeline personalizzate.
Concetti principali
Di seguito sono riportati alcuni concetti fondamentali per le pipeline di Dataflow per modifiche in tempo reale.
Dataflow
Dataflow è un servizio serverless, veloce e conveniente che supporta sia l'elaborazione in streaming che quella in batch. Offre portabilità con i job di elaborazione scritti utilizzando le librerie open source di Apache Beam e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce uno streaming quasi in tempo reale durante la lettura dagli modifiche in tempo reale.
Puoi utilizzare Dataflow per utilizzare le modifiche in tempo reale di Spanner con il connettore SpannerIO, che offre un'astrazione sull'API Spanner per eseguire query sulle modifiche in tempo reale. Con questo connettore, non devi gestire il ciclo di vita delle partizioni modifiche in tempo reale, che è necessario quando utilizzi direttamente l'API Spanner. Il connettore fornisce un flusso di record di modifica dei dati in modo da poterti concentrare maggiormente sulla logica dell'applicazione e meno su dettagli specifici dell'API e sulla partizione dinamica dei flussi di modifica. Ti consigliamo di utilizzare il connettore SpannerIO anziché l'API Spanner nella maggior parte dei casi in cui devi leggere i dati dello stream delle modifiche.
I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Per una panoramica, consulta Modelli Dataflow.
Pipeline Dataflow
Una pipeline Dataflow di modifiche in tempo reale Spanner è composta da quattro parti principali:
- Un database Spanner con un flusso di modifiche
- Il connettore SpannerIO
- Trasformazioni e destinazioni definite dall'utente
- Uno scrittore I/O di destinazione Apache Beam
Flusso di modifiche Spanner
Per informazioni dettagliate su come creare un flusso di modifiche, consulta Creare un flusso di modifiche.
Connettore Apache Beam SpannerIO
Si tratta del connettore SpannerIO descritto nella sezione precedente su Dataflow.
Si tratta di un connettore I/O di origine che emette un PCollection
di record di modifica dei dati
nelle fasi successive della pipeline. L'ora dell'evento per ogni record di modifica dei dati emesso sarà il timestamp del commit. Tieni presente che i record emessi sono
non ordinati e che il connettore SpannerIO garantisce che non ci saranno
record in ritardo.
Quando si utilizzano i modifiche in tempo reale, Dataflow utilizza i checkpoint. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo di checkpoint configurato per il buffering delle modifiche prima di inviarle per l'ulteriore elaborazione.
Trasformazioni definite dall'utente
Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare i dati di elaborazione all'interno di una pipeline Dataflow. Alcuni casi d'uso comuni sono la rimozione di informazioni che consentono l'identificazione personale, la soddisfazione dei requisiti relativi al formato dei dati a valle e l'ordinamento. Consulta la documentazione ufficiale di Apache Beam per la guida alla programmazione sulle trasformazioni.
Scrittore I/O di destinazione Apache Beam
Apache Beam contiene connettori I/O integrati che possono essere utilizzati per scrivere da una pipeline Dataflow in un'area di destinazione dei dati come BigQuery. Le destinazioni dati più comuni sono supportate in modo nativo.
Modelli Dataflow
I modelli Dataflow forniscono un metodo per creare job Dataflow basati su immagini Docker precompilate per casi d'uso comuni utilizzando la console Google Cloud , il Google Cloud CLI o le chiamate API Rest.
Per le modifiche in tempo reale di Spanner, forniamo tre modelli flessibili Dataflow:
Impostare le autorizzazioni IAM per i modelli Dataflow
Prima di creare un job Dataflow con i tre modelli flessibili elencati, assicurati di disporre delle autorizzazioni IAM richieste per i seguenti account di servizio:
Se non disponi delle autorizzazioni IAM richieste, devi specificare un account di servizio worker gestito dall'utente per creare il job Dataflow. Per ulteriori informazioni, consulta Autorizzazioni e sicurezza del flusso di dati.
Quando provi a eseguire un job da un modello flessibile di Dataflow senza tutte le autorizzazioni richieste, il job potrebbe non riuscire con un errore di lettura del file del risultato o un errore di autorizzazione negata per la risorsa. Per ulteriori informazioni, consulta Risolvere i problemi relativi ai modelli flessibili.
Creare una pipeline Dataflow
Questa sezione illustra la configurazione iniziale del connettore e fornisce esempi di integrazioni comuni con la funzionalità di modifiche in tempo reale di Spanner.
Per seguire questi passaggi, devi disporre di un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, consulta Creare una pipeline Dataflow utilizzando Java.
Crea un flusso di modifiche
Per informazioni dettagliate su come creare un flusso di modifiche, consulta Creare un flusso di modifiche. Per continuare con i passaggi successivi, devi disporre di un database Spanner con uno stream di modifiche configurato.
Concedi i privilegi di controllo dell'accesso granulare
Se prevedi che gli utenti con controllo dell'accesso granulare debbano eseguire il job Dataflow, assicurati che agli utenti sia stato concesso l'accesso a un ruolo del database che abbia il privilegio SELECT
per il flusso di modifiche e il privilegio EXECUTE
per la funzione con valore di tabella del flusso di modifiche. Inoltre, assicurati che il principale specifichi il ruolo database nella configurazione di SpannerIO o nel modello flessibile di Dataflow.
Per ulteriori informazioni, consulta Informazioni sul controllo dell'accesso granulare.
Aggiungi il connettore SpannerIO come dipendenza
Il connettore Apache Beam SpannerIO incapsula la complessità dell'utilizzo degli modifiche in tempo reale direttamente utilizzando l'API Cloud Spanner, emettendo una PCollection di record di dati dello stream di variazioni nelle fasi successive della pipeline.
Questi oggetti possono essere utilizzati in altre fasi della pipeline Dataflow dell'utente. L'integrazione del flusso di modifiche fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, la dipendenza deve essere aggiunta al file pom.xml
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Crea un database di metadati
Il connettore deve tenere traccia di ogni partizione durante l'esecuzione della pipeline Apache Beam. Memorizza questi metadati in una tabella Spanner creata dal connettore durante l'inizializzazione. Specifica il database in cui verrà creata questa tabella durante la configurazione del connettore.
Come descritto nelle best practice per gli stream di modifiche, consigliamo di creare un nuovo database a questo scopo, anziché consentire al connettore di utilizzare il database della tua applicazione per memorizzare la tabella dei metadati.
Il proprietario di un job Dataflow che utilizza il connettore SpannerIO deve avere impostato le seguenti autorizzazioni IAM con questo database di metadati:
spanner.databases.updateDdl
spanner.databases.beginReadOnlyTransaction
spanner.databases.beginOrRollbackReadWriteTransaction
spanner.databases.read
spanner.databases.select
spanner.databases.write
spanner.sessions.create
spanner.sessions.get
Configura il connettore
Il connettore per modifiche in tempo reale di Spanner può essere configurato come segue:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
Di seguito sono riportate le descrizioni delle opzioni readChangeStream()
:
Configurazione Spanner (obbligatoria)
Utilizzato per configurare il progetto, l'istanza e il database in cui è stato creato lo stream di modifiche e da cui devono essere eseguite le query. Se vuoi, specifica anche il ruolo database da utilizzare quando l'entità IAM che esegue il job Dataflow è un utente di controllo dell'accesso dell'accesso granulare. Il job assume questo ruolo del database per l'accesso allo stream delle modifiche. Per maggiori informazioni, consulta Informazioni sul controllo dell'accesso granulare.
Nome del flusso di modifiche (obbligatorio)
Questo nome identifica in modo univoco il flusso di modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione.
(Facoltativo) ID istanza dei metadati
Questa è l'istanza in cui vengono memorizzati i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API di stream di modifiche.
ID database dei metadati (obbligatorio)
Si tratta del database in cui vengono memorizzati i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API flusso di modifiche.
Nome della tabella dei metadati (facoltativo)
Questo valore deve essere utilizzato solo per aggiornare una pipeline esistente.
Si tratta del nome della tabella dei metadati preesistente da utilizzare dal connettore. Viene utilizzato dal connettore per memorizzare i metadati al fine di controllare il consumo dei dati dell'API flusso di modifiche. Se questa opzione viene omessa, Spanner crea una nuova tabella con un nome generato all'inizializzazione del connettore.
Priorità RPC (facoltativa)
La priorità della richiesta da utilizzare per le query sugli stream di variazioni. Se questo parametro viene omesso, verrà utilizzato high
priority
.
InclusiveStartAt (obbligatorio)
Le modifiche dal timestamp specificato vengono restituite al chiamante.
InclusiveEndAt (facoltativo)
Le modifiche fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno emesse a tempo indeterminato.
Aggiungi trasformazioni e sink per elaborare i dati di modifica
Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto
a emettere una PCollection di oggetti DataChangeRecord
.
Consulta Esempi di trasformazioni e destinazioni per diverse configurazioni di pipeline di esempio che elaborano questi dati in streaming in vari modi.
Tieni presente che i record dello stream delle modifiche emessi dal connettore SpannerIO non sono ordinati. Questo perché le PCollection non forniscono alcuna garanzia di ordinamento. Se hai bisogno di un flusso ordinato, devi raggruppare e ordinare i record come trasformazioni nelle pipeline: consulta Esempio: ordine per chiave. Puoi estendere questo esempio per ordinare i record in base a qualsiasi campo, ad esempio in base agli ID transazione.
Esempi di trasformazioni e sink
Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam fornisce una miriade di trasformazioni che possono essere applicate, nonché connettori I/O pronti all'uso per scrivere i dati in sistemi esterni.
Esempio: ordina per chiave
Questo esempio di codice emette record di variazione dei dati ordinati in base al timestamp del commit e raggruppati per chiavi principali utilizzando il connettore Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
Questo esempio di codice utilizza stati e timer per mettere in buffer i record per ogni chiave e imposta la data e l'ora di scadenza del timer su un'ora configurata dall'utente T
in futuro (definita nella funzione BufferKeyUntilOutputTimestamp). Quando la marcatura temporale di Dataflow supera il tempo T
, questo codice svuota tutti i record nel buffer con timestamp inferiore a T
, li ordina in base al timestamp del commit e genera una coppia chiave-valore in cui:
- La chiave è la chiave di input, ovvero la chiave primaria sottoposta ad hashing in un array di bucket di dimensioni 1000.
- Il valore è costituito dai record delle modifiche dei dati ordinati memorizzati nella memoria intermedia per la chiave.
Per ogni chiave, offriamo le seguenti garanzie:
- L'attivazione dei timer è garantita in base al timestamp di scadenza.
- È garantito che le fasi a valle ricevano gli elementi nello stesso ordine in cui sono stati prodotti.
Ad esempio, con una chiave del valore 100, il timer viene attivato rispettivamente a T1
e T10
, producendo un pacchetto di record di variazione dei dati a ogni timestamp. Poiché i record di variazione dei dati generati in T1
sono stati prodotti prima di quelli generati in T10
, è garantito che anche i record di variazione dei dati generati in T1
verranno ricevuti dalla fase successiva prima di quelli generati in T10
. Questo meccanismo ci consente di garantire un'organizzazione rigorosa dei timestamp dei commit in base alla chiave primaria per l'elaborazione a valle.
Questo processo verrà ripetuto fino al termine della pipeline e all'elaborazione di tutti i record di modifica dei dati (oppure verrà ripetuto all'infinito se non viene specificata un'ora di fine).
Tieni presente che questo esempio di codice utilizza stati e timer, anziché finestre, per eseguire l'ordinamento per chiave. Il motivo è che non è garantito che le finestre vengano elaborate in ordine. Ciò significa che le finestre precedenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe comportare un'elaborazione fuori sequenza.
BreakRecordByModFn
Ogni record di modifica dei dati può contenere più mod. Ogni mod rappresenta un inserimento, un aggiornamento o un'eliminazione di un singolo valore della chiave primaria. Questa funzione suddivide ogni record di modifica dei dati in record di modifica dei dati distinti, uno per mod.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Questa funzione riceve un DataChangeRecord
e restituisce un DataChangeRecord
basato sulla chiave primaria Spanner sottoposta ad hashing in un valore intero.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
I timer e i buffer sono per chiave. Questa funzione mette in buffer ogni record di modifica dei dati finché il watermark non supera il timestamp in cui vogliamo generare l'output dei record di modifica dei dati in buffer.
Questo codice utilizza un timer in loop per determinare quando svuotare il buffer:
- Quando rileva un record di modifica dei dati per una chiave per la prima volta, imposta il timer in modo che venga attivato al timestamp del commit del record di modifica dei dati +
incrementIntervalSeconds
(un'opzione configurabile dall'utente). - Quando il timer si attiva, aggiunge a
recordsToOutput
tutti i record di modifica dei dati nel buffer con timestamp inferiore alla data e ora di scadenza del timer. Se il buffer contiene record di modifica dei dati il cui timestamp è maggiore o uguale alla data e ora di scadenza del timer, aggiunge questi record di modifica dei dati al buffer anziché stamparli. Imposta quindi il timer successivo sull'ora di scadenza del timer corrente piùincrementIntervalInSeconds
. - Se
recordsToOutput
non è vuoto, la funzione ordina i record di modifica dei dati inrecordsToOutput
in base al timestamp del commit e all'ID transazione, quindi li stampa.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Ordinamento delle transazioni
Questa pipeline può essere modificata in modo da ordinare i dati in base all'ID transazione e al timestamp del commit. A tale scopo, memorizza in un buffer i record per ogni coppia di ID transazione / timestamp commit anziché per ogni chiave Spanner. Ciò richiede la modifica del codice in KeyByIdFn.
Esempio: assemblare le transazioni
Questo esempio di codice legge i record di variazione dei dati, assembla tutti i record di variazione dei dati appartenenti alla stessa transazione in un unico elemento e lo restituisce. Tieni presente che le transazioni generate da questo codice campione non sono ordinate in base al timestamp del commit.
Questo esempio di codice utilizza i buffer per assemblare le transazioni dai record di modifica dei dati. Quando riceve per la prima volta un record di variazione dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction
nel record di variazione dei dati, che descrive il numero previsto di record di variazione dei dati appartenenti a quella transazione. Memorizza in un buffer i record di variazione dei dati appartenenti a quella transazione finché il numero di record in buffer non corrisponde a numberOfRecordsInTransaction
, dopodiché emette i record di variazione dei dati raggruppati.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Questa funzione riceve un DataChangeRecord
e restituisce un DataChangeRecord
basato sull'ID transazione.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
TransactionBoundaryFn
memorizza le coppie chiave-valore ricevute di
{TransactionId, DataChangeRecord}
da KeyByTransactionIdFn
e
le memorizza in gruppi in base a TransactionId
. Quando il numero di record memorizzati nella memoria intermedia è uguale al numero di record contenuti nell'intera transazione, questa funzione ordina gli oggetti DataChangeRecord
nel gruppo in base alla sequenza dei record e restituisce una coppia chiave/valore di {CommitTimestamp, TransactionId}
, Iterable<DataChangeRecord>
.
Qui assumiamo che SortKey
sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}
. Per ulteriori informazioni su
SortKey
, consulta l'implementazione di esempio.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Esempio: filtrare per tag transazione
Quando una transazione che modifica i dati utente viene taggata, il tag corrispondente e il relativo tipo vengono memorizzati all'interno di DataChangeRecord
. Questi esempi mostrano come filtrare i record del flusso di modifiche in base ai tag transazioni definiti dall'utente e ai tag di sistema:
Filtro dei tag definiti dall'utente per my-tx-tag
:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Controllo del filtro dei tag di sistema/TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Esempio: recupera riga completa
Questo esempio funziona con una tabella Spanner denominata Singer
con la seguente definizione:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
Nella modalità di acquisizione dei valori OLD_AND_NEW_VALUES
predefinita degli modifiche in tempo reale,
quando viene eseguito un aggiornamento di una riga Spanner, il record di variazione dei dati ricevuto conterrà solo le colonne modificate. Le colonne monitorate, ma non modificate, non verranno incluse nel record. La chiave primaria del mod può essere utilizzata per eseguire una lettura dello snapshot di Spanner al timestamp del commit del record di modifica dei dati per recuperare le colonne invariate o persino recuperare la riga completa.
Tieni presente che il criterio di conservazione del database potrebbe dover essere modificato in modo da impostare un valore maggiore o uguale al criterio di conservazione del flusso di variazioni affinché la lettura dello snapshot vada a buon fine.
Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW
è il modo consigliato e più efficiente per farlo, poiché restituisce tutte le colonne monitorate della riga per impostazione predefinita e non richiede la lettura di uno snapshot aggiuntivo in Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Questa trasformazione eseguirà una lettura non aggiornata al timestamp del commit di ogni record ricevuto e mapperà la riga completa in JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Questo codice crea un client di database Spanner per eseguire il recupero completo delle righe e configura il pool di sessioni in modo da avere solo alcune sessioni, eseguendo le letture in sequenza in un'istanza di ToFullReowJsonFn
.
Dataflow si assicura di generare molte istanze di questa funzione, ciascuna con il proprio pool di client.
Esempio: da Spanner a Pub/Sub
In questo scenario, il chiamante trasmette in streaming i record a Pub/Sub il più rapidamente possibile, senza raggruppamenti o aggregazioni. Questa è una buona soluzione per attivare l'elaborazione downstream, nonché per eseguire lo streaming di tutte le nuove righe inserite in una tabella Spanner in Pub/Sub per ulteriore elaborazione.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
Tieni presente che l'emissario Pub/Sub può essere configurato per garantire la semantica exactly-once.
Esempio: da Spanner a Cloud Storage
In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. Questa soluzione è ideale per l'analisi e l'archiviazione point-in-time, indipendentemente dal periodo di conservazione di Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
Tieni presente che l'emissario Cloud Storage fornisce la semantica almeno una volta per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato in modo da avere una semantica exactly-once.
Forniamo anche un modello Dataflow per questo caso d'uso: consulta Collegare modifiche in tempo reale a Cloud Storage.
Esempio: da Spanner a BigQuery (tabella di bilancio)
Qui, l'utente chiamante esegue lo streaming dei record di modifica in BigQuery. Ogni record di modifica dei dati viene visualizzato come una riga in BigQuery. Questa è una buona soluzione per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupero riga completa, per recuperare la riga completa del record e scriverla in BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Tieni presente che l'eseguibile di destinazione BigQuery fornisce la semantica almeno una volta per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato in modo da avere una semantica exactly-once.
Forniamo anche un modello Dataflow per questo caso d'uso. Consulta Collegare modifiche in tempo reale a BigQuery.
Monitorare una pipeline
Esistono due classi di metriche disponibili per monitorare una pipeline Dataflow del flusso di modifiche.
Metriche Dataflow standard
Dataflow fornisce diverse metriche per assicurarti che il tuo job sia in esecuzione correttamente, ad esempio l'aggiornamento dei dati, il ritardo del sistema, il throughput del job, l'utilizzo della CPU dei worker e altro ancora. Puoi trovare ulteriori informazioni in Utilizzare il monitoraggio per le pipeline Dataflow.
Per le pipeline di modifiche in tempo reale, è necessario tenere conto di due metriche principali: la latenza del sistema e l'aggiornamento dei dati.
La latenza del sistema indica la durata massima attuale (in secondi) per la quale un elemento di dati viene elaborato o è in attesa di elaborazione.
L'aggiornamento dei dati mostra il periodo di tempo che intercorre tra il momento attuale (in tempo reale) e il watermark di output. La filigrana di output dell'ora T
indica che tutti gli elementi con un'ora evento (rigorosamente) precedente a T
sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura l'aggiornamento della pipeline in relazione all'elaborazione degli eventi ricevuti.
Se la pipeline non dispone di risorse sufficienti, puoi vedere questo effetto in queste due metriche. La latenza del sistema aumenterà, perché gli elementi devono attendere più a lungo prima di essere elaborati. Anche l'aggiornamento dei dati aumenterà, perché la pipeline non sarà in grado di stare al passo con la quantità di dati ricevuti.
Metriche personalizzate per i flussi di modifiche
Queste metriche sono esposte in Cloud Monitoring e includono:
- Latenza bucketizzata (istogramma) tra l'commit di un record in Spanner e l'emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per rilevare eventuali problemi di prestazioni (latenza) della pipeline.
- Numero totale di record di dati letti. Questa è un'indicazione generale del numero di record emessi dal connettore. Questo numero dovrebbe essere in costante aumento, rispecchiando la tendenza delle scritture nel database Spanner sottostante.
- Numero di partizioni in fase di lettura. Dovrebbero sempre essere lette partizioni. Se questo numero è pari a zero, indica che si è verificato un errore nella pipeline.
- Numero totale di query emesse durante l'esecuzione del connettore. Si tratta di un'indicazione generale delle query sui flussi di modifiche effettuate nell'istanza Spanner durante l'esecuzione della pipeline. Questo può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.
Aggiornare una pipeline esistente
È possibile aggiornare una pipeline in esecuzione che utilizza il connettore SpannerIO per elaborare modifiche in tempo reale se i controlli di compatibilità dei job superano. Per farlo, devi impostare esplicitamente il parametro del nome della tabella dei metadati del nuovo job durante l'aggiornamento. Utilizza il valore dell'opzione della pipeline metadataTable
del job che stai aggiornando.
Se utilizzi un modello Dataflow fornito da Google, imposta il nome della tabella utilizzando il parametro spannerMetadataTableName
. Puoi anche modificare il job esistente in modo da utilizzare esplicitamente la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name)
nella configurazione del connettore. Al termine, puoi seguire le istruzioni riportate in Avvio del job di sostituzione della documentazione di Dataflow per aggiornare un job in esecuzione.
Best practice per modifiche in tempo reale e Dataflow
Di seguito sono riportate alcune best practice per creare connessioni ai modifiche in tempo reale utilizzando Dataflow.
Utilizza un database dei metadati separato
Ti consigliamo di creare un database separato da utilizzare per lo spazio di archiviazione dei metadati del connettore SpannerIO anziché configurarlo per utilizzare il database dell'applicazione.
Per ulteriori informazioni, consulta Valutare la possibilità di utilizzare un database dei metadati separato.
Dimensiona il cluster
Una regola empirica per il numero iniziale di worker in un job di modifiche in tempo reale Spanner è un worker per 1000 scritture al secondo. Tieni presente che questa stima può variare in base a diversi fattori, ad esempio le dimensioni di ogni transazione, il numero di record dello stream di modifiche prodotti da una singola transazione e altre trasformazioni, aggregazioni o destinazioni utilizzate nella pipeline.
Dopo l'assegnazione iniziale delle risorse, è importante tenere traccia delle metriche indicate in Monitorare una pipeline per assicurarsi che la pipeline sia in buone condizioni. Ti consigliamo di fare esperimenti con un pool di worker iniziale e di monitorare il modo in cui la pipeline gestisce il carico, aumentando il numero di nodi se necessario. L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se sono necessari più nodi.
Limitazioni note
Esistono alcune limitazioni note relative all'utilizzo delle modifiche in tempo reale di Spanner con Dataflow:
Scalabilità automatica
Il supporto della scalabilità automatica per le pipeline che includono SpannerIO.readChangeStream
richiede Apache Beam 2.39.0
o versioni successive.
Se utilizzi una versione di Apache Beam precedente a 2.39.0
, le pipeline che includono
SpannerIO.readChangeStream
devono specificare esplicitamente l'algoritmo di scalabilità automatica come NONE
, come descritto in Scalabilità automatica orizzontale.
Per eseguire la scalabilità manuale di una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Eseguire la scalabilità manuale di una pipeline in streaming.
Runner V2
Il connettore per modifiche in tempo reale di Spanner richiede Dataflow Runner v2.
Questo valore deve essere specificato manualmente durante l'esecuzione, altrimenti verrà generato un errore. Puoi specificare Runner V2
configurando il job con
--experiments=use_unified_worker,use_runner_v2
.
Snapshot
Il connettore dei modifiche in tempo reale di Spanner non supporta gli snapshot di Dataflow.
Svuotamento in corso
Il connettore modifiche in tempo reale Spanner non supporta lo scarico di un job. È possibile annullare solo un job esistente.
Puoi anche aggiornare una pipeline esistente senza doverla interrompere.
OpenCensus
Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o successiva.
NullPointerException
all'avvio della pipeline
Un bug nella versione 2.38.0
di Apache Beam può causare un NullPointerException
all'avvio della pipeline in determinate condizioni. Ciò impedirà l'avvio del job e verrà visualizzato il seguente messaggio di errore:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Per risolvere il problema, utilizza Apache Beam versione 2.39.0
o successive oppure
specifica manualmente la versione di beam-sdks-java-core
come 2.37.0
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>