Lire un flux de modifications avec Java

La bibliothèque cliente Cloud Bigtable pour Java fournit des méthodes de bas niveau pour traiter les enregistrements de modifications de données. Toutefois, dans la plupart des cas, nous vous recommandons de diffuser les modifications avec Dataflow au lieu d'utiliser les méthodes décrites sur cette page, car Dataflow gère les fractionnements et les fusions de partitions pour vous.

Avant de commencer

Avant de lire un flux de modifications avec Java, assurez-vous de bien comprendre la présentation des flux de modifications. Ensuite, remplissez les conditions préalables suivantes.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez les Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.

    Installez Google Cloud CLI.

    Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Pour en savoir plus, consultez Set up authentication for a local development environment.

Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Set up Application Default Credentials for code running on Google Cloud.

Activer un flux de modifications

Vous devez activer un flux de modifications sur une table avant de pouvoir la lire. Vous pouvez également créer une table avec un flux de modifications activé.

Rôles requis

Pour obtenir les autorisations nécessaires pour lire un flux de modifications Bigtable, demandez à votre administrateur de vous accorder le rôle IAM suivant.

  • Administrateur Bigtable (roles/bigtable.admin) sur l'instance Bigtable qui contient la table à partir de laquelle vous prévoyez de diffuser les modifications

Ajouter la bibliothèque cliente Java en tant que dépendance

Ajoutez un code semblable à celui ci-dessous à votre fichier Maven pom.xml. Remplacez VERSION par la version de la bibliothèque cliente que vous utilisez. La version doit être 2.21.0 ou ultérieure.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Déterminer les partitions de la table

Pour commencer à envoyer des requêtes ReadChangeStream, vous devez connaître les partitions de votre table. Cela peut être déterminé à l'aide de la méthode GenerateInitialChangeStreamPartitions. L'exemple suivant montre comment utiliser cette méthode pour obtenir un flux de ByteStringRanges représentant chaque partition de la table. Chaque ByteStringRange contient la clé de début et de fin d'une partition.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Traiter les modifications pour chaque partition

Vous pouvez ensuite traiter les modifications pour chaque partition à l'aide de la méthode ReadChangeStream. Voici un exemple d'ouverture d'un flux pour une partition, en commençant à l'heure actuelle.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accepte les arguments suivants :

  • Partition du flux (obligatoire) : partition à partir de laquelle diffuser les modifications
  • Choisissez l'une des options suivantes :
    • Heure de début : code temporel de validation à partir duquel commencer à traiter les modifications
    • Jetons de continuation : jetons représentant une position à partir de laquelle reprendre le streaming
  • Heure de fin (facultatif) : horodatage de commit pour arrêter le traitement des modifications une fois atteint. Si vous ne fournissez pas de valeur, le flux continue d'être lu.
  • Durée du signal de présence (facultatif) : fréquence des messages de signal de présence lorsqu'il n'y a pas de nouveaux changements (par défaut, cinq secondes)

Modifier le format d'enregistrement du flux

Un enregistrement de flux de modifications renvoyé peut être l'un des trois types de réponse suivants :

  • ChangeStreamMutation : message représentant un enregistrement de modification de données.

  • CloseStream : message indiquant que le client doit arrêter la lecture du flux.

    • État : indique le motif de la fermeture du flux. Au choix :
      • L'heure de fin OK a été atteinte pour la partition donnée.
      • OUT_OF_RANGE : la partition indiquée n'existe plus, ce qui signifie qu'elle a été fractionnée ou fusionnée. Vous devrez créer une requête ReadChangeStream pour chaque nouvelle partition.
    • NewPartitions : fournit les informations de partitionnement mises à jour sur les réponses OUT_OF_RANGE.
    • ChangeStreamContinuationTokens : liste des jetons utilisés pour reprendre de nouvelles requêtes ReadChangeStream à partir de la même position. Une par NewPartition.
  • Heartbeat : message périodique contenant des informations pouvant être utilisées pour vérifier l'état du flux.

    • EstimatedLowWatermark : estimation de la limite inférieure pour la partition donnée.
    • ContinuationToken : jeton permettant de reprendre le streaming de la partition donnée à partir de la position actuelle.

Contenu des enregistrements de modification des données

Pour en savoir plus sur les enregistrements de flux de modifications, consultez Contenu d'un enregistrement de modification des données.

Gérer les modifications apportées aux partitions

Lorsque les partitions d'une table changent, les requêtes ReadChangeStream renvoient un message CloseStream contenant les informations nécessaires pour reprendre le streaming à partir de la ou des nouvelles partitions.

Pour une division, cette valeur contiendra plusieurs nouvelles partitions et un ContinuationToken correspondant pour chaque partition. Pour reprendre la diffusion des nouvelles partitions à partir de la même position, vous devez envoyer une nouvelle requête ReadChangeStream pour chaque nouvelle partition avec son jeton correspondant.

Par exemple, si vous diffusez la partition [A,C) et qu'elle est divisée en deux partitions, [A,B) et [B,C), vous pouvez vous attendre à la séquence d'événements suivante :

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Pour reprendre le streaming de chaque partition à partir de la même position, envoyez les requêtes ReadChangeStreamQuery suivantes :

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Pour une fusion, afin de reprendre à partir de la même partition, vous devez envoyer une nouvelle requête ReadChangeStream contenant chaque jeton des partitions fusionnées.

Par exemple, si vous diffusez deux partitions, [A,B) et [B,C), et qu'elles sont fusionnées dans la partition [A,C), vous pouvez vous attendre à la séquence d'événements suivante :

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Pour reprendre le streaming de la partition [A, C) à la même position, envoyez un ReadChangeStreamQuery comme suit :

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Étapes suivantes