Modifiche in tempo reale di Spanner al modello di BigQuery

Il modello di modifiche in tempo reale reale di Spanner per BigQuery è una pipeline in modalità flusso che trasmette i flussi di record delle modifiche dei dati di Spanner e li scrive nelle tabelle BigQuery usando Dataflow Runner V2.

Tutte le colonne controllate delle modifiche in tempo reale sono incluse in ogni riga della tabella BigQuery, indipendentemente se vengono modificati da una transazione Spanner. Le colonne non guardate non sono incluse nella riga BigQuery. Eventuali modifiche a Spanner inferiori a Dataflow la filigrana viene applicata correttamente alle tabelle BigQuery o viene archiviata coda messaggi non recapitabili per nuovo tentativo. Le righe BigQuery vengono inserite nell'ordine sbagliato rispetto a l'ordinamento originale del timestamp di commit di Spanner.

Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. Altrimenti, vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve contengono le colonne monitorate corrispondenti delle tabelle Spanner e qualsiasi altro colonne di metadati che non vengono ignorate esplicitamente dall'opzione ignoreFields. Consulta la descrizione dei campi dei metadati nell'elenco seguente. Ogni nuova riga BigQuery include tutte le colonne controllato dal flusso di modifiche dalla riga corrispondente nella tabella Spanner al il timestamp del record delle modifiche.

I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per ulteriori dettagli su questi campi, consulta Record delle modifiche dei dati in "Modificare partizioni, record e query di flussi di dati".

Quando utilizzi questo modello, tieni presente i seguenti dettagli:

  • Questo modello non propaga le modifiche allo schema da Spanner a BigQuery. Poiché è probabile che l'esecuzione di una modifica dello schema in Spanner danneggi la pipeline, potresti dover ricreare la pipeline dopo la modifica dello schema.
  • Per i tipi di acquisizione dei valori OLD_AND_NEW_VALUES e NEW_VALUES, quando il record della variazione dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura inattiva su Spanner il timestamp di commit del record delle modifiche ai dati per recuperare le colonne non modificate ma controllate. Marca assicurati di configurare il database "version_retention_period" in modo corretto per la lettura inattiva. Per il tipo di acquisizione valore NEW_ROW, il modello è più efficiente, perché il record delle modifiche dei dati acquisisce la nuova riga completa, incluse le colonne che non sono aggiornate nelle richieste UPDATE, e il modello non non è necessario leggere.
  • Per ridurre al minimo la latenza di rete e i costi di trasporto della rete, esegui Dataflow nella stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzare origini, sink, posizioni dei file temporanei o posizioni dei file temporanei che si trovano all'esterno della regione del job, i tuoi dati potrebbero essere inviati tra regioni diverse. Per ulteriori informazioni, vedi Regioni di Dataflow.
  • Questo modello supporta tutti i tipi di dati Spanner validi. Se BigQuery è più preciso rispetto al tipo Spanner, la perdita di precisione potrebbe verificarsi durante e la trasformazione dei dati. Nello specifico:
    • Per il tipo JSON Spanner, l'ordine dei membri di un oggetto è in ordine grammaticale, ma non esiste una garanzia di questo tipo per il tipo JSON BigQuery.
    • Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo tipo TIMESTAMP da microsecondi.
  • Questo modello non supporta l'utilizzo di l'API BigQuery StorageWrite in modalità "exactly-once".

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow in modalità flusso di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Le modifiche in tempo reale di Spanner devono esistere prima dell'esecuzione della pipeline.
  • Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId : l'istanza Spanner da cui leggere le modifiche in tempo reale.
  • spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
  • bigQueryDataset : il set di dati BigQuery per le modifiche in tempo reale di output.

Parametri facoltativi

  • spannerProjectId : il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT nel flusso di modifiche e il privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per le modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene specificata, durante il flusso della pipeline viene creata automaticamente una tabella dei metadati del connettore di modifiche in tempo reale in modalità Spanner. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
  • rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere uno dei seguenti: HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://batch-spanner.googleapis.com.
  • startTimestamp : il valore DateTime iniziale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp di avvio della pipeline, ovvero l'ora attuale.
  • endTimestamp : il DateTime finale (https://datatracker.ietf.org/doc/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale.Esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • bigQueryProjectId : il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
  • bigQueryChangelogTableNameTemplate : il modello per il nome della tabella BigQuery contenente il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : il percorso per archiviare tutti i record non elaborati. Il percorso predefinito è una directory all'interno della località temporanea del job Dataflow. In genere è sufficiente il valore predefinito.
  • dlqRetryMinutes : il numero di minuti tra i nuovi tentativi in una coda di messaggi non recapitabili. Il valore predefinito è 10.
  • ignoreFields : un elenco di campi separati da virgole (sensibili alle maiuscole) da ignorare. Potrebbero essere campi di tabelle controllate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il campo predefinito è vuoto.
  • disableDlqRetries : indica se disabilitare o meno i nuovi tentativi per la DLQ. Il valore predefinito è false.
  • useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Scrivi (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API StorageWrite (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : quando si utilizza l'API StorageWrite, specifica la semantica della scrittura. Per utilizzare la semantica "at-least-once-semantics" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Spanner change streams to BigQuery template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output delle modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza di metadati di Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output delle modifiche in tempo reale

Passaggi successivi