Diffuser les modifications avec Dataflow

Le connecteur Bigtable Beam vous permet d'utiliser Dataflow pour lire les enregistrements de modifications de données Bigtable sans avoir à suivre ni traiter les modifications de partition dans votre code, car le connecteur gère cette logique pour vous.

Ce document explique comment configurer et utiliser le connecteur Bigtable Beam pour lire un flux de modifications à l'aide d'un pipeline Dataflow. Avant de lire ce document, consultez la présentation des flux de modifications et familiarisez-vous avec Dataflow.

Alternatives à la création de votre propre pipeline

Si vous ne souhaitez pas créer votre propre pipeline Dataflow, vous pouvez utiliser l'une des options suivantes.

Vous pouvez utiliser un modèle Dataflow fourni par Google.

Vous pouvez également utiliser les exemples de code du tutoriel ou du démarrage rapide Bigtable comme point de départ de votre code.

Assurez-vous que le code que vous générez utilise google cloud libraries-bom version 26.14.0 ou ultérieure.

Détails du connecteur

La méthode du connecteur Bigtable Beam, BigtableIO.readChangeStream, vous permet de lire un flux d'enregistrements de modifications de données (ChangeStreamMutation) que vous pouvez traiter. Le connecteur Bigtable Beam est un composant du dépôt GitHub d'Apache Beam. Pour obtenir une description du code de connecteur, consultez les commentaires sur la page BigtableIO.java.

Vous devez utiliser le connecteur avec Beam 2.48.0 ou version ultérieure. Vérifiez la compatibilité de l'environnement d'exécution Apache Beam pour vous assurer que vous utilisez une version compatible de Java. Vous pouvez ensuite déployer un pipeline utilisant le connecteur à Dataflow, qui gère le provisionnement et la gestion des ressources, et améliore l'évolutivité et la fiabilité du traitement des données par flux.

Pour en savoir plus sur le modèle de programmation Apache Beam, consultez la documentation de Beam.

Regrouper des données sans heure d'événement

Les enregistrements de modification de données diffusés à l'aide du connecteur Bigtable Beam ne sont pas compatibles avec les fonctions Dataflow qui dépendent de l'heure des événements.

Comme expliqué dans la section Réplication et filigranes, un filigrane faible peut ne pas progresser si la réplication pour la partition n'a pas rattrapé le reste de l'instance. Lorsqu'un filigrane faible cesse d'avancer, le flux de modifications peut se bloquer.

Pour éviter que le flux ne se bloque, le connecteur Bigtable Beam génère toutes les données avec un horodatage de sortie égal à zéro. L'horodatage zéro fait en sorte que Dataflow considère tous les enregistrements de modifications de données comme des données tardives. Par conséquent, les fonctionnalités Dataflow qui dépendent de l'heure des événements ne sont pas compatibles avec les flux de modifications Bigtable. Plus précisément, vous ne pouvez pas utiliser de fonctions de fenêtrage, de déclencheurs basés sur l'heure de l'événement ni de minuteurs à l'heure de l'événement.

À la place, vous pouvez utiliser GlobalWindows avec des déclencheurs temporels sans événement pour regrouper ces données tardives dans des volets, comme illustré dans l'exemple du tutoriel. Pour en savoir plus sur les déclencheurs et les volets, consultez la section Déclencheurs du guide de programmation Beam.

Autoscaling

Le connecteur est compatible avec l'autoscaling Dataflow, qui est activé par défaut lorsque vous utilisez l'exécuteur v2 (obligatoire). L'algorithme d'autoscaling de Dataflow tient compte du nombre estimé de flux de modifications en attente, qui peut être surveillé sur la page Surveillance de Dataflow, dans la section Backlog. Utilisez l'option --maxNumWorkers lors du déploiement d'une tâche pour limiter le nombre de nœuds de calcul.

Pour effectuer le scaling manuel de votre pipeline au lieu d'utiliser l'autoscaling, consultez la page Scaling manuel d'un pipeline de flux de données.

Limites

Notez les limites suivantes avant d'utiliser le connecteur Bigtable Beam avec Dataflow.

Exécuteur Dataflow V2

