Modello da Datastream a Spanner

Il modello da Datastream a Spanner è una pipeline di flusso che legge Eventi Datastream da un bucket Cloud Storage e le scrive in un database Spanner. È destinata a migrazione dei dati da origini Datastream a Spanner.

Tutte le tabelle richieste per la migrazione devono esistere nel database Spanner di destinazione prima di dell'esecuzione del modello. Di conseguenza la migrazione dello schema da un database di origine a Spanner di destinazione deve essere completato prima della migrazione dei dati. I dati possono essere presenti nelle tabelle prima della migrazione. Questo il modello non propaga le modifiche allo schema Datastream in Spanner per configurare un database.

La coerenza dei dati è garantita solo al termine della migrazione, quando tutti i dati sono stati scritti su Spanner. Per archiviare le informazioni di ordinamento per ogni record scritto in Spanner, crea una tabella aggiuntiva (denominata tabella ombra) per ogni tabella nel il database Spanner. Viene utilizzato per garantire coerenza alla fine della migrazione. L'ombra le tabelle non vengono eliminate dopo la migrazione e possono essere utilizzate per la convalida al termine migrazione.

Eventuali errori che si verificano durante l'operazione, ad esempio mancata corrispondenza dello schema, file JSON non corretti o errori derivanti dall'esecuzione di trasformazioni, vengono registrati in una coda di errori. La coda degli errori è Cartella Cloud Storage in cui sono archiviati tutti gli eventi Datastream che si sono verificati e il motivo dell'errore in formato testo. Gli errori possono essere temporanei o permanenti. siano archiviate nelle rispettive cartelle di Cloud Storage, nella coda degli errori. Gli errori temporanei sono nuovo tentativo automaticamente, mentre gli errori permanenti non lo sono. In caso di errori permanenti, devi l'opzione di apportare correzioni agli eventi di modifica e di spostarli nel bucket recuperabile durante l'esecuzione del modello.

Requisiti della pipeline

  • Uno stream Datastream in stato In esecuzione o Non avviato.
  • Un bucket Cloud Storage in cui vengono replicati gli eventi Datastream.
  • Un database Spanner con tabelle esistenti. Queste tabelle possono essere vuote o contenere dati.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : il percorso del file Cloud Storage che contiene i file Datastream da replicare. Solitamente, si tratta del percorso principale per uno stream.
  • instanceId : l'istanza Spanner in cui vengono replicate le modifiche.
  • databaseId : il database Spanner in cui vengono replicate le modifiche.
  • streamName : il nome o il modello dello stream da sottoporre a polling per ottenere le informazioni sullo schema e il tipo di origine.

Parametri facoltativi

  • inputFileFormat : il formato del file di output generato da Datastream. Ad esempio avro,json. Valore predefinito: avro.
  • sessionFilePath : percorso del file di sessione in Cloud Storage che contiene informazioni di mappatura da HarbourBridge.
  • projectId : l'ID progetto Spanner.
  • spannerHost : l'endpoint di Cloud Spanner da chiamare nel modello. Esempio: https://batch-spanner.googleapis.com. Il valore predefinito è: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription : la sottoscrizione Pub/Sub utilizzata in un criterio di notifica di Cloud Storage. Il nome deve essere nel formato projects/
  • shadowTablePrefix : il prefisso utilizzato per assegnare un nome alle tabelle shadow. Valore predefinito: shadow_.
  • shouldCreateShadowTables : questo flag indica se è necessario creare tabelle shadow nel database Cloud Spanner. Il valore predefinito è: true.
  • rfcStartDateTime : il valore DateTime di inizio utilizzato per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : il numero di file DataStream simultanei da leggere. Il valore predefinito è 30.
  • deadLetterQueueDirectory : il percorso del file utilizzato per l'archiviazione dell'output della coda di errori. Il percorso file predefinito è una directory nella posizione temporanea del job Dataflow.
  • dlqRetryMinutes : il numero di minuti tra i nuovi tentativi in una coda di messaggi non recapitabili. Il valore predefinito è 10.
  • dlqMaxRetryCount : il numero massimo di volte in cui è possibile ritentare errori temporanei tramite DLQ. Il valore predefinito è 500.
  • dataStreamRootUrl : URL principale dell'API Datastream. Il valore predefinito è: https://datastream.googleapis.com/.
  • datastreamSourceType : il tipo di database di origine a cui si connette Datastream. Esempio: mysql/oracle. Deve essere impostato durante il test senza un flusso Datastream effettivamente in esecuzione.
  • roundJsonDecimals : se impostato, questo flag arrotonda i valori decimali nelle colonne json a un numero che può essere archiviato senza perdita di precisione. Il valore predefinito è false.
  • runMode : è il tipo di modalità di esecuzione, regolare o con ritentDLQ. Il valore predefinito è: regolare.
  • transformationContextFilePath: percorso del file di contesto di trasformazione in spazio di archiviazione sul cloud utilizzato per compilare i dati utilizzati nelle trasformazioni eseguite durante le migrazioni, ad esempio l'ID shard e il nome del database per identificare il database da cui è stata migrata una riga.
  • directoryWatchDurationInMinutes : la durata per la quale la pipeline deve continuare a eseguire il polling di una directory in GCS. I file Datastreamoutput sono disposti in una struttura di directory che rappresenta il timestamp dell'evento raggruppato per minuti. Questo parametro dovrebbe essere più o meno uguale al ritardo massimo che potrebbe verificarsi tra l'evento che si verifica nel database di origine e lo stesso evento scritto in GCS da parte di Datastream. 99,9 percentile = 10 minuti. Il valore predefinito è 10.
  • spannerPriority : la priorità delle richieste per le chiamate Cloud Spanner. Il valore deve essere uno dei seguenti: [HIGH,MEDIUM,LOW]. Il valore predefinito è HIGH.
  • dlqGcsPubSubSubscription : la sottoscrizione Pub/Sub utilizzata in un criterio di notifica di Cloud Storage per la directory dei nuovi tentativi DLQ durante l'esecuzione in modalità normale. Il nome deve essere nel formato projects/
  • transformationJarPath : posizione del jar personalizzato in Cloud Storage che contiene la logica di trasformazione personalizzata per l'elaborazione dei record nella migrazione in avanti. Il campo predefinito è vuoto.
  • transformationClassName : nome della classe completo con la logica di trasformazione personalizzata. È un campo obbligatorio nel caso in cui transformJarPath sia specificato. Il campo predefinito è vuoto.
  • transformationCustomParameters : stringa contenente eventuali parametri personalizzati da passare alla classe di trasformazione personalizzata. Il campo predefinito è vuoto.
  • filteredEventsDirectory : è il percorso del file per l'archiviazione degli eventi filtrati tramite la trasformazione personalizzata. Il valore predefinito è una directory sotto la località temporanea del job Dataflow. Il valore predefinito è sufficiente nella maggior parte delle condizioni.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Datastream to Spanner template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso di Cloud Storage utilizzato per archiviare gli eventi di flussi di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso Cloud Storage della directory della coda degli errori.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso di Cloud Storage utilizzato per archiviare gli eventi di flussi di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso Cloud Storage della directory della coda degli errori.

Passaggi successivi