Il modello Modifiche in tempo reale di Spanner a BigQuery è una pipeline di flussi di dati che trasmette i record delle modifiche dei dati di Spanner e li scrive nelle tabelle BigQuery utilizzando Dataflow Runner 2.
Tutte le colonne monitorate del flusso di modifiche sono incluse in ogni riga della tabella BigQuery, indipendentemente dal fatto che siano modificate da una transazione Spanner. Le colonne non guardate non sono incluse nella riga BigQuery. Eventuali modifiche di Spanner precedenti alla marcatura temporale di Dataflow vengono applicate correttamente alle tabelle BigQuery o archiviate nella coda delle email in arrivo non recapitate per il nuovo tentativo. Le righe BigQuery vengono inserite fuori ordine rispetto all'ordine del timestamp del commit Spanner originale.
Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. In caso contrario, vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve contenere le colonne monitorate corrispondenti delle tabelle Spanner e eventuali colonne aggiuntive per i metadati che non vengono ignorate esplicitamente dall'opzione ignoreFields
.
Consulta la descrizione dei campi dei metadati nell'elenco seguente.
Ogni nuova riga BigQuery include tutte le colonne monitorate dallo stream di modifiche dalla riga corrispondente nella tabella Spanner al timestamp del record di modifica.
I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per maggiori dettagli su questi campi, consulta Record di variazione dei dati in "Partizioni, record e query degli stream di variazioni".
_metadata_spanner_mod_type
: il tipo di modifica (inserimento, aggiornamento o eliminazione) della transazione Spanner. Estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_table_name
: il nome della tabella Spanner. Questo campo non è il nome della tabella dei metadati del connettore._metadata_spanner_commit_timestamp
: il timestamp del commit di Spanner, ovvero il momento in cui viene eseguito il commit di una modifica. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_server_transaction_id
: una stringa univoca a livello globale che rappresenta la transazione Spanner in cui è stata eseguita la commit della modifica. Utilizza questo valore solo nel contesto dell'elaborazione dei record dello stream di modifiche. Non è correlato all'ID transazione nell'API di Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_record_sequence
: il numero di sequenza del record all'interno della transazione Spanner. I numeri di sequenza sono garantiti come univoci e in aumento monotonico, ma non necessariamente contigui, all'interno di una transazione. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_is_last_record_in_transaction_in_partition
: indica se il record è l'ultimo record per una transazione Spanner nella partizione corrente. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_number_of_records_in_transaction
: il numero di record di variazione dei dati che fanno parte della transazione Spanner in tutte le partizioni del flusso di variazioni. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_number_of_partitions_in_transaction
: il numero di partizioni che restituiscono record di variazione dei dati per la transazione Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_big_query_commit_timestamp
: il timestamp del commit quando la riga viene inserita in BigQuery. SeuseStorageWriteApi
ètrue
, questa colonna non viene creata automaticamente nella tabella del log delle modifiche dalla pipeline. In questo caso, se necessario, devi aggiungere manualmente questa colonna alla tabella del log delle modifiche.
Quando utilizzi questo modello, tieni presente i seguenti dettagli:
- Puoi utilizzare questo modello per propagare nuove colonne nelle tabelle esistenti o in nuove tabelle da Spanner a BigQuery. Per ulteriori informazioni, consulta Gestire l'aggiunta di tabelle o colonne di monitoraggio.
- Per i tipi di acquisizione dei valori
OLD_AND_NEW_VALUES
eNEW_VALUES
, quando il record di modifica dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura obsoleta in Spanner al timestamp del commit del record di modifica dei dati per recuperare le colonne invariate, ma monitorate. Assicurati di configurare correttamente "version_retention_period" del database per la lettura non aggiornata. Per il tipo di acquisizione del valoreNEW_ROW
, il modello è più efficiente perché il record di variazione dei dati acquisisce la nuova riga completa, incluse le colonne che non vengono aggiornate nelle richieste UPDATE, e il modello non deve eseguire una lettura obsoleta. - Per ridurre al minimo la latenza di rete e i costi di trasporto di rete, esegui il job Dataflow dalla stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzi origini, destinazioni, posizioni dei file di staging o posizioni dei file temporanei situate al di fuori della regione del tuo job, i dati potrebbero essere inviati tra regioni. Per ulteriori informazioni, consulta Regioni di Dataflow.
- Questo modello supporta tutti i tipi di dati Spanner validi. Se il tipo BigQuery è più preciso del tipo Spanner, durante la trasformazione potrebbe verificarsi una perdita di precisione. In particolare:
- Per il tipo JSON di Spanner, l'ordine dei membri di un oggetto è alfabetico, ma non esiste questa garanzia per il tipo JSON di BigQuery.
- Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo il tipo TIMESTAMP in microsecondi.
- Questo modello non supporta l'utilizzo dell' API BigQuery Storage Write in modalità esattamente una volta.
Scopri di più sui flussi di modifiche, su come creare modifiche in tempo reale Dataflow con flussi di modifiche e sulle best practice.
Requisiti della pipeline
- L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database Spanner deve esistere prima dell'esecuzione della pipeline.
- L'istanza di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il flusso di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
- Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.
Gestire l'aggiunta di tabelle o colonne di monitoraggio
Questa sezione descrive le best practice per gestire l'aggiunta di tabelle e colonne Spanner per il monitoraggio durante l'esecuzione della pipeline. La versione del modello supportata più vecchia per questa funzionalità è 2024-09-19-00_RC00
.
- Prima di aggiungere una nuova colonna all'ambito di uno stream di modifiche Spanner,
aggiungila prima alla tabella del log delle modifiche di BigQuery. La colonna aggiunta deve avere un tipo di dati corrispondente ed essere
NULLABLE
. Attendi almeno 10 minuti prima di continuare a creare la nuova colonna o tabella in Spanner. La scrittura nella nuova colonna senza attendere potrebbe comportare un record non elaborato con un codice di errore non valido nella directory della coda di lettere morte. - Per aggiungere una nuova tabella, devi prima aggiungerla al database Spanner. La tabella viene creata automaticamente in BigQuery quando la pipeline riceve un record per la nuova tabella.
- Dopo aver aggiunto le nuove colonne o tabelle nel database Spanner, assicurati di modificare il tuo stream di modifiche per monitorare le nuove colonne o tabelle che ti interessano, se non sono già monitorate in modo implicito.
- Il modello non elimina tabelle o colonne da BigQuery. Se una colonna viene eliminata dalla tabella Spanner, i valori null vengono inseriti nelle colonne del log delle modifiche di BigQuery per i record generati dopo l'eliminazione delle colonne dalla tabella Spanner, a meno che non elimini manualmente la colonna da BigQuery.
- Il modello non supporta gli aggiornamenti del tipo di colonna. Anche se
Spanner supporta la modifica di una colonna
STRING
in una colonnaBYTES
o di una colonnaBYTES
in una colonnaSTRING
, non puoi modificare il tipo di dati di una colonna esistente o utilizzare lo stesso nome di colonna con tipi di dati diversi in BigQuery. Se elimini e ricrei una colonna con lo stesso nome, ma di tipo diverso in Spanner, i dati potrebbero essere scritti nella colonna BigQuery esistente, ma il tipo rimane invariato. - Questo modello non supporta gli aggiornamenti della modalità colonna. Le colonne dei metadati
replicate in BigQuery sono impostate sulla modalità
REQUIRED
. Tutte le altre colonne replicate in BigQuery sono impostate suNULLABLE
, indipendentemente dal fatto che siano definite comeNOT NULL
nella tabella Spanner. Non puoi aggiornare le colonneNULLABLE
alla modalitàREQUIRED
in BigQuery. - La modifica del tipo di acquisizione dei valori di uno stream di variazioni non è supportata per le pipeline in esecuzione.
Parametri del modello
Parametri obbligatori
- spannerInstanceId: l'istanza Spanner da cui leggere modifiche in tempo reale.
- spannerDatabase: il database Spanner da cui leggere modifiche in tempo reale.
- spannerMetadataInstanceId: l'istanza Spanner da utilizzare per la tabella dei metadati del connettore delle modifiche in tempo reale.
- spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore degli modifiche in tempo reale.
- spannerChangeStreamName: il nome del flusso di modifiche Spanner da cui leggere.
- bigQueryDataset: il set di dati BigQuery per l'output modifiche in tempo reale.
Parametri facoltativi
- spannerProjectId: il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore degli modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- spannerDatabaseRole: il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente controllo dell'accesso dell'accesso granulare. Il ruolo del database deve disporre del privilegio
SELECT
sul flusso di modifiche e del privilegioEXECUTE
sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName: il nome della tabella dei metadati del connettore di modifiche in tempo reale di Spanner da utilizzare. Se non viene fornita, una tabella dei metadati del connettore degli modifiche in tempo reale Spanner viene creata automaticamente durante il flusso della pipeline. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
- rpcPriority: la priorità della richiesta per le chiamate Spanner. Il valore deve essere uno dei seguenti:
HIGH
,MEDIUM
oLOW
. Il valore predefinito èHIGH
. - spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio,
https://batch-spanner.googleapis.com
. - startTimestamp: la data e l'ora di inizio (https://datatracker.ietf.org/doc/html/rfc3339), incluse, da utilizzare per la lettura degli modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp all'avvio della pipeline, ovvero l'ora corrente.
- endTimestamp: la data e l'ora di fine (https://datatracker.ietf.org/doc/html/rfc3339), incluse, da utilizzare per la lettura degli stream di variazioni. Es. 2021-10-12T07:20:50.52Z. Il valore predefinito è un'ora infinita nel futuro.
- bigQueryProjectId: il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
- bigQueryChangelogTableNameTemplate: il modello per il nome della tabella BigQuery contenente il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory: il percorso per archiviare i record non elaborati. Il percorso predefinito è una directory nella posizione temporanea del job di Dataflow. In genere, il valore predefinito è sufficiente.
- dlqRetryMinutes: il numero di minuti tra le riavviate della coda messaggi non recapitabili. Il valore predefinito è
10
. - ignoreFields: un elenco separato da virgole di campi (sensibile alle maiuscole) da ignorare. Questi campi possono essere campi di tabelle monitorate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Se ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il valore predefinito è vuoto.
- disableDlqRetries: indica se disattivare o meno i tentativi per la coda DLQ. Il valore predefinito è false.
- useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su
true
. Per utilizzare la semantica esattamente una volta, imposta il parametro sufalse
. Questo parametro si applica solo quandouseStorageWriteApi
ètrue
. Il valore predefinito èfalse
. - numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Spanner change streams to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Sostituisci quanto segue:
JOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: database dei metadati SpannerSPANNER_CHANGE_STREAM
: modifica in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output degli modifiche in tempo reale
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: database dei metadati SpannerSPANNER_CHANGE_STREAM
: modifica in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output degli modifiche in tempo reale
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.