Lire un flux de modifications avec Java
La bibliothèque cliente Cloud Bigtable pour Java fournit des méthodes de bas niveau pour traiter les enregistrements de modification de données. Toutefois, dans la plupart des cas, nous vous recommandons de transmettre les modifications avec Dataflow plutôt que d'utiliser les méthodes décrites sur cette page, car Dataflow gère les partitions et les fusions pour vous.
Avant de commencer
Avant de lire un flux de modifications avec Java, assurez-vous de maîtriser les Présentation des flux de modifications Complétez ensuite les conditions préalables suivantes.
Configurer l'authentification
Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez le service Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Pour en savoir plus, consultez les sections sur Configurer l'authentification pour un environnement de développement local.
Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Configurer le service Identifiants par défaut de l'application pour le code exécuté sur Google Cloud.
Activer un flux de modifications
Vous devez activer une modification diffuser en streaming sur une table avant de pouvoir les lire. Vous pouvez également créer une table avec un flux de modifications activé.
Rôles requis
Pour obtenir les autorisations nécessaires pour lire une modification Bigtable flux, demandez à votre administrateur de vous attribuer le rôle IAM suivant.
- Administrateur Bigtable (
roles/bigtable.admin
) sur l'instance Bigtable contenant la table à partir de laquelle vous prévoyez de diffuser des modifications
Ajouter la bibliothèque cliente Java en tant que dépendance
Ajoutez un code semblable au code suivant à votre fichier pom.xml
Maven. Remplacez VERSION
par la version de la bibliothèque cliente que vous utilisez. Vous devez utiliser la version 2.21.0 ou une version ultérieure.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Déterminer les partitions de la table
Pour commencer à envoyer des requêtes ReadChangeStream
, vous devez connaître les partitions de votre table. Vous pouvez le déterminer à l'aide de la méthode GenerateInitialChangeStreamPartitions
. L'exemple suivant montre comment
utiliser cette méthode pour obtenir un flux
ByteStringRanges
représentant chaque partition de la table. Chaque ByteStringRange
contient la clé de début et de fin d'une partition.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Traiter les modifications pour chaque partition
Vous pouvez ensuite traiter les modifications pour chaque partition à l'aide de la méthode ReadChangeStream
. Cet exemple montre comment ouvrir un flux pour une partition,
à partir de l'heure actuelle.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
accepte les arguments suivants:
- Partition par flux (obligatoire) : partition à partir de laquelle diffuser les modifications
- Choisissez l'une des options suivantes :
- Heure de début : code temporel de commit à partir duquel commencer le traitement des modifications
- Jetons de continuation : jetons représentant une position à partir de laquelle reprendre le streaming
- Heure de fin (facultatif) : code temporel de validation pour arrêter le traitement des modifications lorsqu'il est atteint. Si vous ne fournissez pas de valeur, le flux continue de lire.
- Durée du battement de cœur (facultatif) : fréquence des messages de battement de cœur en l'absence de nouveaux changements (par défaut, cinq secondes)
Modifier le format d'enregistrement du flux
Un enregistrement de flux de modifications renvoyé est l'un des trois types de réponses suivants :
ChangeStreamMutation
: message représentant un enregistrement de modification de données.CloseStream
: message indiquant que le client doit arrêter la lecture du flux.- État : indique la raison pour laquelle le flux a été fermé. Au choix :
OK
: l'heure de fin de la partition donnée a été atteinte.OUT_OF_RANGE
: la partition donnée n'existe plus. ce qui signifie que des divisions ou des fusions ont eu lieu sur cette partition. Une nouvelle requêteReadChangeStream
doit être créée pour chaque nouvelle partition.
NewPartitions
: fournit les informations de partitionnement mises à jour sur les réponsesOUT_OF_RANGE
.ChangeStreamContinuationTokens
: liste des jetons utilisés pour la reprise nouvelles demandesReadChangeStream
à partir de la même position. Un parNewPartition
.
- État : indique la raison pour laquelle le flux a été fermé. Au choix :
Heartbeat
: message périodique contenant des informations pouvant être utilisées pour l'état du flux.EstimatedLowWatermark
: estimation du filigrane faible pour le partition donnéeContinuationToken
: jeton permettant de reprendre la diffusion de la partition donnée à partir de la position actuelle.
Contenu des enregistrements de modification des données
Pour en savoir plus sur les enregistrements de flux de modifications, consultez la section Que contient une modification de données ? d'enregistrement.
Gérer les modifications des partitions
Lorsque les partitions d'une table changent, les requêtes ReadChangeStream
renvoient un message CloseStream
contenant les informations requises pour reprendre le streaming à partir de la ou des nouvelles partitions.
Pour une partition, elle contiendra plusieurs nouvelles partitions et un bloc
ContinuationToken
pour chaque partition. Pour reprendre la diffusion des nouvelles partitions
à partir de la même position, vous effectuez une nouvelle requête ReadChangeStream
pour chaque nouvelle partition par son jeton correspondant.
Par exemple, si vous diffusez en continu la partition [A,C)
et qu'elle se divise en deux partitions, [A,B)
et [B,C)
, vous pouvez vous attendre à la séquence d'événements suivante :
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Pour reprendre la diffusion en continu de chaque partition à partir de la même position, envoyez les requêtes ReadChangeStreamQuery
suivantes :
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Pour reprendre une fusion à partir d'une même partition, vous devez envoyer une nouvelle
Requête ReadChangeStream
contenant chaque jeton des partitions fusionnées.
Par exemple, si vous diffusez deux partitions, [A,B)
et [B,C)
, et qu'elles
dans la partition [A,C)
, vous pouvez vous attendre à la séquence d'événements suivante:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Pour reprendre la partition de streaming [A, C)
à partir de la même position, envoyez un ReadChangeStreamQuery
comme suit :
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));