Modifiche in tempo reale di Spanner al modello di BigQuery

Il modello di modifiche in tempo reale reale di Spanner per BigQuery è una pipeline in modalità flusso che trasmette i flussi di record delle modifiche dei dati di Spanner e li scrive nelle tabelle BigQuery usando Dataflow Runner V2.

Tutte le colonne controllate delle modifiche in tempo reale sono incluse in ogni riga della tabella BigQuery, indipendentemente se vengono modificati da una transazione Spanner. Le colonne non guardate non sono incluse nella riga BigQuery. Eventuali modifiche di Spanner precedenti alla marcatura temporale di Dataflow vengono applicate correttamente alle tabelle BigQuery o archiviate nella coda delle email in arrivo non recapitate per il nuovo tentativo. Le righe di BigQuery vengono inserite fuori ordine rispetto all'ordine del timestamp del commit Spanner originale.

Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. Altrimenti, vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve contenere le colonne monitorate corrispondenti delle tabelle Spanner e eventuali colonne aggiuntive per i metadati che non sono ignorate esplicitamente dall'opzione ignoreFields. Consulta la descrizione dei campi dei metadati nell'elenco seguente. Ogni nuova riga BigQuery include tutte le colonne controllato dal flusso di modifiche dalla riga corrispondente nella tabella Spanner al il timestamp del record delle modifiche.

I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per ulteriori dettagli su questi campi, consulta Record delle modifiche dei dati in "Modificare partizioni, record e query di flussi di dati".

  • _metadata_spanner_mod_type: il tipo di modifica (inserimento, aggiornamento o eliminazione) della transazione Spanner. Estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_table_name: il nome della tabella Spanner. Questo non è il nome della tabella dei metadati del connettore.
  • _metadata_spanner_commit_timestamp: Spanner commit timestamp, ovvero l'ora in cui viene eseguito il commit di una modifica. Questo valore viene estratto dal record delle modifiche ai dati delle modifiche in tempo reale.
  • _metadata_spanner_server_transaction_id: una stringa univoca a livello globale che rappresenta la transazione Spanner in cui è stata eseguita la commit della modifica. Utilizza questo valore solo nel contesto dell'elaborazione dei record dello stream di modifiche. Non è correlato all'ID transazione nell'API di Spanner. Questo valore viene estratto dal record delle modifiche dei dati delle modifiche in tempo reale.
  • _metadata_spanner_record_sequence: il numero di sequenza per il record all'interno di la transazione Spanner. I numeri di sequenza sono garantiti come univoci e in aumento monotonico, ma non necessariamente contigui, all'interno di una transazione. Questo valore viene estratto dal record delle modifiche ai dati delle modifiche in tempo reale.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: indica se il record è l'ultimo record di una transazione Spanner nella partizione corrente. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_number_of_records_in_transaction: il numero di record di variazione dei dati che fanno parte della transazione Spanner in tutte le partizioni del flusso di modifiche. Questo valore viene estratto dal record delle modifiche ai dati delle modifiche in tempo reale.
  • _metadata_spanner_number_of_partitions_in_transaction: il numero di partizioni che restituiscono record di variazione dei dati per la transazione Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_big_query_commit_timestamp: il timestamp del commit quando la riga viene inserita in BigQuery. Se useStorageWriteApi è true, questa colonna non viene creata automaticamente nella tabella del log delle modifiche dalla pipeline. In questo caso, devi aggiungere manualmente questa colonna nella tabella del log delle modifiche, se necessario.

Quando utilizzi questo modello, tieni presente i seguenti dettagli:

  • Puoi utilizzare questo modello per propagare nuove colonne su tabelle esistenti oppure nuove tabelle da Spanner a BigQuery. Per ulteriori informazioni, consulta Gestire l'aggiunta di tabelle o colonne di monitoraggio.
  • Per i tipi di acquisizione dei valori OLD_AND_NEW_VALUES e NEW_VALUES, quando il record della variazione dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura inattiva su Spanner il timestamp di commit del record delle modifiche ai dati per recuperare le colonne non modificate ma controllate. Assicurati di configurare correttamente "version_retention_period" del database per la lettura non aggiornata. Per il tipo di acquisizione del valore NEW_ROW, il modello è più efficiente perché il record di variazione dei dati acquisisce la nuova riga completa, incluse le colonne che non vengono aggiornate nelle richieste UPDATE, e il modello non deve eseguire una lettura obsoleta.
  • Per ridurre al minimo la latenza di rete e i costi di trasporto della rete, esegui Dataflow nella stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzi origini, destinazioni, posizioni dei file di staging o posizioni dei file temporanei situate al di fuori della regione del tuo job, i dati potrebbero essere inviati tra regioni. Per ulteriori informazioni, vedi Regioni di Dataflow.
  • Questo modello supporta tutti i tipi di dati Spanner validi. Se il tipo BigQuery è più preciso del tipo Spanner, durante la trasformazione potrebbe verificarsi una perdita di precisione. In particolare:
    • Per il tipo JSON di Spanner, l'ordine dei membri di un oggetto è alfabetico, ma non esiste questa garanzia per il tipo JSON di BigQuery.
    • Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo tipo TIMESTAMP da microsecondi.
  • Questo modello non supporta l'utilizzo di l'API BigQuery StorageWrite in modalità "exactly-once".

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow in modalità flusso di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il flusso di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.

