Modèle de flux de modifications Bigtable vers Pub/Sub

Le modèle de flux de modifications Bigtable vers Pub/Sub est un pipeline de traitement en flux continu qui diffuse les enregistrements de modifications des données Bigtable et les publie vers un sujet Pub/Sub à l'aide de Dataflow.

Un flux de modifications Bigtable vous permet de vous abonner aux mutations de données par table. Lorsque vous vous abonnez à des flux de modification de table, les contraintes suivantes s'appliquent :

  • Seuls les cellules et les descripteurs modifiés des opérations de suppression sont renvoyés.
  • Seule la nouvelle valeur d'une cellule modifiée est renvoyée.

Lorsque des enregistrements de modification de données sont publiés dans un sujet Pub/Sub, les messages peuvent être insérés dans le désordre par rapport à l'ordre d'horodatage de commit Bigtable d'origine.

Les enregistrements de modification de données Bigtable qui ne peuvent pas être publiés dans des sujets Pub/Sub sont temporairement placés dans un répertoire de file d'attente de lettres mortes (file d'attente de messages non traités) dans Storage. Une fois que le nombre maximal de tentatives infructueuses a été atteint, ces enregistrements sont indéfiniment placés dans le même répertoire de files d'attente de lettres mortes pour un examen manuel ou un traitement ultérieur réalisé par l'utilisateur.

Le pipeline nécessite que le sujet Pub/Sub de destination existe. Le sujet de destination peut être configuré pour valider les messages à l'aide d'un schéma. Lorsqu'un sujet Pub/Sub spécifie un schéma, le pipeline ne démarre que si ce schéma est valide. Selon le type de schéma, utilisez l'une des définitions de schéma suivantes pour le sujet de destination :

Tampons de protocole

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Utilisez le schéma Protobuf suivant avec l'encodage de messages JSON :

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Chaque nouveau message Pub/Sub inclut une entrée d'un enregistrement de modification de données renvoyé par le flux de modification à partir de la ligne correspondante dans votre table Bigtable. Le modèle Pub/Sub aplatit les entrées de chaque enregistrement de modification de données en modifications individuelles au niveau de la cellule.

Description du message de sortie Pub/Sub

Nom du champ Description
rowKey Clé de ligne de la ligne modifiée. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage de messages JSON est configuré, les clés de ligne sont renvoyées sous forme de chaînes. Lorsque useBase64Rowkeys est spécifié, les clés de ligne sont encodées en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de clés de ligne en une chaîne.
modType Type de mutation de ligne. Utilisez l'une des valeurs suivantes : SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
columnFamily Famille de colonnes affectée par la mutation de ligne.
column Qualificatif de colonne affecté par la mutation de ligne. Pour le type de mutation DELETE_FAMILY, le champ de colonne n'est pas défini. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage de messages JSON est configuré, les colonnes sont renvoyées sous forme de chaînes. Lorsque useBase64ColumnQualifier est spécifié, le champ de la colonne est encodé en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de clés de ligne en une chaîne.
commitTimestamp Date/heure à laquelle Bigtable applique la mutation. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestamp Valeur d'horodatage de la cellule affectée par la mutation. Pour les types de mutation DELETE_CELLS et DELETE_FAMILY, l'horodatage n'est pas défini. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestampFrom Décrit un début inclusif de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, timestampFrom n'est pas défini. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestampTo Décrit une fin exclusive de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, timestampTo n'est pas défini.
isGC Valeur booléenne indiquant si la mutation est générée par un mécanisme de récupération de mémoire Bigtable.
tieBreaker Facultatif : lorsque deux mutations sont enregistrées en même temps par différents clusters Bigtable, la mutation avec la valeur tiebreaker la plus élevée est appliquée à la table source. Les mutations ayant des valeurs tiebreaker inférieures sont supprimées.
value Nouvelle valeur définie par la mutation. Sauf si l'option de pipeline stripValues est définie, la valeur est définie pour les mutations SET_CELL. Pour les autres types de mutation, la valeur n'est pas définie. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage des messages JSON est configuré, les valeurs sont renvoyées sous forme de chaînes. Lorsque useBase64Values est spécifié, la valeur est encodée en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de valeur en une chaîne.
sourceInstance Nom de l'instance Bigtable qui a enregistré la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes instances vers le même sujet Pub/Sub.
sourceCluster Nom du cluster Bigtable qui a enregistré la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes instances vers le même sujet Pub/Sub.
sourceTable Nom de la table Bigtable ayant reçu la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes tables vers le même sujet Pub/Sub.

