Il modello di modifiche in tempo reale di Spanner a Pub/Sub è una pipeline in modalità flusso che esegue il flusso dei record delle modifiche dei dati di Spanner e li scrive negli argomenti Pub/Sub utilizzando Dataflow Runner 2.
Per esportare i dati in un nuovo argomento Pub/Sub, devi prima creare l'argomento. Dopo la creazione, Pub/Sub genera e allega automaticamente una sottoscrizione al nuovo argomento. Se provi a esportare i dati in un argomento Pub/Sub non esistente, la pipeline di Dataflow genera un'eccezione e si blocca mentre tenta continuamente di stabilire una connessione.
Se l'argomento Pub/Sub necessario esiste già, puoi esportare i dati in quell'argomento.
Per saperne di più, consulta Informazioni sui flussi di modifiche, Creare connessioni con modifiche in tempo reale con Dataflow, e Best practice per i flussi di modifiche.
Requisiti della pipeline
- L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database Spanner deve esistere prima dell'esecuzione della pipeline.
- L'istanza di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Il flusso di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
- L'argomento Pub/Sub deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametri obbligatori
- spannerInstanceId : l'istanza Spanner da cui leggere modifiche in tempo reale.
- spannerDatabase : il database Spanner da cui leggere modifiche in tempo reale.
- spannerMetadataInstanceId : l'istanza Spanner da utilizzare per la tabella dei metadati del connettore delle modifiche in tempo reale.
- spannerMetadataDatabase : il database Spanner da utilizzare per la tabella dei metadati del connettore degli modifiche in tempo reale.
- spannerChangeStreamName : il nome del flusso di modifiche Spanner da cui leggere.
- pubsubTopic : l'argomento Pub/Sub per l'output modifiche in tempo reale.
Parametri facoltativi
- spannerProjectId : il progetto da cui leggere modifiche in tempo reale. In questo progetto viene creata anche la tabella dei metadati del connettore degli modifiche in tempo reale. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- spannerDatabaseRole : il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente controllo dell'accesso dell'accesso granulare. Il ruolo del database deve disporre del privilegio
SELECT
sul flusso di modifiche e del privilegioEXECUTE
sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName : il nome della tabella dei metadati del connettore di modifiche in tempo reale di Spanner da utilizzare. Se non viene fornita, Spanner crea automaticamente la tabella dei metadati del connettore di stream durante la modifica del flusso della pipeline. Devi fornire questo parametro quando aggiorni una pipeline esistente. Non utilizzare questo parametro per altri casi.
- startTimestamp : la data e l'ora di inizio (https://tools.ietf.org/html/rfc3339), incluse, da utilizzare per la lettura degli modifiche in tempo reale. Ad esempio, ex- 2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp all'avvio della pipeline, ovvero l'ora corrente.
- endTimestamp : la data e l'ora di fine (https://tools.ietf.org/html/rfc3339), incluse, da utilizzare per la lettura degli modifiche in tempo reale. Ad esempio, ex- 2021-10-12T07:20:50.52Z. Il valore predefinito è un'ora infinita nel futuro.
- spannerHost : l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. (ad esempio https://spanner.googleapis.com). Valore predefinito: https://spanner.googleapis.com.
- outputDataFormat : il formato dell'output. L'output viene racchiuso in molti messaggi Pubsub e inviato a un argomento Pub/Sub. I formati consentiti sono JSON e AVRO. Il valore predefinito è JSON.
- pubsubAPI : l'API Pub/Sub utilizzata per implementare la pipeline. Le API consentite sono
pubsubio
enative_client
. Per un numero ridotto di query al secondo (QPS),native_client
ha una latenza inferiore. Per un numero elevato di QPS,pubsubio
offre prestazioni migliori e più stabili. Il valore predefinito èpubsubio
. - pubsubProjectId : progetto dell'argomento Pub/Sub. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- rpcPriority : la priorità della richiesta per le chiamate Spanner. I valori consentiti sono HIGH, MEDIUM e LOW. Il valore predefinito è HIGH.
- includeSpannerSource : indica se includere o meno l'ID database e l'ID istanza Spanner da cui leggere il flusso di modifiche nei dati del messaggio di output. Il valore predefinito è false.
- outputMessageMetadata : il valore di stringa per il campo personalizzato outputMessageMetadata nel messaggio Pub/Sub di output. Il valore predefinito è vuoto e il campo outputMessageMetadata viene compilato solo se questo valore non è vuoto. Inserisci i caratteri speciali con il carattere di escape quando inserisci il valore qui(ad es. virgolette doppie).
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Spanner change streams to Pub/Sub template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
Sostituisci quanto segue:
JOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: database dei metadati di SpannerSPANNER_CHANGE_STREAM
: modifica in tempo reale di SpannerPUBSUB_TOPIC
: l'argomento Pub/Sub per l'output dei modifiche in tempo reale
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: database dei metadati di SpannerSPANNER_CHANGE_STREAM
: modifica in tempo reale di SpannerPUBSUB_TOPIC
: l'argomento Pub/Sub per l'output dei modifiche in tempo reale
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.