Modèle de flux de modifications Spanner vers BigQuery

Le modèle de flux de modification Spanner vers BigQuery est un pipeline de streaming qui diffuse les enregistrements de modification des données Spanner et les écrit dans les tables BigQuery à l'aide de l'exécuteur Dataflow V2.

Toutes les colonnes du flux de modifications surveillées sont incluses dans chaque ligne de la table BigQuery, qu'elles soient modifiées ou non par une transaction Spanner. Les colonnes non surveillées ne sont pas incluses dans la ligne BigQuery. Toute modification de Spanner inférieure au filigrane Dataflow est appliquée aux tables BigQuery ou est stockée dans la file d'attente de lettres mortes pour nouvelle tentative. Les lignes BigQuery sont insérées dans le désordre par rapport à l'ordre d'horodatage de commit Spanner d'origine.

Si les tables BigQuery nécessaires n'existent pas, le pipeline les crée. Sinon, vous utilisez des tables BigQuery existantes. Le schéma des tables BigQuery existantes doit contenir les colonnes suivies correspondantes des tables Spanner et toutes les colonnes de métadonnées supplémentaires qui ne sont pas ignorées explicitement par l'option ignoreFields. La description des champs de métadonnées se trouve dans la liste suivante. Chaque nouvelle ligne BigQuery inclut toutes les colonnes surveillées par le flux de modification de la ligne correspondante dans la table Spanner à l'horodatage de l'enregistrement de modification.

Les champs de métadonnées suivants sont ajoutés aux tables BigQuery. Pour en savoir plus sur ces champs, consultez la section Enregistrements de modifications des données dans la page "Modifier les partitions, les enregistrements et les requêtes de flux".

Lorsque vous utilisez ce modèle, tenez compte des détails suivants:

  • Ce modèle ne propage pas les modifications de schéma de Spanner vers BigQuery. Étant donné qu'une modification du schéma dans Spanner est susceptible de perturber le pipeline, vous devrez peut-être le recréer après la modification du schéma.
  • Pour les types de capture de valeur OLD_AND_NEW_VALUES et NEW_VALUES, lorsque l'enregistrement de modification de données contient une modification UPDATE, le modèle doit effectuer une lecture non actualisée dans Spanner à l'horodatage de validation de l'enregistrement de modification de données, afin de récupérer les colonnes non modifiées mais surveillées. Veillez à configurer correctement la "version_retention_period" de votre base de données pour la lecture non actualisée. Pour le type de capture de valeur NEW_ROW, le modèle est plus efficace, car l'enregistrement de modification de données capture la nouvelle ligne complète, y compris les colonnes qui ne sont pas mises à jour dans les requêtes UPDATE, et le modèle n'a pas besoin d'effectuer une lecture non actualisée.
  • Pour minimiser la latence et les coûts de transport du réseau, exécutez la tâche Dataflow à partir de la même région que votre instance Spanner ou vos tables BigQuery. Si vous utilisez des sources, des récepteurs, des emplacements de fichiers de préproduction ou des emplacements de fichiers temporaires situés en dehors de la région associée à votre tâche, vos données peuvent être envoyées d'une région à l'autre. Pour en savoir plus, consultez la page Régions Dataflow.
  • Ce modèle accepte tous les types de données Spanner valides, mais si le type BigQuery est plus précis que le type Spanner, la transformation peut occasionner une perte de précision. Plus précisément :
    • Pour le type JSON de Spanner, les membres d'un objet sont ordonnés de façon lexicographique, mais il n'existe aucune garantie similaire pour le type JSON de BigQuery.
    • Spanner accepte le type TIMESTAMP en nanosecondes, mais BigQuery n'accepte le type TIMESTAMP qu'en microsecondes.
  • Ce modèle ne permet pas d'utiliser l' API BigQuery Storage Write en mode "exactement une fois".

En savoir plus sur les flux de modification, la création de pipelines Dataflow de flux de modification et les bonnes pratiques.

