Modèle Datastream vers Spanner

Le modèle Datastream vers Spanner est un pipeline de flux de données qui lit les événements Datastream d'un bucket Cloud Storage et les écrit dans une base de données Spanner. Il est destiné à la migration de données de sources Datastream vers Spanner.

Toutes les tables requises pour la migration doivent exister dans la base de données Spanner de destination avant l'exécution du modèle. Par conséquent, la migration du schéma d'une base de données source vers Spanner doit être terminée avant de migrer les données. Des données peuvent exister dans les tables avant la migration. Ce modèle ne propage pas les modifications du schéma Datastream dans la base de données Spanner.

La cohérence des données n'est garantie à la fin de la migration que lorsque toutes les données ont été écrites dans Spanner. Pour stocker des informations de tri pour chaque enregistrement écrit dans Spanner, ce modèle crée une table supplémentaire (appelée "shadow table" ou table fictive) pour chaque table de la base de données Spanner. Cela permet de garantir la cohérence à la fin de la migration. Les tables fictives ne sont pas supprimées après la migration et peuvent être utilisées à des fins de validation à la fin de la migration.

Toutes les erreurs qui se produisent pendant l'opération, telles que les incohérences de schéma, les fichiers JSON non valides ou les erreurs résultant de l'exécution des transformations, sont enregistrées dans une file d'attente d'erreurs. La file d'attente d'erreurs est un dossier Cloud Storage qui stocke tous les événements Datastream ayant rencontré des erreurs ainsi que le motif de l'erreur au format texte. Les erreurs peuvent être temporaires ou permanentes, et sont stockées dans des dossiers Cloud Storage appropriés dans la file d'attente d'erreurs. Les erreurs temporaires font l'objet de nouvelles tentatives automatiques, contrairement aux erreurs permanentes. En cas d'erreurs permanentes, vous avez la possibilité de corriger les événements de modification et de les déplacer vers le bucket pouvant faire l'objet de nouvelles tentatives pendant l'exécution du modèle.

Conditions requises pour ce pipeline

  • Un flux Datastream dans l'état En cours d'exécution ou Non démarré.
  • Un bucket Cloud Storage dans lequel les événements Datastream sont répliqués
  • Une base de données Spanner avec des tables existantes. Ces tables peuvent être vides ou contenir des données.

Paramètres de modèle

Paramètres obligatoires

  • instanceId: instance Spanner dans laquelle les modifications sont répliquées.
  • databaseId: base de données Spanner dans laquelle les modifications sont répliquées.

