Da Datastream a modello Spanner

Il modello Da Datastream a Spanner è una pipeline di inserimento flussi che legge gli eventi Datastream da un bucket Cloud Storage e li scrive in un database Spanner. È destinato alla migrazione dei dati dalle origini Datastream a Spanner.

Tutte le tabelle necessarie per la migrazione devono esistere nel database Spanner di destinazione prima dell'esecuzione del modello. Di conseguenza, la migrazione dello schema da un database di origine a Spanner di destinazione deve essere completata prima della migrazione dei dati. I dati possono esistere nelle tabelle prima della migrazione. Questo modello non propaga le modifiche dello schema Datastream al database Spanner.

La coerenza dei dati è garantita solo al termine della migrazione, quando tutti i dati sono stati scritti in Spanner. Per archiviare le informazioni sugli ordini per ogni record scritto in Spanner, questo modello crea una tabella aggiuntiva (chiamata tabella shadow) per ogni tabella nel database Spanner. Questo viene utilizzato per garantire la coerenza alla fine della migrazione. Le tabelle shadow non vengono eliminate dopo la migrazione e possono essere utilizzate per la convalida alla fine della migrazione.

Eventuali errori che si verificano durante l'operazione, come mancata corrispondenza dello schema, file JSON non corretti o errori derivanti dall'esecuzione di trasformazioni, vengono registrati in una coda di errori. La coda degli errori è una cartella di Cloud Storage in cui sono archiviati tutti gli eventi Datastream che hanno riscontrato errori insieme al motivo dell'errore in formato di testo. Gli errori possono essere temporanei o permanenti e vengono archiviati nelle cartelle Cloud Storage appropriate nella coda degli errori. Gli errori temporanei vengono riprovati automaticamente, mentre quelli permanenti non lo sono. In caso di errori permanenti, hai la possibilità di apportare correzioni agli eventi di modifica e di spostarli nel bucket ripristinabile mentre il modello è in esecuzione.

Requisiti della pipeline

  • Uno stream Datastream in stato In esecuzione o Non avviato.
  • Un bucket Cloud Storage in cui vengono replicati gli eventi Datastream.
  • Un database Spanner con tabelle esistenti. Queste tabelle possono essere vuote o contenere dati.

Parametri del modello

Parametro Descrizione
inputFilePattern La posizione dei file Datastream in Cloud Storage da replicare. In genere, si tratta del percorso principale di un flusso.
streamName Il nome o il modello del flusso per il polling delle informazioni sullo schema e del tipo di origine.
instanceId L'istanza Spanner in cui le modifiche vengono replicate.
databaseId Il database Spanner in cui le modifiche vengono replicate.
projectId L'ID progetto Spanner.
deadLetterQueueDirectory (Facoltativo) Questo è il percorso del file in cui archiviare l'output della coda di errori. Il valore predefinito è una directory sotto la località temporanea del job Dataflow.
inputFileFormat (Facoltativo) Il formato del file di output prodotto da Datastream. Ad esempio avro,json. Valore predefinito: avro.
shadowTablePrefix (Facoltativo) Il prefisso utilizzato per assegnare un nome alle tabelle shadow. Valore predefinito: shadow_.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Cloud Datastream to Spanner template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso Cloud Storage utilizzato per archiviare gli eventi di flussi di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza di Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso Cloud Storage per la directory della coda di errori.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso Cloud Storage utilizzato per archiviare gli eventi di flussi di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza di Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso Cloud Storage per la directory della coda di errori.

Passaggi successivi