Lire un flux de modifications avec Java

La bibliothèque cliente Cloud Bigtable pour Java fournit des méthodes de bas niveau pour traiter les enregistrements de modifications de données. Toutefois, dans la plupart des cas, nous vous recommandons de diffuser les modifications en flux continu avec Dataflow au lieu d'utiliser les méthodes décrites sur cette page, car Dataflow gère les divisions et les fusions de partitions à votre place.

Avant de commencer

Avant de lire un flux de modifications avec Java, assurez-vous de vous familiariser avec la présentation des flux de modifications. Remplissez ensuite les conditions préalables suivantes.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez le service Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.

  1. Installez Google Cloud CLI.
  2. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  3. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login

Pour en savoir plus, consultez les sections sur Configurer l'authentification pour un environnement de développement local.

Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Configurer le service Identifiants par défaut de l'application pour le code exécuté sur Google Cloud.

Activer un flux de modifications

Vous devez activer un flux de modifications sur une table pour pouvoir la lire. Vous pouvez également créer une table avec un flux de modifications activé.

Rôles requis

Pour obtenir les autorisations nécessaires pour lire un flux de modifications Bigtable, demandez à votre administrateur de vous attribuer le rôle IAM suivant.

  • Administrateur Bigtable (roles/bigtable.admin) sur l'instance Bigtable contenant la table à partir de laquelle vous prévoyez de diffuser les modifications

Ajouter la bibliothèque cliente Java en tant que dépendance

Ajoutez un code semblable à ce qui suit à votre fichier Maven pom.xml. Remplacez VERSION par la version de la bibliothèque cliente que vous utilisez. Il doit s'agir de la version 2.21.0 ou ultérieure.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Déterminer les partitions de la table

Pour commencer à effectuer des requêtes ReadChangeStream, vous devez connaître les partitions de votre table. Pour ce faire, utilisez la méthode GenerateInitialChangeStreamPartitions. L'exemple suivant montre comment utiliser cette méthode pour obtenir un flux de ByteStringRanges représentant chaque partition de la table. Chaque ByteStringRange contient la clé de début et de fin d'une partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Traiter les modifications pour chaque partition

Vous pouvez ensuite traiter les modifications pour chaque partition à l'aide de la méthode ReadChangeStream. Voici un exemple d'ouverture d'un flux pour une partition, à partir de l'heure actuelle.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accepte les arguments suivants:

  • Partition en flux continu (obligatoire) : la partition à partir de laquelle diffuser les modifications.
  • Choisissez l'une des options suivantes :
    • Heure de début : horodatage de commit à partir duquel le traitement des modifications commence
    • Jetons de continuité, qui représentent une position à partir de laquelle la diffusion doit reprendre
  • Heure de fin (facultatif) : validez l'horodatage pour arrêter de traiter les modifications lorsqu'elles sont atteintes. Si vous n'indiquez aucune valeur, le flux poursuit la lecture.
  • Durée des pulsations (facultatif) : fréquence des messages de pulsation en l'absence de nouvelles modifications (valeur par défaut : cinq secondes)

Modifier le format d'enregistrement du flux

Un enregistrement de flux de modifications renvoyé est de l'un des trois types de réponses suivants:

  • ChangeStreamMutation : message représentant un enregistrement de modifications de données.

  • CloseStream : message indiquant que le client doit arrêter la lecture du flux.

    • État : indique le motif de la fermeture du flux. Au choix :
      • OK : l'heure de fin a été atteinte pour la partition donnée.
      • OUT_OF_RANGE : la partition donnée n'existe plus, ce qui signifie que des fractionnements ou des fusions ont eu lieu sur cette partition. Une requête ReadChangeStream doit être créée pour chaque nouvelle partition.
    • NewPartitions : fournit les informations de partitionnement mises à jour sur les réponses OUT_OF_RANGE.
    • ChangeStreamContinuationTokens : liste des jetons utilisés pour reprendre de nouvelles requêtes ReadChangeStream à partir de la même position. Un par NewPartition.
  • Heartbeat : message périodique contenant des informations permettant de vérifier l'état du flux.

    • EstimatedLowWatermark : estimation du filigrane faible pour la partition donnée
    • ContinuationToken : jeton permettant de reprendre la diffusion de la partition donnée à partir de sa position actuelle.

Contenu de l'enregistrement de modification des données

Pour en savoir plus sur les enregistrements de flux de modifications, consultez la section Contenu d'un enregistrement de modification des données.

Gérer les modifications dans les partitions

Lorsque les partitions d'une table changent, les requêtes ReadChangeStream renvoient un message CloseStream contenant les informations requises pour reprendre la diffusion à partir des nouvelles partitions.

Pour une partition, il contient plusieurs nouvelles partitions et une classe ContinuationToken correspondante pour chaque partition. Pour reprendre la diffusion des nouvelles partitions à partir de la même position, envoyez une nouvelle requête ReadChangeStream pour chaque nouvelle partition avec le jeton correspondant.

Par exemple, si vous diffusez la partition [A,C) et qu'elle est divisée en deux partitions, [A,B) et [B,C), vous pouvez vous attendre à la séquence d'événements suivante:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Pour reprendre la diffusion de chaque partition à partir de la même position, envoyez les requêtes ReadChangeStreamQuery suivantes:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Pour une fusion, pour reprendre à partir de la même partition, vous devez envoyer une nouvelle requête ReadChangeStream contenant chaque jeton des partitions fusionnées.

Par exemple, si vous diffusez deux partitions, [A,B) et [B,C), et qu'elles fusionnent dans la partition [A,C), vous pouvez vous attendre à la séquence d'événements suivante:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Pour reprendre la partition d'insertion en flux continu [A, C) à partir de la même position, envoyez un message ReadChangeStreamQuery semblable à celui-ci:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Étapes suivantes