Paramètres facultatifs

  • inputFilePattern: emplacement du fichier Cloud Storage contenant les fichiers Datastream à répliquer. Il s'agit généralement du chemin racine d'un flux. Cette fonctionnalité n'est plus prise en charge.
  • inputFileFormat: format du fichier de sortie généré par Datastream. Par exemple, avro,json. La valeur par défaut est avro.
  • sessionFilePath: chemin d'accès au fichier de session dans Cloud Storage contenant les informations de mappage de HarbourBridge.
  • projectId: ID du projet Spanner.
  • spannerHost: point de terminaison Cloud Spanner à appeler dans le modèle. Exemple :https://batch-spanner.googleapis.com La valeur par défaut est https://spanner.googleapis.com.
  • gcsPubSubSubscription: abonnement Pub/Sub utilisé dans une règle de notification Cloud Storage. Pour le nom, utilisez le format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: nom ou modèle du flux à interroger pour obtenir des informations de schéma et le type de source.
  • shadowTablePrefix: préfixe utilisé pour nommer les tables fantômes. Valeur par défaut : shadow_.
  • shouldCreateShadowTables: cette option indique si des tables fantômes doivent être créées dans la base de données Cloud Spanner. La valeur par défaut est "true".
  • rfcStartDateTime: date et heure de début utilisées pour récupérer des données depuis Cloud Storage (https://tools.ietf.org/html/rfc3339). Valeur par défaut : 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: nombre de fichiers DataStream simultanés à lire. La valeur par défaut est 30.
  • deadLetterQueueDirectory: chemin d'accès au fichier utilisé pour stocker la sortie de la file d'attente d'erreurs. Le chemin d'accès par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow.
  • dlqRetryMinutes: nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. La valeur par défaut est 10.
  • dlqMaxRetryCount: nombre maximal de nouvelles tentatives en raison d'erreurs temporaires via la file d'attente de lettres mortes. La valeur par défaut est 500.
  • dataStreamRootUrl: URL racine de l'API Datastream. La valeur par défaut est https://datastream.googleapis.com/.
  • datastreamSourceType: type de base de données source auquel Datastream se connecte. Exemple : mysql/oracle. Doit être défini lors des tests sans que Datastream ne soit en cours d'exécution.
  • roundJsonDecimals: si cette option est définie, elle arrondit les valeurs décimales des colonnes JSON à un nombre pouvant être stocké sans perte de précision. La valeur par défaut est "false".
  • runMode: type de mode d'exécution, qu'il soit standard ou avec nouvelle tentative de la file d'attente de lettres mortes. La valeur par défaut est "regular".
  • transformationContextFilePath: chemin d'accès au fichier de contexte de transformation dans le stockage cloud utilisé pour renseigner les données utilisées dans les transformations effectuées lors des migrations. Exemple: ID du segment vers le nom de la base de données pour identifier la base de données à partir de laquelle une ligne a été migrée.
  • directoryWatchDurationInMinutes: durée pendant laquelle le pipeline doit continuer à interroger un répertoire dans GCS. Les fichiers de sortie Datastream sont organisés dans une structure de répertoires qui représente le code temporel de l'événement, regroupé par minute. Ce paramètre doit être approximativement égal au délai maximal pouvant s'écouler entre l'événement qui se produit dans la base de données source et l'écriture du même événement dans GCS par Datastream. 99,9 centile = 10 minutes. La valeur par défaut est 10.
  • spannerPriority: priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes: [HIGH,MEDIUM,LOW]. La valeur par défaut est HIGH.
  • dlqGcsPubSubSubscription: abonnement Pub/Sub utilisé dans une règle de notification Cloud Storage pour le répertoire de nouvelle tentative de la file d'attente de lettres mortes en mode normal. Pour le nom, utilisez le format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Lorsqu'ils sont définis, les paramètres deadLetterQueueDirectory et dlqRetryMinutes sont ignorés.
  • transformationJarPath: emplacement du fichier JAR personnalisé dans Cloud Storage contenant la logique de transformation personnalisée pour le traitement des enregistrements lors de la migration. La valeur par défaut est vide.
  • transformationClassName: nom de classe complet avec une logique de transformation personnalisée. Ce champ est obligatoire si la valeur de transformationJarPath est spécifiée. La valeur par défaut est vide.
  • transformationCustomParameters: chaîne contenant les paramètres personnalisés à transmettre à la classe de transformation personnalisée. La valeur par défaut est vide.
  • filteredEventsDirectory: chemin d'accès au fichier permettant de stocker les événements filtrés via une transformation personnalisée. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
  • shardingContextFilePath: chemin d'accès au fichier de contexte de segmentation dans le stockage cloud utilisé pour renseigner l'ID de segment dans la base de données Spanner pour chaque segment source.Il doit être au format Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: il s'agit des forçages de nom de table de la source vers Spanner. Elles sont écrites au format suivant: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. Cet exemple montre comment mapper la table "Singers" sur "Vocalists" et la table "Albums" sur "Records". Exemple :[{Singers, Vocalists}, {Albums, Records}] La valeur par défaut est vide.
  • columnOverrides: il s'agit des forçages de nom de colonne de la source au spanner. Elles sont écrites au format suivant: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Notez que le nom de table source doit rester le même dans la paire source et spanner. Pour remplacer les noms de table, utilisez tableOverrides.L'exemple montre comment mapper SingerName à TalentName et AlbumName à RecordName dans les tables Singers et Albums, respectivement. Exemple :[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}] La valeur par défaut est vide.
  • schemaOverridesFilePath: fichier qui spécifie les forçages de nom de table et de colonne de la source vers Spanner. La valeur par défaut est vide.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to Spanner template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • GCS_FILE_PATH : chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple : gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE : votre instance Spanner.
  • CLOUDSPANNER_DATABASE : votre base de données Spanner.
  • DLQ : chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.

Étape suivante