Modello Datastream to Spanner

Il modello Datastream to Spanner è una pipeline di elaborazione di flussi di dati che legge gli eventi Datastream da un bucket Cloud Storage e li scrive in un database Spanner. È destinato alla migrazione dei dati dalle origini Datastream a Spanner.

Tutte le tabelle necessarie per la migrazione devono esistere nel database Spanner di destinazione prima dell'esecuzione del modello. Pertanto, la migrazione dello schema da un database di origine a Spanner di destinazione deve essere completata prima della migrazione dei dati. I dati possono esistere nelle tabelle prima della migrazione. Questo modello non propaga le modifiche dello schema di Datastream al database Spanner.

La coerenza dei dati è garantita solo al termine della migrazione, quando tutti i dati sono stati scritti in Spanner. Per memorizzare le informazioni sull'ordinamento per ogni record scritto in Spanner, questo modello crea una tabella aggiuntiva (chiamata tabella shadow) per ogni tabella nel database Spanner. Viene utilizzato per garantire la coerenza al termine della migrazione. Le tabelle shadow non vengono eliminate dopo la migrazione e possono essere utilizzate a scopo di convalida al termine della migrazione.

Eventuali errori che si verificano durante il funzionamento, ad esempio mancate corrispondenze dello schema, file JSON con formato errato o errori derivanti dall'esecuzione delle trasformazioni, vengono registrati in una coda di errori. La coda di errori è una cartella Cloud Storage che memorizza tutti gli eventi Datastream che hanno riscontrato errori, nonché il motivo dell'errore in formato di testo. Gli errori possono essere temporanei o permanenti e vengono archiviati nelle cartelle Cloud Storage appropriate nella coda di errori. Per gli errori temporanei viene eseguito automaticamente un nuovo tentativo, mentre per quelli permanenti no. In caso di errori permanenti, hai la possibilità di apportare correzioni agli eventi di modifica e spostarli nel bucket recuperabile durante l'esecuzione del modello.

Requisiti della pipeline

  • Uno stream Datastream in stato In esecuzione o Non avviato.
  • Un bucket Cloud Storage in cui vengono replicati gli eventi DataStream.
  • Un database Spanner con tabelle esistenti. Queste tabelle possono essere vuote o contenere dati.

Parametri del modello

Parametri obbligatori

  • instanceId: l'istanza Spanner in cui vengono replicate le modifiche.
  • databaseId: il database Spanner in cui vengono replicate le modifiche.
  • streamName: il nome o il modello dello stream da sottoporre a polling per informazioni sullo schema e sul tipo di origine.

Parametri facoltativi

  • inputFilePattern: la posizione del file Cloud Storage contenente i file Datastream da replicare. In genere, si tratta del percorso principale di uno stream. Il supporto di questa funzionalità è stato disattivato.
  • inputFileFormat: il formato del file di output prodotto da Datastream. Ad esempio avro,json. Il valore predefinito è avro.
  • sessionFilePath: percorso del file di sessione in Cloud Storage contenente le informazioni di mappatura di HarbourBridge.
  • projectId: l'ID progetto Spanner.
  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Ad esempio, https://batch-spanner.googleapis.com. Valore predefinito: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato in un criterio di notifica di Cloud Storage. Per il nome, utilizza il formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • shadowTablePrefix: il prefisso utilizzato per assegnare un nome alle tabelle shadow. Valore predefinito: shadow_.
  • shouldCreateShadowTables: questo flag indica se devono essere create tabelle shadow nel database Cloud Spanner. Il valore predefinito è true.
  • rfcStartDateTime: la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: il numero di file DataStream simultanei da leggere. Il valore predefinito è 30.
  • deadLetterQueueDirectory: il percorso del file utilizzato per archiviare l'output della coda di errori. Il percorso del file predefinito è una directory nella posizione temporanea del job Dataflow.
  • dlqRetryMinutes: il numero di minuti tra le riarmi della coda messaggi non recapitabili. Il valore predefinito è 10.
  • dlqMaxRetryCount: il numero massimo di volte in cui è possibile riprovare a gestire gli errori temporanei tramite la coda DLQ. Il valore predefinito è 500.
  • dataStreamRootUrl: URL principale dell'API Datastream. Valore predefinito: https://datastream.googleapis.com/.
  • datastreamSourceType: il tipo di database di origine a cui si connette Datastream. Esempio: mysql/oracle. Deve essere impostato durante i test senza un Datastream effettivamente in esecuzione.
  • roundJsonDecimals: se impostato, questo flag arrotonderà i valori decimali nelle colonne JSON a un numero che può essere memorizzato senza perdita di precisione. Il valore predefinito è false.
  • runMode: il tipo di modalità di esecuzione, normale o con retryDLQ. Il valore predefinito è: normale.
  • transformationContextFilePath: percorso del file del contesto di trasformazione in spazio di archiviazione sul cloud utilizzato per compilare i dati utilizzati nelle trasformazioni eseguite durante le migrazioni. Ad esempio, l'ID shard al nome del database per identificare il database da cui è stata eseguita la migrazione di una riga.
  • directoryWatchDurationInMinutes: la durata per cui la pipeline deve continuare a eseguire il polling di una directory in GCS. I file di output del flusso di dati sono organizzati in una struttura di directory che mostra il timestamp dell'evento raggruppato per minuti. Questo parametro deve essere approssimativamente uguale al ritardo massimo che potrebbe verificarsi tra l'evento che si verifica nel database di origine e lo stesso evento scritto in GCS da Datastream. Percentile 99,9 = 10 minuti. Il valore predefinito è 10.
  • spannerPriority: la priorità della richiesta per le chiamate Cloud Spanner. Il valore deve essere uno dei seguenti: [HIGH,MEDIUM,LOW]. Il valore predefinito è HIGH.
  • dlqGcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato in un criterio di notifica Cloud Storage per la directory di ripetizione della coda DLQ quando viene eseguito in modalità normale. Per il nome, utilizza il formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Se impostato, i valori deadLetterQueueDirectory e dlqRetryMinutes vengono ignorati.
  • transformationJarPath: percorso del file JAR personalizzato in Cloud Storage per il file contenente la logica di trasformazione personalizzata per l'elaborazione dei record nella migrazione in avanti. Il valore predefinito è vuoto.
  • transformationClassName: il nome completo della classe con la logica di trasformazione personalizzata. È un campo obbligatorio se è specificato transformationJarPath. Il valore predefinito è vuoto.
  • transformationCustomParameters: stringa contenente eventuali parametri personalizzati da passare alla classe di trasformazione personalizzata. Il valore predefinito è vuoto.
  • filteredEventsDirectory: il percorso del file in cui memorizzare gli eventi filtrati tramite la trasformazione personalizzata. Il valore predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è sufficiente nella maggior parte delle condizioni.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Datastream to Spanner template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso di Cloud Storage utilizzato per archiviare gli eventi del flusso di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso di Cloud Storage per la directory della coda di errori.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • GCS_FILE_PATH: il percorso di Cloud Storage utilizzato per archiviare gli eventi del flusso di dati. Ad esempio: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: la tua istanza Spanner.
  • CLOUDSPANNER_DATABASE: il tuo database Spanner.
  • DLQ: il percorso di Cloud Storage per la directory della coda di errori.

Passaggi successivi