Modèle Datastream vers Spanner

Le modèle Datastream vers Spanner est un pipeline de flux de données qui lit les événements Datastream d'un bucket Cloud Storage et les écrit dans une base de données Spanner. Il est destiné à la migration de données de sources Datastream vers Spanner.

Toutes les tables requises pour la migration doivent exister dans la base de données Spanner de destination avant l'exécution du modèle. Par conséquent, la migration du schéma d'une base de données source vers Spanner doit être terminée avant de migrer les données. Des données peuvent exister dans les tables avant la migration. Ce modèle ne propage pas les modifications du schéma Datastream dans la base de données Spanner.

La cohérence des données n'est garantie à la fin de la migration que lorsque toutes les données ont été écrites dans Spanner. Pour stocker des informations de tri pour chaque enregistrement écrit dans Spanner, ce modèle crée une table supplémentaire (appelée "shadow table" ou table fictive) pour chaque table de la base de données Spanner. Cela permet de garantir la cohérence à la fin de la migration. Les tables fictives ne sont pas supprimées après la migration et peuvent être utilisées à des fins de validation à la fin de la migration.

Toutes les erreurs qui se produisent pendant l'opération, telles que les incohérences de schéma, les fichiers JSON non valides ou les erreurs résultant de l'exécution des transformations, sont enregistrées dans une file d'attente d'erreurs. La file d'attente d'erreurs est un dossier Cloud Storage qui stocke tous les événements Datastream ayant rencontré des erreurs ainsi que le motif de l'erreur au format texte. Les erreurs peuvent être temporaires ou permanentes, et sont stockées dans des dossiers Cloud Storage appropriés dans la file d'attente d'erreurs. Les erreurs temporaires font l'objet de nouvelles tentatives automatiques, contrairement aux erreurs permanentes. En cas d'erreurs permanentes, vous avez la possibilité de corriger les événements de modification et de les déplacer vers le bucket pouvant faire l'objet de nouvelles tentatives pendant l'exécution du modèle.

Conditions requises pour ce pipeline

  • Un flux Datastream dans l'état En cours d'exécution ou Non démarré.
  • Un bucket Cloud Storage dans lequel les événements Datastream sont répliqués
  • Une base de données Spanner avec des tables existantes. Ces tables peuvent être vides ou contenir des données.

Paramètres de modèle

Paramètres Description
inputFilePattern Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Il s'agit généralement du chemin racine d'un flux.
streamName Nom ou modèle du flux à interroger pour obtenir des informations de schéma et le type de source.
instanceId Instance Spanner dans laquelle les modifications sont répliquées.
databaseId Base de données Spanner dans laquelle les modifications sont répliquées.
projectId ID du projet Spanner.
deadLetterQueueDirectory (Facultatif) Il s'agit du chemin d'accès au fichier permettant de stocker la sortie de la file d'attente d'erreurs. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow.
inputFileFormat (Facultatif) Format du fichier de sortie généré par Datastream. Par exemple, avro,json. Par défaut, avro.
shadowTablePrefix (Facultatif) Préfixe utilisé pour nommer les tables fantômes. Valeur par défaut : shadow_

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to Spanner template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

Étapes suivantes