Il modello di modifiche in tempo reale reale di Spanner per BigQuery è una pipeline in modalità flusso che trasmette i flussi di record delle modifiche dei dati di Spanner e li scrive nelle tabelle BigQuery usando Dataflow Runner V2.
Tutte le colonne controllate delle modifiche in tempo reale sono incluse in ogni riga della tabella BigQuery, indipendentemente se vengono modificati da una transazione Spanner. Le colonne non guardate non sono incluse nella riga BigQuery. Eventuali modifiche a Spanner inferiori a Dataflow la filigrana viene applicata correttamente alle tabelle BigQuery o viene archiviata coda messaggi non recapitabili per nuovo tentativo. Le righe BigQuery vengono inserite nell'ordine sbagliato rispetto a l'ordinamento originale del timestamp di commit di Spanner.
Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. Altrimenti,
vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve
contengono le colonne monitorate corrispondenti delle tabelle Spanner e qualsiasi altro
colonne di metadati che non vengono ignorate esplicitamente dall'opzione ignoreFields
.
Consulta la descrizione dei campi dei metadati nell'elenco seguente.
Ogni nuova riga BigQuery include tutte le colonne
controllato dal flusso di modifiche dalla riga corrispondente nella tabella Spanner al
il timestamp del record delle modifiche.
I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per ulteriori dettagli su questi campi, consulta Record delle modifiche dei dati in "Modificare partizioni, record e query di flussi di dati".
_metadata_spanner_mod_type
: il tipo di modifica (inserisci, aggiornamento o eliminazione) della transazione Spanner. Estratto dalla modifica record delle modifiche ai dati del flusso._metadata_spanner_table_name
: il nome della tabella Spanner. Questo non è il nome della tabella dei metadati del connettore._metadata_spanner_commit_timestamp
: Spanner commit timestamp, ovvero l'ora in cui viene eseguito il commit di una modifica. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_spanner_server_transaction_id
: una stringa univoca a livello globale che rappresenta la transazione Spanner in cui è stata eseguita la modifica. Usa solo questo nel contesto dell'elaborazione dei record delle modifiche in tempo reale. Non è è correlato all'ID transazione nell'API Spanner. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_spanner_record_sequence
: il numero di sequenza per il record all'interno di la transazione Spanner. I numeri di sequenza sono garantiti unici e monotonici in aumento, ma non necessariamente contiguo, all'interno di un transazione. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_spanner_is_last_record_in_transaction_in_partition
: indica se il record è l'ultimo record di una transazione Spanner nella partizione corrente. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_spanner_number_of_records_in_transaction
: il numero di dati modificare i record che fanno parte della transazione Spanner su a tutte le partizioni di modifiche in tempo reale. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_spanner_number_of_partitions_in_transaction
: il numero di partizioni che restituiscono i record delle modifiche dei dati per Spanner transazione. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale._metadata_big_query_commit_timestamp
: il timestamp del commit quando la riga viene inserita in BigQuery. SeuseStorageWriteApi
ètrue
, questa colonna non è automaticamente nella tabella del log delle modifiche dalla pipeline. In questo caso, devi aggiungere manualmente questa colonna nella tabella del log delle modifiche, se necessario.
Quando utilizzi questo modello, tieni presente i seguenti dettagli:
- Questo modello non propaga le modifiche allo schema da Spanner a BigQuery. Poiché l'esecuzione di una modifica dello schema in Spanner probabilmente causerà l'interruzione della pipeline, potresti dover ricreare la pipeline dopo la modifica dello schema.
- Per i tipi di acquisizione dei valori
OLD_AND_NEW_VALUES
eNEW_VALUES
, quando il record della variazione dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura inattiva su Spanner il timestamp di commit del record delle modifiche ai dati per recuperare le colonne non modificate ma controllate. Marca assicurati di configurare il database "version_retention_period" in modo corretto per la lettura inattiva. Per il tipo di acquisizione valoreNEW_ROW
, il modello è più efficiente, perché il record delle modifiche dei dati acquisisce la nuova riga completa, incluse le colonne che non sono aggiornate nelle richieste UPDATE, e il modello non non è necessario leggere. - Per ridurre al minimo la latenza di rete e i costi di trasporto della rete, esegui Dataflow nella stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzare origini, sink, posizioni dei file temporanei o posizioni dei file temporanei che si trovano all'esterno della regione del job, i tuoi dati potrebbero essere inviati tra regioni diverse. Per ulteriori informazioni, vedi Regioni di Dataflow.
- Questo modello supporta tutti i tipi di dati Spanner validi. Se BigQuery
è più preciso rispetto al tipo Spanner, la perdita di precisione potrebbe verificarsi durante
e la trasformazione dei dati. Nello specifico:
- Per il tipo JSON Spanner, l'ordine dei membri di un oggetto è in ordine grammaticale, ma non esiste una garanzia di questo tipo per il tipo JSON BigQuery.
- Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo tipo TIMESTAMP da microsecondi.
- Questo modello non supporta l'utilizzo di l'API BigQuery StorageWrite in modalità "exactly-once".
Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow in modalità flusso di modifiche e sulle best practice.
Requisiti della pipeline
- L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database Spanner deve esistere prima dell'esecuzione della pipeline.
- L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Le modifiche in tempo reale di Spanner devono esistere prima dell'esecuzione della pipeline.
- Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametri obbligatori
- spannerInstanceId : l'istanza Spanner da cui leggere le modifiche in tempo reale.
- spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
- spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
- bigQueryDataset : il set di dati BigQuery per le modifiche in tempo reale di output.
Parametri facoltativi
- spannerProjectId : il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT nel flusso di modifiche e il privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per le modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
- spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene specificata, durante il flusso della pipeline viene creata automaticamente una tabella dei metadati del connettore di modifiche in tempo reale in modalità Spanner. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
- rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere uno dei seguenti:
HIGH
,MEDIUM
oLOW
. Il valore predefinito èHIGH
. - spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://batch-spanner.googleapis.com.
- startTimestamp : il valore DateTime iniziale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp di avvio della pipeline, ovvero l'ora attuale.
- endTimestamp : il DateTime finale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale.Esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
- bigQueryProjectId : il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
- bigQueryChangelogTableNameTemplate : il modello per il nome della tabella BigQuery contenente il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory : il percorso per archiviare tutti i record non elaborati. Il percorso predefinito è una directory all'interno della località temporanea del job Dataflow. In genere è sufficiente il valore predefinito.
- dlqRetryMinutes : il numero di minuti tra i nuovi tentativi in una coda di messaggi non recapitabili. Il valore predefinito è 10.
- ignoreFields : un elenco di campi separati da virgole (sensibili alle maiuscole) da ignorare. Potrebbero essere campi di tabelle controllate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il campo predefinito è vuoto.
- disableDlqRetries : indica se disabilitare o meno i nuovi tentativi per la DLQ. Il valore predefinito è false.
- useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Scrivi (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API StorageWrite (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce : quando si utilizza l'API StorageWrite, specifica la semantica della scrittura. Per utilizzare la semantica "at-least-once-semantics" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su
true
. Per utilizzare la semantica "exactly-once", imposta il parametro sufalse
. Questo parametro si applica solo quandouseStorageWriteApi
ètrue
. Il valore predefinito èfalse
. - numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito
è
us-central1
.Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Cloud Spanner change streams to BigQuery template.
- Inserisci i valori parametro negli appositi campi.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco di tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica , che puoi trovare nidificata nella rispettiva cartella principale con data del bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza di metadati di SpannerSPANNER_METADATA_DATABASE
: database di metadati SpannerSPANNER_CHANGE_STREAM
: modifiche in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output delle modifiche in tempo reale
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul
API e i relativi ambiti di autorizzazione, consulta
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: L'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco di tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica , che puoi trovare nidificata nella rispettiva cartella principale con data del bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza di metadati di SpannerSPANNER_METADATA_DATABASE
: database di metadati SpannerSPANNER_CHANGE_STREAM
: modifiche in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output delle modifiche in tempo reale
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.