Modèle de flux de modifications Spanner vers BigQuery

Le modèle de flux de modification Spanner vers BigQuery est un pipeline de streaming qui diffuse les enregistrements de modification des données Spanner et les écrit dans les tables BigQuery à l'aide de l'exécuteur Dataflow V2.

Toutes les colonnes du flux de modifications surveillées sont incluses dans chaque ligne de la table BigQuery, qu'elles soient modifiées ou non par une transaction Spanner. Les colonnes non surveillées ne sont pas incluses dans la ligne BigQuery. Toute modification de Spanner inférieure au filigrane Dataflow est appliquée aux tables BigQuery ou est stockée dans la file d'attente de lettres mortes pour nouvelle tentative. Les lignes BigQuery sont insérées dans le désordre par rapport à l'ordre d'horodatage de commit Spanner d'origine.

Si les tables BigQuery nécessaires n'existent pas, le pipeline les crée. Sinon, vous utilisez des tables BigQuery existantes. Le schéma des tables BigQuery existantes doit contenir les colonnes suivies correspondantes des tables Spanner et toutes les colonnes de métadonnées supplémentaires qui ne sont pas ignorées explicitement par l'option ignoreFields. La description des champs de métadonnées se trouve dans la liste suivante. Chaque nouvelle ligne BigQuery inclut toutes les colonnes surveillées par le flux de modification de la ligne correspondante dans la table Spanner à l'horodatage de l'enregistrement de modification.

Les champs de métadonnées suivants sont ajoutés aux tables BigQuery. Pour en savoir plus sur ces champs, consultez la section Enregistrements de modifications des données dans la page "Modifier les partitions, les enregistrements et les requêtes de flux".

  • _metadata_spanner_mod_type : type de modification (insertion, mise à jour ou suppression) de la transaction Spanner. Extrait de l'enregistrement de modification des données de flux de modifications.
  • _metadata_spanner_table_name: nom de la table Spanner. Ce champ n'est pas le nom de la table de métadonnées du connecteur.
  • _metadata_spanner_commit_timestamp : horodatage de commit de Spanner, qui correspond à l'heure à laquelle une modification est validée. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_spanner_server_transaction_id : chaîne unique représentant la transactionSpanner dans laquelle la modification a été validée. N'utilisez cette valeur que dans le contexte du traitement des enregistrements de flux de modifications. Elle n'est pas corrélée avec l'ID de transaction dans l'API de Spanner. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_spanner_record_sequence : numéro de séquence de l'enregistrement dans la transaction Spanner. Les numéros de séquence sont garantis uniques et augmentent de façon monotone, mais ne sont pas nécessairement contigus, dans une transaction. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_spanner_is_last_record_in_transaction_in_partition : indique si l'enregistrement est le dernier enregistrement d'une transaction Spanner dans la partition actuelle. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_spanner_number_of_records_in_transaction : nombre d'enregistrements de modifications de données faisant partie de la transaction Spanner dans toutes les partitions de flux de modifications. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_spanner_number_of_partitions_in_transaction : nombre de Partitions qui renvoient des enregistrements de modification de données pour la transaction Spanner. Cette valeur est extraite de l'enregistrement de modification des données de flux de modification.
  • _metadata_big_query_commit_timestamp : horodatage de commit correspondant à l'insertion de la ligne dans BigQuery. Si la valeur de useStorageWriteApi est true, cette colonne n'est pas automatiquement créée dans la table des journaux de modifications par le pipeline. Dans ce cas, vous devez ajouter manuellement cette colonne dans la table du journal des modifications si nécessaire.