Conditions requises pour ce pipeline

  • Instance source Bigtable spécifiée.
  • Table source Bigtable spécifiée. Les flux de modifications doivent être activés dans la table.
  • Profil d'application Bigtable spécifié.
  • Le sujet Pub/Sub spécifié doit exister.

Paramètres de modèle

Paramètres obligatoires

  • pubSubTopic: nom du sujet Pub/Sub de destination.
  • bigtableChangeStreamAppProfile: ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
  • bigtableReadInstanceId: ID de l'instance Bigtable source.
  • bigtableReadTableId: ID de la table Bigtable source.

Paramètres facultatifs

  • messageEncoding: encodage des messages à publier dans le sujet Pub/Sub. Lorsque le schéma du sujet de destination est configuré, l'encodage des messages est déterminé par les paramètres du sujet. Les valeurs suivantes sont acceptées : BINARY et JSON. La valeur par défaut est JSON.
  • messageFormat: encodage des messages à publier dans le sujet Pub/Sub. Lorsque le schéma du sujet de destination est configuré, l'encodage des messages est déterminé par les paramètres du sujet. Les valeurs suivantes sont acceptées : AVRO, PROTOCOL_BUFFERS et JSON. La valeur par défaut est JSON. Lorsque le format JSON est utilisé, les champs "rowKey", "column" et "value" du message sont des chaînes, dont le contenu est déterminé par les options de pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values et bigtableChangeStreamCharset.
  • stripValues: lorsque cette valeur est définie sur true, les mutations SET_CELL sont renvoyées sans nouvelle valeur définie. La valeur par défaut est false. Ce paramètre est utile lorsque vous n'avez pas besoin d'une nouvelle valeur (également appelé invalidation de cache) ou lorsque les valeurs sont extrêmement volumineuses et dépassent les limites de taille des messages Pub/Sub.
  • dlqDirectory: répertoire de la file d'attente de lettres mortes. Les enregistrements dont le traitement échoue sont stockés dans ce répertoire. Correspond par défaut à un répertoire situé sous l'emplacement temporaire du job Dataflow. Dans la plupart des cas, vous pouvez utiliser le chemin par défaut.
  • dlqRetryMinutes: nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. La valeur par défaut est 10.
  • dlqMaxRetries: nombre maximal de tentatives d'exécution de lettres mortes. La valeur par défaut est 5.
  • useBase64Rowkeys: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ rowKey est une chaîne encodée en base64. Sinon, le rowKey est généré à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • pubSubProjectId: ID du projet Bigtable. La valeur par défaut est le projet du job Dataflow.
  • useBase64ColumnQualifiers: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ column est une chaîne encodée en base64. Sinon, la colonne est générée à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • useBase64Values: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ de valeur est une chaîne encodée en base64. Sinon, la valeur est générée à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • disableDlqRetries: indique si les nouvelles tentatives doivent être désactivées pour la file d'attente de lettres mortes. La valeur par défaut est "false".
  • bigtableChangeStreamMetadataInstanceId: ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
  • bigtableChangeStreamMetadataTableTableId: ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
  • bigtableChangeStreamCharset: nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF8.
  • bigtableChangeStreamStartTimestamp: code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage de l'heure de début du pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamIgnoreColumns: liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamName: nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
  • bigtableChangeStreamResume: lorsque ce paramètre est défini sur true, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeur bigtableChangeStreamName s'est arrêté. Si le pipeline avec la valeur bigtableChangeStreamName donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini sur false, un nouveau pipeline démarre. Si un pipeline avec la même valeur bigtableChangeStreamName a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut est false.
  • bigtableReadProjectId: ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable change streams to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • PUBSUB_TOPIC : nom du sujet de destination Pub/Sub.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • PUBSUB_TOPIC : nom du sujet de destination Pub/Sub.

Étape suivante