Modello di flussi di modifiche di Spanner al database di origine

Pipeline di inserimento flussi. Legge i dati dai flussi di modifiche di Spanner e li scrive in un'origine.

Parametri del modello

Parametro Descrizione
changeStreamName Il nome dello stream di modifiche Spanner da cui la pipeline legge.
instanceId Il nome dell'istanza Spanner in cui è presente il flusso di modifiche.
databaseId Il nome del database Spanner monitorato dal flusso di modifiche.
spannerProjectId Il nome del progetto Spanner.
metadataInstance L'istanza in cui memorizzare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API stream delle modifiche.
metadataDatabase Il database in cui memorizzare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API flusso di modifiche.
sourceShardsFilePath Percorso di un file Cloud Storage contenente le informazioni del profilo di connessione per i frammenti di origine.
startTimestamp (Facoltativo) Il timestamp iniziale per la lettura delle modifiche. Il valore predefinito è vuoto.
endTimestamp (Facoltativo) Il timestamp di fine per la lettura delle modifiche. Se non viene fornito alcun timestamp, la lettura avviene a tempo indeterminato. Il valore predefinito è vuoto.
shadowTablePrefix (Facoltativo) Il prefisso utilizzato per assegnare un nome alle tabelle shadow. Valore predefinito: shadow_.
sessionFilePath (Facoltativo) Percorso della sessione in Cloud Storage contenente le informazioni di mappatura di HarbourBridge.
filtrationMode (Facoltativo) Modalità di filtrazione. Specifica come eliminare determinati record in base a un criterio. Le modalità supportate sono: none (non filtra nulla), forward_migration (filtra i record scritti utilizzando la pipeline di migrazione in avanti). Il valore predefinito è forward_migration.
shardingCustomJarPath (Facoltativo) Posizione del file JAR personalizzato in Cloud Storage contenente la logica di personalizzazione per il recupero dell'ID shard. Se imposti questo parametro, imposta il parametro shardingCustomJarPath. Il valore predefinito è vuoto.
shardingCustomClassName (Facoltativo) Nome di classe completo con l'implementazione dell'ID shard personalizzato. Se viene specificato shardingCustomJarPath, questo parametro è obbligatorio. Il valore predefinito è vuoto.
shardingCustomParameters (Facoltativo) Stringa contenente eventuali parametri personalizzati da passare alla classe di suddivisione personalizzata. Il valore predefinito è vuoto.
sourceDbTimezoneOffset (Facoltativo) La differenza di fuso orario rispetto a UTC per il database di origine. Valore di esempio: +10:00. Il valore predefinito è +00:00.
dlqGcsPubSubSubscription (Facoltativo) L'abbonamento Pub/Sub utilizzato in un criterio di notifica Cloud Storage per la directory di ripetizione della coda di lavoro filtrata quando viene eseguito in modalità normale. Il nome deve essere nel formato projects/<project-id>/subscriptions/<subscription-name>. Se impostato, deadLetterQueueDirectory e dlqRetryMinutes vengono ignorati.
skipDirectoryName (Facoltativo) I record ignorati dalla replica inversa vengono scritti in questa directory. Il nome predefinito della directory è skip.
maxShardConnections (Facoltativo) Il numero massimo di connessioni che un determinato shard può accettare. Il valore predefinito è 10000.
deadLetterQueueDirectory (Facoltativo) Il percorso utilizzato per archiviare l'output della coda di errori. Il percorso predefinito è una directory nella posizione temporanea del job Dataflow.
dlqMaxRetryCount (Facoltativo) Il numero massimo di volte in cui è possibile ritentare gli errori temporanei tramite la coda dei messaggi non recapitabili. Il valore predefinito è 500.
runMode (Facoltativo) Il tipo di modalità di esecuzione. Valori supportati: regular, retryDLQ. Valore predefinito: regular. Specifica retryDLQ per eseguire nuovamente il tentativo solo per i record della coda delle email inutilizzate gravi.
dlqRetryMinutes (Facoltativo) Il numero di minuti tra i tentativi nella coda di messaggi non recapitabili. Il valore predefinito è 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Spanner Change Streams to Source Database template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Interfaccia a riga di comando gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • CHANGE_STREAM_NAME: il nome del flusso di modifiche da cui leggere
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • SPANNER_PROJECT_ID: l'ID progetto Cloud Spanner.
  • METADATA_INSTANCE: l'istanza Cloud Spanner per archiviare i metadati durante la lettura dai changestream
  • METADATA_DATABASE: il database Cloud Spanner per archiviare i metadati durante la lettura da changestream
  • SOURCE_SHARDS_FILE_PATH: il percorso del file GCS contenente i dettagli del frammento di origine

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • CHANGE_STREAM_NAME: il nome del flusso di modifiche da cui leggere
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • SPANNER_PROJECT_ID: l'ID progetto Cloud Spanner.
  • METADATA_INSTANCE: l'istanza Cloud Spanner per archiviare i metadati durante la lettura dai changestream
  • METADATA_DATABASE: il database Cloud Spanner per archiviare i metadati durante la lettura da changestream
  • SOURCE_SHARDS_FILE_PATH: il percorso del file GCS contenente i dettagli del frammento di origine