Le modifiche in tempo reale al modello Cloud Storage di Spanner sono pipeline di flusso che trasmette i flussi di record delle modifiche dei dati di Spanner e li scrive in un bucket Cloud Storage utilizzando Dataflow Runner v2.
La pipeline raggruppa i record dello stream di variazioni di Spanner in finestre in base al timestamp, con ogni finestra che rappresenta una durata temporale la cui lunghezza puoi configurare con questo modello. Tutti i record con timestamp appartenenti alla finestra sono garantiti per essere all'interno della finestra; non possono esserci arrivi in ritardo. Puoi inoltre definire un numero di shard di output; la pipeline crea un file di output di Cloud Storage per finestra per shard. All'interno di un file di output, i record non sono ordinati. I file di output possono essere scritti in formato JSON o AVRO, a seconda della configurazione dell'utente.
Tieni presente che puoi ridurre al minimo la latenza di rete e i costi di trasporto della rete eseguendo il comando Job Dataflow dalla stessa regione della tua istanza Spanner nel bucket Cloud Storage. Se utilizzi origini, sink, posizioni dei file temporanei o di posizioni dei file che si trovano al di fuori della regione del job, i tuoi dati potrebbero essere inviati regioni. Scopri di più su Regioni di Dataflow.
Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow con flussi di modifiche e sulle best practice.
Requisiti della pipeline
- L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database Spanner deve esistere prima dell'esecuzione della pipeline.
- L'istanza di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il flusso di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
- Il bucket di output Cloud Storage deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametri obbligatori
- spannerInstanceId: l'ID istanza Spanner da cui leggere i dati dei flussi di modifica.
- spannerDatabase : il database Spanner da cui leggere i flussi di dati delle modifiche.
- spannerMetadataInstanceId : l'ID istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
- gcsOutputDirectory: il percorso e il prefisso del nome file per la scrittura dei file di output. Deve terminare con una barra. La formattazione DateTime viene utilizzata per analizzare il percorso della directory per data & formattari dell'ora. ad esempio gs://your-bucket/your-path.
Parametri facoltativi
- spannerProjectId: l'ID del progetto Google Cloud contenente il database Spanner da cui leggere gli stream di modifiche. In questo progetto viene anche creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- spannerDatabaseRole: il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente con controllo dell'accesso granulare. Il ruolo del database deve avere il privilegio
SELECT
per il flusso di modifiche e il privilegioEXECUTE
per la funzione di lettura della modifica in tempo reale. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per gli stream di modifiche (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName : la modifiche in tempo reale del nome della tabella dei metadati del connettore di Spanner da utilizzare. Se non viene fornita, una tabella dei metadati degli stream di variazioni Spanner viene creata automaticamente durante l'esecuzione della pipeline. Devi fornire un valore per questo parametro quando aggiorni una pipeline esistente. In caso contrario, non utilizzare questo parametro.
- startTimestamp : il valore DateTime di inizio, incluso, da utilizzare per la lettura dei modifiche in tempo reale, nel formato Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp all'avvio della pipeline, ovvero l'ora corrente.
- endTimestamp: la data e l'ora di fine, incluse, da utilizzare per la lettura degli stream di variazioni. Ad esempio, Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è un'ora infinita nel futuro.
- spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Esempio: https://spanner.googleapis.com. Il valore predefinito è: https://spanner.googleapis.com.
- outputFileFormat : il formato del file Cloud Storage di output. I formati consentiti sono TEXT e AVRO. Il valore predefinito è AVRO.
- windowDuration : la durata della finestra è l'intervallo in cui i dati vengono scritti nella directory di output. Configura la durata in base al throughput della pipeline. Ad esempio, una velocità effettiva più elevata potrebbe richiedere finestre di dimensioni inferiori affinché i dati rientrino in memoria. Il valore predefinito è 5 minuti, con un minimo di 1 secondo. I formati consentiti sono: [int]s (per secondi, ad esempio 5s), [int]m (per minuti, ad esempio 12m), [int]h (per ore, ad esempio 2h). (Esempio: 5 m).
- rpcPriority : la priorità delle richieste per le chiamate Spanner. Il valore deve essere ALTO, MEDIO o BASSO. Il valore predefinito è HIGH.
- outputFilenamePrefix : il prefisso da inserire in ogni file nella finestra. (Esempio: output-). Valore predefinito: output.
- numShards : il numero massimo di shard di output prodotti durante la scrittura. Un numero maggiore di shard comporta una maggiore velocità effettiva per la scrittura in Cloud Storage, ma un costo potenzialmente più elevato per l'aggregazione dei dati tra gli shard durante l'elaborazione dei file di output di Cloud Storage. Il valore predefinito è 20.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Spanner change streams to Google Cloud Storage template.
- Inserisci i valori parametro negli appositi campi.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ gcsOutputDirectory=GCS_OUTPUT_DIRECTORY
Sostituisci quanto segue:
JOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza Cloud SpannerSPANNER_DATABASE
: database Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati Cloud SpannerSPANNER_METADATA_DATABASE
: database dei metadati di Cloud SpannerSPANNER_CHANGE_STREAM
: modifiche in tempo reale di Cloud SpannerGCS_OUTPUT_DIRECTORY
: posizione del file per l'output delle modifiche in tempo reale
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul
API e i relativi ambiti di autorizzazione, consulta
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza Cloud SpannerSPANNER_DATABASE
: database Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati Cloud SpannerSPANNER_METADATA_DATABASE
: database dei metadati di Cloud SpannerSPANNER_CHANGE_STREAM
: modifiche in tempo reale di Cloud SpannerGCS_OUTPUT_DIRECTORY
: posizione del file per l'output delle modifiche in tempo reale
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.