Lorsque vous utilisez ce modèle, tenez compte des détails suivants:

  • Vous pouvez utiliser ce modèle pour propager de nouvelles colonnes dans des tables existantes ou de nouvelles tables de Spanner vers BigQuery. Pour en savoir plus, consultez Gérer l'ajout de tables ou de colonnes de suivi.
  • Pour les types de capture de valeur OLD_AND_NEW_VALUES et NEW_VALUES, lorsque l'enregistrement de modification de données contient une modification UPDATE, le modèle doit effectuer une lecture non actualisée dans Spanner à l'horodatage de validation de l'enregistrement de modification de données, afin de récupérer les colonnes non modifiées mais surveillées. Veillez à configurer correctement la "version_retention_period" de votre base de données pour la lecture non actualisée. Pour le type de capture de valeur NEW_ROW, le modèle est plus efficace, car l'enregistrement de modification de données capture la nouvelle ligne complète, y compris les colonnes qui ne sont pas mises à jour dans les requêtes UPDATE, et le modèle n'a pas besoin d'effectuer une lecture non actualisée.
  • Pour minimiser la latence et les coûts de transport du réseau, exécutez la tâche Dataflow à partir de la même région que votre instance Spanner ou vos tables BigQuery. Si vous utilisez des sources, des récepteurs, des emplacements de fichiers de préproduction ou des emplacements de fichiers temporaires situés en dehors de la région associée à votre tâche, vos données peuvent être envoyées d'une région à l'autre. Pour en savoir plus, consultez la page Régions Dataflow.
  • Ce modèle accepte tous les types de données Spanner valides, mais si le type BigQuery est plus précis que le type Spanner, la transformation peut occasionner une perte de précision. Plus précisément :
    • Pour le type JSON de Spanner, les membres d'un objet sont ordonnés de façon lexicographique, mais il n'existe aucune garantie similaire pour le type JSON de BigQuery.
    • Spanner accepte le type TIMESTAMP en nanosecondes, mais BigQuery n'accepte le type TIMESTAMP qu'en microsecondes.
  • Ce modèle ne permet pas d'utiliser l' API BigQuery Storage Write en mode "exactement une fois".

En savoir plus sur les flux de modification, la création de pipelines Dataflow de flux de modification et les bonnes pratiques.

Conditions requises pour ce pipeline

  • L'instance Spanner doit exister avant l'exécution du pipeline.
  • La base de données Spanner doit exister avant l'exécution du pipeline.
  • L'instance de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • La base de données de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • Le flux de modifications Spanner doit exister avant l'exécution du pipeline.
  • L'ensemble de données BigQuery doit exister avant l'exécution du pipeline.

Gérer l'ajout de tables ou de colonnes de suivi

Cette section décrit les bonnes pratiques à suivre pour gérer l'ajout de tables et de colonnes de suivi Spanner pendant l'exécution du pipeline.

  • Avant d'ajouter une colonne à un champ d'application de flux de modification Spanner, ajoutez-la d'abord à la table de journal des modifications BigQuery. La colonne ajoutée doit avoir un type de données correspondant et être NULLABLE. Patientez au moins 10 minutes avant de continuer à créer la nouvelle colonne ou la nouvelle table dans Spanner. L'écriture dans la nouvelle colonne sans attendre peut entraîner un enregistrement non traité avec un code d'erreur non valide dans le répertoire de la file d'attente de lettres mortes.
  • Pour ajouter une table, commencez par l'ajouter dans la base de données Spanner. La table est automatiquement créée dans BigQuery lorsque le pipeline reçoit un enregistrement pour la nouvelle table.
  • Après avoir ajouté les nouvelles colonnes ou tables dans la base de données Spanner, veillez à modifier votre flux de modifications pour suivre les nouvelles colonnes ou tables souhaitées si elles ne sont pas déjà suivies implicitement.
  • Le modèle ne supprime pas les tables ni les colonnes de BigQuery. Si une colonne est supprimée de la table Spanner, des valeurs nulles sont insérées dans les colonnes de journal des modifications BigQuery pour les enregistrements générés après la suppression des colonnes de la table Spanner, sauf si vous supprimez manuellement la colonne de BigQuery.
  • Le modèle n'est pas compatible avec les mises à jour du type de colonne. Bien que Spanner permette de convertir une colonne STRING en BYTES ou une colonne BYTES en STRING, vous ne pouvez pas modifier le type de données d'une colonne existante ni utiliser le même nom de colonne avec différents types de données dans BigQuery. Si vous supprimez et recréez une colonne avec le même nom, mais un type différent dans Spanner, les données peuvent être écrites dans la colonne BigQuery existante, mais le type reste inchangé.
  • Ce modèle n'est pas compatible avec les mises à jour du mode de colonne. Les colonnes de métadonnées répliquées dans BigQuery sont définies sur le mode REQUIRED. Toutes les autres colonnes répliquées dans BigQuery sont définies sur NULLABLE, qu'elles soient définies comme NOT NULL dans la table Spanner ou non. Vous ne pouvez pas mettre à jour les colonnes NULLABLE en mode REQUIRED dans BigQuery.
  • Il n'est pas possible de modifier le type de capture de valeur d'un flux de modification pour les pipelines en cours d'exécution.