Conditions requises pour ce pipeline

  • L'instance Spanner doit exister avant l'exécution du pipeline.
  • La base de données Spanner doit exister avant l'exécution du pipeline.
  • L'instance de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • La base de données de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • Le flux de modifications Spanner doit exister avant l'exécution du pipeline.
  • L'ensemble de données BigQuery doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres obligatoires

  • spannerInstanceId : Instance Spanner à partir de laquelle lire les flux de modifications.
  • spannerDatabase : Base de données Spanner à partir de laquelle lire les flux de modifications.
  • spannerMetadataInstanceId : Instance Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerMetadataDatabase : Base de données Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. Pour les flux de modifications qui suivent toutes les tables d'une base de données, nous vous recommandons de placer la table des métadonnées dans une base de données distincte.
  • spannerChangeStreamName : Nom du flux de modifications Spanner à lire.
  • bigQueryDataset : ensemble de données BigQuery pour la sortie des flux de modifications Le dataSetName et le full dataSetId (c'est-à-dire bigQueryProjectId.dataSetName) sont acceptables.

Paramètres facultatifs

  • spannerProjectId : Projet à partir duquel lire les flux de modifications. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
  • spannerDatabaseRole : l'utilisateur du rôle de base de données assume le rôle lors de la lecture du flux de modifications. Le rôle de base de données doit disposer des droits requis pour lire les flux de modifications. Si aucun rôle de base de données n'est spécifié, l'utilisateur doit disposer des autorisations IAM requises pour lire les données de la base de données.
  • spannerMetadataTableName : Nom de la table des métadonnées du connecteur de flux de modifications Cloud Spanner à utiliser. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modification Cloud Spanner est automatiquement créée pendant le flux de pipeline. Ce paramètre doit être fourni lors de la mise à jour d'un pipeline existant et ne doit pas être renseigné si ce n'est pas le cas.
  • rpcPriority : Priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes : [HIGH,MEDIUM,LOW]. La valeur par défaut est HIGH.
  • spannerHost : Point de terminaison Cloud Spanner à appeler dans le modèle. Utilisé uniquement pour les tests. (Exemple : https://batch-spanner.googleapis.com).
  • startTimestamp : date et heure de début (incluses) à utiliser pour lire les flux de modifications (https://tools.ietf.org/html/rfc3339). Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage du démarrage du pipeline.
  • endTimestamp : la date et l'heure de fin (incluses), à utiliser pour la lecture des flux de modifications (https://tools.ietf.org/html/rfc3339). Ex-2022-05-05T07:59:59Z. Elle est définie par défaut sur une période infinie dans le futur.
  • bigQueryProjectId : projet BigQuery. Le projet par défaut est la tâche Dataflow.
  • bigQueryChangelogTableNameTemplate : modèle du nom de la table BigQuery qui contient le journal des modifications. La valeur par défaut est {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : Chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
  • dlqRetryMinutes : Nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Valeur par défaut : 10
  • ignoreFields : Liste de champs, séparés par une virgule, à ignorer. Il peut s'agir de champs appartenant à des tables suivies ou des champs de métadonnées suivants : _metadata_spanner_mod_type, _metadata_spanner_table_name, _metadata_spanner_commit_timestamp, _metadata_spanner_server_transaction_id, _metadata_spanner_record_sequence, _metadata_spanner_is_last_record_in_transaction_in_partition, _metadata_spanner_number_of_records_in_transaction, _metadata_spanner_number_of_partitions_in_transaction, _metadata_big_query_commit_timestamp. La valeur par défaut est vide.
  • disableDlqRetries : indique si les nouvelles tentatives pour la DLQ doivent être désactivées. Valeur par défaut : "false".
  • useStorageWriteApi : si la valeur est définie sur "true", le pipeline utilise l'API Storage Write lors de l'écriture des données dans BigQuery (voir https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). La valeur par défaut est false. Lorsque vous utilisez l'API Storage Write en mode "exactement une fois", vous devez définir les paramètres suivants : "Nombre de flux pour l'API BigQuery Storage Write" et "Fréquence de déclenchement en secondes pour l'API BigQuery Storage Write". Si vous activez le mode "au moins une fois" de Dataflow ou si vous définissez le paramètre useStorageWriteApiAtLeastOnce sur "true", vous n'avez pas besoin de définir le nombre de flux ni la fréquence de déclenchement.
  • useStorageWriteApiAtLeastOnce : ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. Valeur par défaut : "false".
  • numStorageWriteApiStreams : le nombre de flux définit le parallélisme de la transformation d'écriture de BigQueryIO et correspond à peu près au nombre de flux de l'API Storage Write qui seront utilisés par le pipeline. Consultez la page https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api pour connaître les valeurs recommandées. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec : la fréquence de déclenchement détermine la date à laquelle les données seront visibles pour les requêtes dans BigQuery. Consultez la page https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api pour connaître les valeurs recommandées.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • BIGQUERY_DATASET : ensemble de données BigQuery pour la sortie des flux de modifications

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • BIGQUERY_DATASET : ensemble de données BigQuery pour la sortie des flux de modifications

Étapes suivantes