Le modèle de flux de modifications Bigtable vers BigQuery est un pipeline de flux de données qui diffuse les enregistrements de modification de données Bigtable et les écrit dans des tables BigQuery à l'aide de Dataflow.
Un flux de modifications Bigtable vous permet de vous abonner aux mutations de données par table. Lorsque vous vous abonnez à des flux de modification de table, les contraintes suivantes s'appliquent :
- Seuls les cellules et les descripteurs modifiés des opérations de suppression sont renvoyés.
- Seule la nouvelle valeur d'une cellule modifiée est renvoyée.
Lorsque des enregistrements de modification de données sont écrits dans BigQuery, les lignes peuvent être insérées dans le désordre par rapport à l'ordre d'horodatage de commit Bigtable d'origine.
Les lignes de la table du journal des modifications ne pouvant pas être écrites dans BigQuery en raison d'une erreur persistante sont définitivement placées dans une file d'attente de lettres mortes (file d'attente de messages non traités) dans Cloud Storage pour un examen manuel ou un traitement ultérieur réalisé par l'utilisateur.
Si la table BigQuery nécessaire n'existe pas, le pipeline la crée. Sinon, une table BigQuery existante est utilisée. Le schéma des tables BigQuery existantes doit contenir les colonnes dans la table suivante.
Chaque nouvelle ligne BigQuery inclut un enregistrement de modification de données renvoyé par le flux de modifications depuis la ligne correspondante dans la table Bigtable.
Schéma de la table de sortie BigQuery
Nom de la colonne | Type | Nullable | Description |
---|---|---|---|
row_key |
STRING ou BYTES |
Non | Clé de ligne de la ligne modifiée. Lorsque l'option de pipeline writeRowkeyAsBytes est définie sur true , le type de la colonne doit être BYTES . Sinon, utilisez le type STRING . |
mod_type |
STRING |
Non | Type de mutation de ligne. Utilisez l'une des valeurs suivantes : SET_CELL , DELETE_CELLS ou DELETE_FAMILY . |
column_family |
STRING |
Non | Famille de colonnes affectée par la mutation de ligne. |
column |
STRING |
Oui | Qualificatif de colonne affecté par la mutation de ligne. Pour le type de mutation DELETE_FAMILY , définissez cette valeur sur NULL . |
commit_timestamp |
TIMESTAMP |
Non | Date/heure à laquelle Bigtable applique la mutation. |
big_query_commit_timestamp |
TIMESTAMP |
Oui | Facultatif : spécifie la date et l'heure auxquelles BigQuery écrit la ligne dans une table de sortie. Le champ n'est pas renseigné si le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
timestamp |
TIMESTAMP ou INT64 |
Oui | Valeur d'horodatage de la cellule affectée par la mutation. Lorsque l'option de pipeline writeNumericTimestamps est définie sur true , le type de la colonne doit être INT64 . Sinon, utilisez le type TIMESTAMP .
Pour les types de mutation DELETE_CELLS et DELETE_FAMILY , définissez la valeur sur NULL . |
timestamp_from |
TIMESTAMP ou INT64 |
Oui | Décrit un début inclusif de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS . Pour les autres types de mutation, définissez cette valeur sur NULL . |
timestamp_to |
TIMESTAMP ou INT64 |
Oui | Décrit une fin exclusive de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS . Pour les autres types de mutation, définissez cette valeur sur NULL . |
is_gc |
BOOL |
Non | Facultatif : lorsque la mutation est déclenchée par une stratégie de récupération de mémoire, définissez cette valeur sur true .
Dans tous les autres cas, définissez cette valeur sur false . Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
source_instance |
STRING |
Non | Facultatif : décrit le nom de l'instance Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
source_cluster |
STRING |
Non | Facultatif : décrit le nom du cluster Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
source_table |
STRING |
Non | Facultatif : décrit le nom de la table Bigtable à laquelle la mutation s'applique. La valeur dans cette colonne peut être utile si plusieurs tables Bigtable diffusent des modifications vers la même table BigQuery. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
tiebreaker |
INT64 |
Non | Facultatif : lorsque deux mutations sont enregistrées en même temps par différents clusters Bigtable, la mutation avec la valeur tiebreaker la plus élevée est appliquée à la table source. Les mutations ayant des valeurs tiebreaker inférieures sont supprimées. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore . |
value |
STRING ou BYTES |
Oui | Nouvelle valeur définie par la mutation. Lorsque l'option de pipeline writeValuesAsBytes est définie sur true , le type de la colonne doit être BYTES . Sinon, utilisez le type STRING . La valeur est définie pour les mutations SET_CELL . Pour les autres types de mutation, la valeur est définie sur NULL . |
Conditions requises pour ce pipeline
- Instance source Bigtable spécifiée.
- Table source Bigtable spécifiée. Les flux de modifications doivent être activés dans la table.
- Profil d'application Bigtable spécifié.
- Ensemble de données de destination BigQuery spécifié.
Paramètres de modèle
Paramètres obligatoires
- bigQueryDataset : nom de l'ensemble de données de la table BigQuery de destination.
- bigtableChangeStreamAppProfile : ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
- bigtableReadInstanceId : ID de l'instance Bigtable source.
- bigtableReadTableId : ID de la table Bigtable source.
Paramètres facultatifs
- writeRowkeyAsBytes : indique s'il faut écrire les clés de ligne en tant que
BYTES
BigQuery. Lorsque ce paramètre est défini surtrue
, les clés de ligne sont écrites dans la colonneBYTES
. Sinon, les clés de ligne sont écrites dans la colonneSTRING
. La valeur par défaut estfalse
. - writeValuesAsBytes : lorsque les valeurs "true" sont définies, elles sont écrites dans une colonne BYTES. Sinon, elles sont écrites dans une colonne STRING. Valeur par défaut : "false".
- writeNumericTimestamps : indique s'il faut écrire le code temporel Bigtable en tant que
INT64
BigQuery. Lorsque ce paramètre est défini sur "true", les valeurs sont écrites dans la colonneINT64
. Sinon, les valeurs sont écrites dans la colonneTIMESTAMP
. Colonnes concernées :timestamp
,timestamp_from
ettimestamp_to
. La valeur par défaut estfalse
. Lorsque ce paramètre est défini surtrue
, le temps est mesuré en microsecondes depuis l'époque Unix (1er janvier 1970 à l'heure UTC). - bigQueryProjectId : ID de projet de l'ensemble de données BigQuery. La valeur par défaut est le projet pour le job Dataflow.
- bigQueryChangelogTableName : nom de la table BigQuery de destination. Si cette option n'est pas spécifiée, la valeur
bigtableReadTableId + "_changelog"
est utilisée. La valeur par défaut est vide. - bigQueryChangelogTablePartitionGranularity : spécifie la précision pour le partitionnement de la table des journaux de modifications. Lorsque ce paramètre est défini, la table est partitionnée. Utilisez l'une des valeurs acceptées suivantes :
HOUR
,DAY
,MONTH
ouYEAR
. Par défaut, la table n'est pas partitionnée. - bigQueryChangelogTablePartitionExpirationMs : définit le délai d'expiration de la partition de table des journaux de modifications, en millisecondes. Lorsque ce paramètre est défini sur "true", les partitions antérieures au nombre de millisecondes spécifié sont supprimées. Par défaut, aucun délai d'expiration n'est défini.
- bigQueryChangelogTableFieldsToIgnore : liste des colonnes du journal des modifications, séparées par une virgule, qui ne sont pas créées ni renseignées si ce paramètre est spécifié. Utilisez l'une des valeurs acceptées suivantes :
is_gc
,source_instance
,source_cluster
,source_table
,tiebreaker
oubig_query_commit_timestamp
. Par défaut, toutes les colonnes sont renseignées. - dlqDirectory : répertoire à utiliser pour la file d'attente de lettres mortes. Les enregistrements dont le traitement échoue sont stockés dans ce répertoire. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. Dans la plupart des cas, vous pouvez utiliser le chemin par défaut.
- bigtableChangeStreamMetadataInstanceId : ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
- bigtableChangeStreamMetadataTableTableId : ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
- bigtableChangeStreamCharset : nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF-8.
- bigtableChangeStreamStartTimestamp : code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple,
2022-05-05T07:59:59Z
. La valeur par défaut est l'horodatage de l'heure de début du pipeline. - bigtableChangeStreamIgnoreColumnFamilies : liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
- bigtableChangeStreamIgnoreColumns : liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
- bigtableChangeStreamName : nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
- bigtableChangeStreamResume : lorsque ce paramètre est défini sur
true
, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeurbigtableChangeStreamName
s'est arrêté. Si le pipeline avec la valeurbigtableChangeStreamName
donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini surfalse
, un nouveau pipeline démarre. Si un pipeline avec la même valeurbigtableChangeStreamName
a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut estfalse
. - bigtableReadProjectId : ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable change streams to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
BIGTABLE_INSTANCE_ID
: ID de votre instance Bigtable.BIGTABLE_TABLE_ID
: ID de votre table Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID de votre profil d'application Bigtable.BIGQUERY_DESTINATION_DATASET
: nom de l'ensemble de données de destination BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
BIGTABLE_INSTANCE_ID
: ID de votre instance Bigtable.BIGTABLE_TABLE_ID
: ID de votre table Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID de votre profil d'application Bigtable.BIGQUERY_DESTINATION_DATASET
: nom de l'ensemble de données de destination BigQuery.
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.