Modifiche in tempo reale di Spanner al modello di Cloud Storage

Le modifiche in tempo reale di Spanner al modello di Cloud Storage sono pipeline di flusso che trasmettono i flussi di record delle modifiche dei dati di Spanner e li scrivono in un bucket Cloud Storage utilizzando Dataflow Runner v2.

La pipeline raggruppa i record di modifiche in tempo reale di Spanner in finestre in base al relativo timestamp. Ogni finestra rappresenta una durata di tempo di cui puoi configurare la durata con questo modello. Tutti i record con timestamp appartenenti alla finestra sono garantiti per essere nella finestra; non ci possono essere arrivi in ritardo. Puoi anche definire un numero di shard di output; la pipeline crea un file di output di Cloud Storage per finestra per ogni shard. All'interno di un file di output, i record non sono ordinati. I file di output possono essere scritti in formato JSON o AVRO, a seconda della configurazione utente.

Tieni presente che puoi ridurre al minimo la latenza di rete e i costi di trasporto di rete eseguendo il job Dataflow dalla stessa regione della tua istanza Spanner o del bucket Cloud Storage. Se utilizzi origini, sink, posizioni dei file temporanei o posizioni dei file temporanei che si trovano al di fuori della regione del job, i tuoi dati potrebbero essere inviati tra regioni. Scopri di più sulle regioni Dataflow.

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow in modalità flusso di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Le modifiche in tempo reale di Spanner devono esistere prima dell'esecuzione della pipeline.
  • Il bucket di output di Cloud Storage deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId : l'ID dell'istanza Spanner da cui leggere i flussi di dati delle modifiche.
  • spannerDatabase : il database Spanner da cui leggere i flussi di dati delle modifiche.
  • spannerMetadataInstanceId : l'ID istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
  • gcsOutputDirectory : il prefisso del percorso e del nome file per la scrittura dei file di output. Deve terminare con una barra. La formattazione DateTime viene utilizzata per analizzare il percorso della directory per i formattatori di data e ora. ad esempio gs://your-bucket/your-path.

Parametri facoltativi

  • spannerProjectId : l'ID del progetto Google Cloud che contiene il database Spanner da cui leggere le modifiche in tempo reale. In questo progetto viene anche creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve avere il privilegio SELECT per la modifica in tempo reale e il privilegio EXECUTE per la funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per le modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene specificata, durante l'esecuzione della pipeline viene creata automaticamente una tabella di metadati delle modifiche in tempo reale di Spanner. Devi fornire un valore per questo parametro quando aggiorni una pipeline esistente. In caso contrario, non utilizzare questo parametro.
  • startTimestamp : il valore DateTime di inizio, incluso, da utilizzare per la lettura delle modifiche in tempo reale, nel formato Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp di avvio della pipeline, ovvero l'ora attuale.
  • endTimestamp : la data/ora finale (inclusa) da utilizzare per la lettura delle modifiche in tempo reale. Ad esempio, Esempio-2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://spanner.googleapis.com. Il valore predefinito è: https://spanner.googleapis.com.
  • outputFileFormat : il formato del file Cloud Storage di output. I formati consentiti sono TEXT e AVRO. Il valore predefinito è AVRO.
  • windowDuration : la durata della finestra è l'intervallo in cui i dati vengono scritti nella directory di output. Configura la durata in base alla velocità effettiva della pipeline. Ad esempio, una velocità effettiva più elevata potrebbe richiedere finestre di dimensioni inferiori affinché i dati rientrino in memoria. Il valore predefinito è 5 m (cinque minuti), con un minimo di 1 s (un secondo). I formati consentiti sono: [int]s (per i secondi, ad esempio 5s), [int]m (per i minuti, ad esempio 12m), [int]h (per le ore, ad esempio 2h). (Esempio: 5 m).
  • rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • outputFilenamePrefix : il prefisso da inserire in ogni file finestrato. (Esempio: output-). Il valore predefinito è: output.
  • numShards : il numero massimo di shard di output prodotti durante la scrittura. Un numero più elevato di shard significa una maggiore velocità effettiva per la scrittura in Cloud Storage, ma un costo di aggregazione dei dati potenzialmente superiore negli shard durante l'elaborazione dei file Cloud Storage di output. Il valore predefinito è 20.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Spanner change streams to Google Cloud Storage template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Cloud Spanner
  • SPANNER_DATABASE: database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza metadati Cloud Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Cloud Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: posizione del file per l'output delle modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Cloud Spanner
  • SPANNER_DATABASE: database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza metadati Cloud Spanner
  • SPANNER_METADATA_DATABASE: database di metadati Cloud Spanner
  • SPANNER_CHANGE_STREAM: modifiche in tempo reale di Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: posizione del file per l'output delle modifiche in tempo reale

Passaggi successivi