Handle per l'aggiunta di tabelle o colonne di monitoraggio

Questa sezione descrive le best practice per gestire l'aggiunta di tabelle e colonne Spanner di monitoraggio durante l'esecuzione della pipeline.

  • Prima di aggiungere una nuova colonna a un ambito delle modifiche in tempo reale di Spanner, aggiungi innanzitutto la colonna alla tabella del log delle modifiche di BigQuery. L'elemento aggiunto deve avere un tipo di dati corrispondente e deve essere NULLABLE. Attendi almeno 10 minuti prima di continuare a creare la nuova colonna o in Spanner. La scrittura nella nuova colonna senza attendere potrebbe comportare un record non elaborato con un codice di errore non valido nella directory della coda di lettere morte.
  • Per aggiungere una nuova tabella, aggiungila prima al database Spanner. La tabella viene creata automaticamente in BigQuery quando la pipeline riceve un record per la nuova tabella.
  • Dopo aver aggiunto le nuove colonne o tabelle al database Spanner, assicurati di modificare modifiche in tempo reale per monitorare le nuove colonne o tabelle desiderate non sono già tracciate in modo implicito.
  • Il modello non elimina tabelle o colonne da BigQuery. Se una colonna viene eliminata dalla tabella Spanner, i valori null vengono inseriti nelle colonne del log delle modifiche di BigQuery per i record generati dopo l'eliminazione delle colonne dalla tabella Spanner, a meno che non elimini manualmente la colonna da BigQuery.
  • Il modello non supporta gli aggiornamenti del tipo di colonna. Sebbene Spanner supporta la modifica di una colonna STRING in una BYTES o una colonna BYTES a un STRING, non puoi modificare il tipo di dati di una colonna esistente o utilizzare lo stesso nome di colonna con tipi di dati diversi in BigQuery. Se elimini e ricrei una colonna con lo stesso nome, ma di tipo diverso in Spanner, i dati potrebbero essere scritti nella colonna BigQuery esistente, ma il tipo rimane invariato.
  • Questo modello non supporta gli aggiornamenti della modalità colonna. Colonne dei metadati replicati in BigQuery sono impostati sulla modalità REQUIRED. Tutte le altre colonne replicate in BigQuery sono impostate su NULLABLE, indipendentemente dal fatto che siano definite come NOT NULL nella tabella Spanner. Non puoi aggiornare le colonne NULLABLE alla modalità REQUIRED in BigQuery.
  • La modifica del tipo di acquisizione dei valori di uno stream di variazioni non è supportata per le pipeline in esecuzione.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId: l'istanza Spanner da cui leggere i flussi di modifiche.
  • spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore degli stream di modifiche.
  • spannerChangeStreamName: il nome del flusso di modifiche Spanner da cui leggere.
  • bigQueryDataset: il set di dati BigQuery per l'output degli stream di modifiche.

Parametri facoltativi

  • spannerProjectId: il progetto da cui leggere i flussi di modifiche. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente del controllo dell'accesso granulare. Il ruolo del database deve disporre del privilegio SELECT sul flusso di modifiche e del privilegio EXECUTE sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per gli stream di modifiche (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene fornita, una tabella dei metadati del connettore degli stream di variazioni Spanner viene creata automaticamente durante il flusso della pipeline. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
  • rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere uno dei seguenti: HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. (esempio: https://batch-spanner.googleapis.com).
  • startTimestamp : il valore DateTime iniziale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Esclusa la data 2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp all'avvio della pipeline, ovvero l'ora corrente.
  • endTimestamp: la data e l'ora di fine (https://datatracker.ietf.org/doc/html/rfc3339), incluse, da utilizzare per la lettura degli stream di variazioni. Es. 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • bigQueryProjectId: il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
  • bigQueryChangelogTableNameTemplate: il modello per il nome della tabella BigQuery contenente il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : il percorso per archiviare tutti i record non elaborati. Il percorso predefinito è una directory nella posizione temporanea del job di Dataflow. In genere è sufficiente il valore predefinito.
  • dlqRetryMinutes: il numero di minuti tra le riavviate della coda messaggi non recapitabili. Il valore predefinito è 10.
  • ignoreFields : un elenco di campi separati da virgole (sensibili alle maiuscole) da ignorare. Potrebbero essere campi di tabelle controllate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il valore predefinito è vuoto.
  • disableDlqRetries: indica se disattivare o meno i tentativi per la coda DLQ. Il valore predefinito è false.
  • useStorageWriteApi : se impostato su true, la pipeline utilizza l'API BigQuery StorageWrite (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Spanner change streams to BigQuery template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: flusso di modifiche di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output degli stream di modifiche

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: flusso di modifiche di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output delle modifiche in tempo reale

Passaggi successivi