Modèle de flux de modifications Bigtable vers BigQuery

Le modèle de flux de modifications Bigtable vers BigQuery est un pipeline de flux de données qui diffuse les enregistrements de modification de données Bigtable et les écrit dans des tables BigQuery à l'aide de Dataflow.

Un flux de modifications Bigtable vous permet de vous abonner aux mutations de données par table. Lorsque vous vous abonnez à des flux de modification de table, les contraintes suivantes s'appliquent :

  • Seuls les cellules et les descripteurs modifiés des opérations de suppression sont renvoyés.
  • Seule la nouvelle valeur d'une cellule modifiée est renvoyée.

Lorsque des enregistrements de modification de données sont écrits dans BigQuery, les lignes peuvent être insérées dans le désordre par rapport à l'ordre d'horodatage de commit Bigtable d'origine.

Les lignes de la table du journal des modifications ne pouvant pas être écrites dans BigQuery en raison d'une erreur persistante sont définitivement placées dans une file d'attente de lettres mortes (file d'attente de messages non traités) dans Cloud Storage pour un examen manuel ou un traitement ultérieur réalisé par l'utilisateur.

Si la table BigQuery nécessaire n'existe pas, le pipeline la crée. Sinon, une table BigQuery existante est utilisée. Le schéma des tables BigQuery existantes doit contenir les colonnes dans la table suivante.

Chaque nouvelle ligne BigQuery inclut un enregistrement de modification de données renvoyé par le flux de modifications depuis la ligne correspondante dans la table Bigtable.

Schéma de la table de sortie BigQuery

Nom de la colonne Type Nullable Description
row_key STRING ou BYTES Non Clé de ligne de la ligne modifiée. Lorsque l'option de pipeline writeRowkeyAsBytes est définie sur true, le type de la colonne doit être BYTES. Sinon, utilisez le type STRING.
mod_type STRING Non Type de mutation de ligne. Utilisez l'une des valeurs suivantes : SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
column_family STRING Non Famille de colonnes affectée par la mutation de ligne.
column STRING Oui Qualificatif de colonne affecté par la mutation de ligne. Pour le type de mutation DELETE_FAMILY, définissez cette valeur sur NULL.
commit_timestamp TIMESTAMP Non Date/heure à laquelle Bigtable applique la mutation.
big_query_commit_timestamp TIMESTAMP Oui Facultatif : spécifie la date et l'heure auxquelles BigQuery écrit la ligne dans une table de sortie. Le champ n'est pas renseigné si le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP ou INT64 Oui Valeur d'horodatage de la cellule affectée par la mutation. Lorsque l'option de pipeline writeNumericTimestamps est définie sur true, le type de la colonne doit être INT64. Sinon, utilisez le type TIMESTAMP. Pour les types de mutation DELETE_CELLS et DELETE_FAMILY, définissez la valeur sur NULL.
timestamp_from TIMESTAMP ou INT64 Oui Décrit un début inclusif de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, définissez cette valeur sur NULL.
timestamp_to TIMESTAMP ou INT64 Oui Décrit une fin exclusive de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, définissez cette valeur sur NULL.
is_gc BOOL Non Facultatif : lorsque la mutation est déclenchée par une stratégie de récupération de mémoire, définissez cette valeur sur true. Dans tous les autres cas, définissez cette valeur sur false. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_instance STRING Non Facultatif : décrit le nom de l'instance Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING Non Facultatif : décrit le nom du cluster Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_table STRING Non Facultatif : décrit le nom de la table Bigtable à laquelle la mutation s'applique. La valeur dans cette colonne peut être utile si plusieurs tables Bigtable diffusent des modifications vers la même table BigQuery. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 Non Facultatif : lorsque deux mutations sont enregistrées en même temps par différents clusters Bigtable, la mutation avec la valeur tiebreaker la plus élevée est appliquée à la table source. Les mutations ayant des valeurs tiebreaker inférieures sont supprimées. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
value STRING ou BYTES Oui Nouvelle valeur définie par la mutation. Lorsque l'option de pipeline writeValuesAsBytes est définie sur true, le type de la colonne doit être BYTES. Sinon, utilisez le type STRING. La valeur est définie pour les mutations SET_CELL. Pour les autres types de mutation, la valeur est définie sur NULL.

Conditions requises pour ce pipeline

  • Instance source Bigtable spécifiée.
  • Table source Bigtable spécifiée. Les flux de modifications doivent être activés dans la table.
  • Profil d'application Bigtable spécifié.
  • Ensemble de données de destination BigQuery spécifié.

Paramètres de modèle

Paramètres Description
bigtableReadInstanceId ID de l'instance Bigtable source.
bigtableReadTableId ID de la table Bigtable source.
bigtableChangeStreamAppProfile ID de profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
bigQueryDataset Nom de l'ensemble de données de la table BigQuery de destination.
writeNumericTimestamps Facultatif : écrit l'horodatage Bigtable en tant que INT64 BigQuery. Lorsque ce paramètre est défini sur true, les valeurs sont écrites dans la colonne INT64. Sinon, les valeurs sont écrites dans la colonne TIMESTAMP. Colonnes concernées : timestamp, timestamp_from et timestamp_to. La valeur par défaut est false. Lorsque ce paramètre est défini sur true, le temps est mesuré en microsecondes depuis l'époque Unix (1er janvier 1970 à l'heure UTC).
writeRowkeyAsBytes Facultatif : écrit les clés de ligne en tant que BYTES BigQuery. Lorsque ce paramètre est défini sur true, les clés de ligne sont écrites dans la colonne BYTES. Sinon, les clés de ligne sont écrites dans la colonne STRING. La valeur par défaut est false.
writeValuesAsBytes Facultatif : écrit les valeurs en tant que BYTES BigQuery. Lorsque ce paramètre est défini sur true, les valeurs sont écrites dans la colonne BYTES. Sinon, les valeurs sont écrites dans la colonne STRING. La valeur par défaut est false.
bigQueryChangelogTableName Facultatif : nom de la table BigQuery de destination. S'il n'est pas spécifié, la valeur bigtableReadTableId + "_changelog" est utilisée.
bigQueryProjectId Facultatif : ID de projet de l'ensemble de données BigQuery. La valeur par défaut est le projet pour le job Dataflow.
bigtableReadProjectId Facultatif : ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.
bigtableChangeStreamMetadataInstanceId Facultatif : ID d'instance de métadonnées du flux de modifications Bigtable.
bigtableChangeStreamMetadataTableTableId Facultatif : ID de table de métadonnées du flux de modifications Bigtable.
bigtableChangeStreamCharset Facultatif : nom du charset de flux de modifications Bigtable lors de la lecture des valeurs et des qualificatifs de colonnes.
bigtableChangeStreamStartTimestamp Facultatif : horodatage de démarrage, inclusif, à utiliser pour la lecture des flux de modifications. Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage de l'heure de début du pipeline.
bigtableChangeStreamIgnoreColumnFamilies Facultatif : liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer.
bigtableChangeStreamIgnoreColumns Facultatif : liste des modifications de noms de colonnes, séparées par une virgule, à ignorer.
bigtableChangeStreamName Facultatif : nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
bigtableChangeStreamResume Facultatif : lorsque ce paramètre est défini sur true, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeur bigtableChangeStreamName s'est arrêté. Si le pipeline avec la valeur bigtableChangeStreamName donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini sur false, un nouveau pipeline démarre. Si un pipeline avec la même valeur bigtableChangeStreamName a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut est false.
bigQueryChangelogTableFieldsToIgnore Facultatif : liste des colonnes du journal des modifications, séparées par une virgule, qui ne sont pas créées ni renseignées si ce paramètre est spécifié. Utilisez l'une des valeurs compatibles suivantes : is_gc, source_instance, source_cluster, source_table, tiebreaker ou big_query_commit_timestamp. Par défaut, toutes les colonnes sont renseignées.
bigQueryChangelogTablePartitionExpirationMs Facultatif : définit le délai d'expiration de la partition de table des journaux de modifications, en millisecondes. Lorsque ce paramètre est défini sur true, les partitions antérieures au nombre de millisecondes spécifié sont supprimées. Par défaut, aucun délai d'expiration n'est défini.
bigQueryChangelogTablePartitionGranularity Facultatif : spécifie la précision pour le partitionnement de la table des journaux de modifications. Lorsque ce paramètre est défini, la table est partitionnée. Utilisez l'une des valeurs acceptées suivantes : HOUR, DAY, MONTH ou YEAR. Par défaut, la table n'est pas partitionnée.
dlqDirectory Facultatif : répertoire pour la file d'attente de lettres mortes. Les enregistrements dont le traitement échoue sont stockés dans ce répertoire. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. Dans la plupart des cas, vous pouvez utiliser le chemin par défaut.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable change streams to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom unique de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • BIGQUERY_DESTINATION_DATASET : nom de l'ensemble de données de destination BigQuery.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom unique de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • BIGQUERY_DESTINATION_DATASET : nom de l'ensemble de données de destination BigQuery.

Étapes suivantes