Le connecteur ne peut être exécuté qu'à l'aide de l'exécuteur Dataflow v2. Pour ce faire, spécifiez --experiments=use_runner_v2 dans vos arguments de ligne de commande. L'exécution avec l'exécuteur v1 entraîne l'échec de votre pipeline et l'exception suivante:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Instantanés

Le connecteur n'est pas compatible avec les instantanés Dataflow.

Doublons

Le connecteur Bigtable Beam diffuse les modifications pour chaque clé de ligne et chaque cluster dans l'ordre d'horodatage de commit, mais, comme il redémarre parfois à des moments antérieurs dans le flux, il peut produire des doublons.

Avant de commencer

Avant d'utiliser le connecteur, remplissez les conditions préalables suivantes.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez le service Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.

  1. Installez Google Cloud CLI.
  2. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  3. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login

Pour en savoir plus, consultez les sections sur Configurer l'authentification pour un environnement de développement local.

Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Configurer le service Identifiants par défaut de l'application pour le code exécuté sur Google Cloud.

Activer un flux de modifications

Vous devez activer un flux de modifications sur une table pour pouvoir la lire. Vous pouvez également créer une table avec les flux de modifications activés.

Rôles requis

Pour obtenir les autorisations dont vous avez besoin pour lire un flux de modifications Bigtable à l'aide de Dataflow, demandez à votre administrateur de vous accorder les rôles IAM suivants.

Pour lire les modifications dans Bigtable, vous avez besoin du rôle suivant:

  • Administrateur Bigtable (roles/bigtable.admin) sur l'instance Bigtable contenant la table à partir de laquelle vous prévoyez de diffuser les modifications

Pour exécuter la tâche Dataflow, vous devez disposer des rôles suivants:

Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer les accès.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Ajouter le connecteur Bigtable Beam en tant que dépendance

Ajoutez un code semblable à la dépendance suivante à votre fichier Maven pom.xml. Il doit s'agir de la version 2.48.0 ou ultérieure.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Lire le flux de modifications

Pour créer un pipeline Dataflow permettant de lire vos enregistrements de modification de données, vous devez configurer le connecteur, puis ajouter des transformations et des récepteurs. Vous utiliserez ensuite le connecteur pour lire les objets ChangeStreamMutation dans un pipeline Beam.

Les exemples de code de cette section, écrits en Java, montrent comment créer un pipeline et l'utiliser pour convertir des paires clé/valeur en chaîne. Chaque paire se compose d'une clé de ligne et d'un objet ChangeStreamMutation. Le pipeline convertit les entrées de chaque objet en une chaîne séparée par des virgules.

Créer le pipeline

Cet exemple de code Java montre comment créer le pipeline:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Traiter les enregistrements de modifications de données

Cet exemple montre comment lire en boucle toutes les entrées d'un enregistrement de modification de données pour une ligne et comment appeler une méthode de conversion en chaîne basée sur le type d'entrée.

Pour obtenir la liste des types d'entrées qu'un enregistrement de modification de données peut contenir, consultez la section Contenu d'un enregistrement de modification des données.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dans cet exemple, une entrée write est convertie:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dans cet exemple, une entrée de suppression de cellules est convertie:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dans cet exemple, une suppression d'une famille de colonnes est convertie:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Surveillance

Les ressources suivantes de la console Google Cloud vous permettent de les surveiller pendant que vous exécutez un pipeline Dataflow afin de lire un flux de modifications Bigtable:

Vérifiez en particulier les métriques suivantes:

  • Sur la page Monitoring de Bigtable, vérifiez les metrics suivantes :
    • Les données de l'utilisation du processeur par flux de modifications dans la métrique cpu_load_by_app_profile_by_method_by_table. Affiche l'impact du flux de modifications sur l'utilisation du processeur de votre cluster.
    • Utilisation du stockage par flux de modifications (octets) (change_stream_log_used_bytes).
  • Sur la page de surveillance Dataflow, vérifiez la fraîcheur des données, qui indique la différence entre l'heure actuelle et le filigrane. Elle devrait durer environ deux minutes, avec des pics occasionnels d'une minute ou deux de plus. Si la métrique de fraîcheur des données est systématiquement supérieure à ce seuil, votre pipeline ne dispose probablement pas de suffisamment de ressources. Vous devez donc ajouter davantage de nœuds de calcul Dataflow.

Étapes suivantes