Paramètres de modèle

Paramètres obligatoires

  • spannerInstanceId : Instance Spanner à partir de laquelle lire les flux de modifications.
  • spannerDatabase : Base de données Spanner à partir de laquelle lire les flux de modifications.
  • spannerMetadataInstanceId : Instance Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerMetadataDatabase : Base de données Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerChangeStreamName : Nom du flux de modifications Spanner à lire.
  • bigQueryDataset : ensemble de données BigQuery pour la sortie des flux de modifications

Paramètres facultatifs

  • spannerProjectId : Projet à partir duquel lire les flux de modifications. Cette valeur s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modifications est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
  • spannerDatabaseRole : Rôle de base de données Spanner à utiliser lors de l'exécution du modèle. Ce paramètre n'est requis que lorsque le compte principal IAM qui exécute le modèle est un utilisateur de contrôle d'accès précis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la section "Contrôle des accès précis pour les flux de modifications" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName : Nom de la table des métadonnées du connecteur de flux de modifications Cloud Spanner à utiliser. Si aucune valeur n'est fournie, une table des métadonnées du connecteur de flux de modifications Spanner est automatiquement créée pendant le flux de pipeline. Vous devez fournir ce paramètre lorsque vous mettez à jour un pipeline existant. Sinon, ne fournissez pas ce paramètre.
  • rpcPriority : Priorité des requêtes pour les appels Spanner. La valeur doit être l'une des suivantes : HIGH, MEDIUM ou LOW. La valeur par défaut est HIGH.
  • spannerHost : Point de terminaison Cloud Spanner à appeler dans le modèle. Utilisé uniquement pour les tests. (Exemple : https://batch-spanner.googleapis.com).
  • startTimestamp : date et heure de début (incluses) (https://datatracker.ietf.org/doc/html/rfc3339) à utiliser pour lire les flux de modifications. Ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage du démarrage du pipeline, c'est-à-dire l'heure actuelle.
  • endTimestamp : date et heure de fin (incluses) (https://datatracker.ietf.org/doc/html/rfc3339) à utiliser pour la lecture des flux de modifications. Exemple : 2021-10-12T07:20:50.52Z. Elle est définie par défaut sur une période infinie dans le futur.
  • bigQueryProjectId : projet BigQuery. La valeur par défaut est le projet pour le job Dataflow.
  • bigQueryChangelogTableNameTemplate : modèle du nom de la table BigQuery qui contient le journal des modifications. La valeur par défaut est {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : chemin d'accès au répertoire de stockage des enregistrements non traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. La valeur par défaut est généralement suffisante.
  • dlqRetryMinutes : nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. La valeur par défaut est 10.
  • ignoreFields : champs à ignorer (sensibles à la casse) sous forme de liste d'éléments séparés par une virgule. Ces champs peuvent être des champs de tables surveillées ou des champs de métadonnées ajoutés par le pipeline. Les champs ignorés ne sont pas insérés dans BigQuery. Lorsque vous ignorez le champ _metadata_spanner_table_name, le paramètre bigQueryChangelogTableNameTemplate est également ignoré. La valeur par défaut est vide.
  • disableDlqRetries : indique si les nouvelles tentatives pour la DLQ doivent être désactivées. La valeur par défaut est "false".
  • useStorageWriteApi : si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
  • numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • BIGQUERY_DATASET : ensemble de données BigQuery pour la sortie des flux de modifications

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • BIGQUERY_DATASET : ensemble de données BigQuery pour la sortie des flux de modifications

Étapes suivantes