Modifiche in tempo reale di Spanner al modello di BigQuery

Il modello di modifica in tempo reale di Spanner per i flussi di dati BigQuery è una pipeline di inserimento flussi che trasmette i flussi dei record delle modifiche dei dati di Spanner e li scrive nelle tabelle BigQuery utilizzando Dataflow Runner V2.

Tutte le colonne controllate delle modifiche in tempo reale sono incluse in ogni riga della tabella BigQuery, indipendentemente dal fatto che siano state modificate da una transazione Spanner. Le colonne non osservate non sono incluse nella riga di BigQuery. Eventuali modifiche a Spanner inferiori alla filigrana di Dataflow vengono applicate correttamente alle tabelle BigQuery o vengono archiviate nella coda dei messaggi non recapitabili per riprovare. Le righe BigQuery vengono inserite non in ordine rispetto all'ordinamento originale del timestamp di commit di Spanner.

Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. In caso contrario, vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve contenere le colonne monitorate corrispondenti delle tabelle Spanner ed eventuali colonne di metadati aggiuntive che non vengono ignorate esplicitamente dall'opzione ignoreFields. Consulta la descrizione dei campi dei metadati nell'elenco seguente. Ogni nuova riga BigQuery include tutte le colonne controllate dal flusso di modifiche dalla riga corrispondente nella tabella Spanner al timestamp del record delle modifiche.

I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per ulteriori dettagli su questi campi, consulta Record delle modifiche dei dati in "Modificare partizioni, record e query di flussi di dati".

Quando utilizzi questo modello, tieni presente i seguenti dettagli:

  • Questo modello non propaga le modifiche allo schema da Spanner a BigQuery. Poiché è probabile che l'esecuzione di una modifica dello schema in Spanner danneggi la pipeline, potresti dover ricreare la pipeline dopo la modifica dello schema.
  • Per i tipi di acquisizione dei valori OLD_AND_NEW_VALUES e NEW_VALUES, quando il record delle modifiche ai dati contiene una modifica di tipo UPDATE, il modello deve eseguire una lettura inattiva in Spanner in corrispondenza del timestamp di commit del record delle modifiche dati per recuperare le colonne non modificate ma controllate. Assicurati di configurare correttamente il database "version_retention_period" per la lettura inattiva. Per il tipo di acquisizione valori NEW_ROW, il modello è più efficiente perché il record delle modifiche ai dati acquisisce l'intera nuova riga, comprese le colonne che non vengono aggiornate nelle richieste UPDATE, e il modello non ha bisogno di eseguire una lettura inattiva.
  • Per ridurre al minimo la latenza di rete e i costi di trasporto della rete, esegui il job Dataflow dalla stessa regione della tua istanza Spanner o delle tabelle BigQuery. Se utilizzi origini, sink, posizioni dei file temporanei o posizioni dei file temporanei che si trovano al di fuori della regione del job, i tuoi dati potrebbero essere inviati tra regioni. Per ulteriori informazioni, consulta Regioni Dataflow.
  • Questo modello supporta tutti i tipi di dati Spanner validi. Se il tipo di BigQuery è più preciso rispetto al tipo di Spanner, durante la trasformazione potrebbe verificarsi una perdita di precisione. In particolare:
    • Per il tipo JSON di Spanner, l'ordine dei membri di un oggetto viene ordinato in modo lessicografico, ma non esiste una garanzia di questo tipo per il tipo JSON BigQuery.
    • Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo il tipo TIMESTAMP in microsecondi.
  • Questo modello non supporta l'utilizzo dell' API BigQuery StorageWrite in modalità "exactly-once".

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow in modalità flusso di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Le modifiche in tempo reale di Spanner devono esistere prima dell'esecuzione della pipeline.
  • Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId : l'istanza Spanner da cui leggere le modifiche in tempo reale.
  • spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
  • bigQueryDataset : il set di dati BigQuery per le modifiche in tempo reale di output.

Parametri facoltativi

  • spannerProjectId : il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT nel flusso di modifiche e il privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per le modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene specificata, durante il flusso della pipeline viene creata automaticamente una tabella dei metadati del connettore di modifiche in tempo reale in modalità Spanner. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
  • rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere uno dei seguenti: HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://batch-spanner.googleapis.com.
  • startTimestamp : il valore DateTime iniziale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp di avvio della pipeline, ovvero l'ora attuale.
  • endTimestamp : il DateTime finale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale.Esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • bigQueryProjectId : il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
  • bigQueryChangelogTableNameTemplate : il modello per il nome della tabella BigQuery contenente il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : il percorso per archiviare tutti i record non elaborati. Il percorso predefinito è una directory all'interno della località temporanea del job Dataflow. In genere è sufficiente il valore predefinito.
  • dlqRetryMinutes : il numero di minuti tra i nuovi tentativi in una coda di messaggi non recapitabili. Il valore predefinito è 10.
  • ignoreFields : un elenco di campi separati da virgole (sensibili alle maiuscole) da ignorare. Potrebbero essere campi di tabelle controllate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il campo predefinito è vuoto.
  • disableDlqRetries : indica se disabilitare o meno i nuovi tentativi per la DLQ. Il valore predefinito è false.
  • useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Scrivi (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API StorageWrite (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : quando si utilizza l'API StorageWrite, specifica la semantica della scrittura. Per utilizzare la semantica "at-least-once-semantics" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Spanner change streams to BigQuery template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output delle modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output delle modifiche in tempo reale

Passaggi successivi