Modèle Datastream vers Spanner

Le modèle Datastream vers Spanner est un pipeline de flux de données qui lit les événements Datastream d'un bucket Cloud Storage et les écrit dans une base de données Spanner. Il est destiné à la migration de données de sources Datastream vers Spanner.

Toutes les tables requises pour la migration doivent exister dans la base de données Spanner de destination avant l'exécution du modèle. Par conséquent, la migration du schéma d'une base de données source vers Spanner doit être terminée avant de migrer les données. Des données peuvent exister dans les tables avant la migration. Ce modèle ne propage pas les modifications du schéma Datastream dans la base de données Spanner.

La cohérence des données n'est garantie à la fin de la migration que lorsque toutes les données ont été écrites dans Spanner. Pour stocker des informations de tri pour chaque enregistrement écrit dans Spanner, ce modèle crée une table supplémentaire (appelée "shadow table" ou table fictive) pour chaque table de la base de données Spanner. Cela permet de garantir la cohérence à la fin de la migration. Les tables fictives ne sont pas supprimées après la migration et peuvent être utilisées à des fins de validation à la fin de la migration.

Toutes les erreurs qui se produisent pendant l'opération, telles que les incohérences de schéma, les fichiers JSON non valides ou les erreurs résultant de l'exécution des transformations, sont enregistrées dans une file d'attente d'erreurs. La file d'attente d'erreurs est un dossier Cloud Storage qui stocke tous les événements Datastream ayant rencontré des erreurs ainsi que le motif de l'erreur au format texte. Les erreurs peuvent être temporaires ou permanentes, et sont stockées dans des dossiers Cloud Storage appropriés dans la file d'attente d'erreurs. Les erreurs temporaires font l'objet de nouvelles tentatives automatiques, contrairement aux erreurs permanentes. En cas d'erreurs permanentes, vous avez la possibilité de corriger les événements de modification et de les déplacer vers le bucket pouvant faire l'objet de nouvelles tentatives pendant l'exécution du modèle.

Conditions requises pour ce pipeline

  • Un flux Datastream dans l'état En cours d'exécution ou Non démarré.
  • Un bucket Cloud Storage dans lequel les événements Datastream sont répliqués
  • Une base de données Spanner avec des tables existantes. Ces tables peuvent être vides ou contenir des données.

Paramètres de modèle

Paramètres obligatoires

  • inputFilePattern : emplacement du fichier Cloud Storage contenant les fichiers Datastream à répliquer. Il s'agit généralement du chemin racine d'un flux.
  • instanceId : Instance Spanner dans laquelle les modifications sont répliquées.
  • databaseId : Base de données Spanner dans laquelle les modifications sont répliquées.

Paramètres facultatifs

  • inputFileFormat : format du fichier de sortie généré par Datastream. Par exemple, avro,json. avro par défaut.
  • sessionFilePath : chemin d'accès au fichier de session dans Cloud Storage contenant les informations de mappage de HarbourBridge.
  • projectId : ID du projet Spanner.
  • spannerHost : Point de terminaison Cloud Spanner à appeler dans le modèle. (Exemple : https://batch-spanner.googleapis.com). La valeur par défaut est https://spanner.googleapis.com.
  • streamName : Nom ou modèle du flux à interroger pour obtenir des informations de schéma et le type de source.
  • gcsPubSubSubscription : abonnement Pub/Sub utilisé dans une règle de notification Cloud Storage. Le nom doit être au format projects/.
  • shadowTablePrefix : Préfixe utilisé pour nommer les tables fantômes. Valeur par défaut : shadow_.
  • shouldCreateShadowTables : cet indicateur indique si des tables fictives doivent être créées dans la base de données Cloud Spanner. La valeur par défaut est "true".
  • rfcStartDateTime : date et heure de début utilisées pour récupérer des données depuis Cloud Storage (https://tools.ietf.org/html/rfc3339). Valeur par défaut : 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : nombre de fichiers DataStream simultanés à lire. La valeur par défaut est 30.
  • deadLetterQueueDirectory : chemin d'accès au fichier utilisé pour stocker la sortie de la file d'attente d'erreurs. Le chemin d'accès par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow.
  • dlqRetryMinutes : Nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Valeur par défaut : 10
  • dlqMaxRetryCount : nombre maximal de tentatives d'exécution d'erreurs temporaires via la file d'attente de lettres mortes. Valeur par défaut : 500.
  • dataStreamRootUrl : URL racine de l'API Datastream. La valeur par défaut est https://datastream.googleapis.com/.
  • datastreamSourceType : type de base de données source auquel Datastream se connecte. Exemple : mysql/oracle. Doit être défini lors des tests sans que Datastream ne soit en cours d'exécution.
  • roundJsonDecimals : si cet indicateur est défini, il arrondit les valeurs décimales des colonnes JSON à un nombre pouvant être stocké sans perte de précision. La valeur par défaut est "false".
  • runMode : type de mode d'exécution, qu'il soit standard ou avec retryDLQ. Valeur par défaut : "regular".
  • transformationContextFilePath : chemin d'accès au fichier de contexte de transformation dans le stockage cloud utilisé pour renseigner les données utilisées dans les transformations effectuées lors des migrations. Exemple : ID du segment vers le nom de la base de données pour identifier la base de données à partir de laquelle une ligne a été migrée.
  • directoryWatchDurationInMinutes : durée pendant laquelle le pipeline doit continuer à interroger un répertoire dans GCS. Les fichiers de sortie Datastream sont organisés dans une structure de répertoires qui représente l'horodatage de l'événement, regroupé par minute. Ce paramètre doit être approximativement égal au délai maximal pouvant survenir entre un événement dans la base de données source et le même événement écrit dans GCS par Datastream. 99,9 centile = 10 minutes. La valeur par défaut est 10.
  • spannerPriority : Priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes : [HIGH,MEDIUM,LOW]. La valeur par défaut est "HIGH".
  • dlqGcsPubSubSubscription : abonnement Pub/Sub utilisé dans une règle de notification Cloud Storage pour le répertoire de nouvelle tentative DLQ en mode normal. Le nom doit être au format projects/.
  • transformationJarPath : emplacement du fichier JAR personnalisé dans Cloud Storage contenant la logique de transformation personnalisée pour le traitement des enregistrements lors de la migration vers l'avant. La valeur par défaut est vide.
  • transformationClassName : Nom de classe complet avec une logique de transformation personnalisée. Ce champ est obligatoire si la valeur de transformationJarPath est spécifiée. La valeur par défaut est vide.
  • transformationCustomParameters : Chaîne contenant les paramètres personnalisés à transmettre à la classe de transformation personnalisée. La valeur par défaut est vide.
  • filteredEventsDirectory : chemin d'accès au fichier permettant de stocker les événements filtrés via une transformation personnalisée. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
  • shardingContextFilePath : chemin d'accès au fichier de contexte de fractionnement dans le stockage cloud utilisé pour renseigner l'ID de fractionnement lors des migrations. Il doit être au format Map<stream_name, Map<db_name, shard_id>>.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to Spanner template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

Étapes suivantes