Diffuser des modifications avec Dataflow

Le connecteur Bigtable Beam vous permet d'utiliser Dataflow pour lire les enregistrements de modifications de données Bigtable sans avoir à suivre ni à traiter les modifications de partition dans votre code, car le connecteur gère cette logique pour vous.

Ce document explique comment configurer et utiliser le connecteur Bigtable Beam pour lire un flux de modifications à l'aide d'un pipeline Dataflow. Avant de lire ce document, vous devez avoir lu la présentation des flux de modifications et connaître Dataflow.

Alternatives à la création de votre propre pipeline

Si vous ne souhaitez pas créer votre propre pipeline Dataflow, vous pouvez utiliser l'une des options suivantes.

Vous pouvez utiliser un modèle Dataflow fourni par Google.

Vous pouvez également utiliser les exemples de code du tutoriel ou du démarrage rapide Bigtable comme point de départ pour votre code.

Assurez-vous que le code que vous générez utilise la version 26.14.0 ou ultérieure de google cloud libraries-bom.

Informations sur le connecteur

La méthode de connecteur Beam Bigtable, BigtableIO.readChangeStream, vous permet de lire un flux d'enregistrements de modifications de données (ChangeStreamMutation) que vous pouvez traiter. Le connecteur Bigtable Beam est un composant du dépôt GitHub Apache Beam. Pour obtenir une description du code du connecteur, consultez les commentaires sur BigtableIO.java.

Vous devez utiliser le connecteur avec Beam version 2.48.0 ou ultérieure. Consultez la page Compatibilité des environnements d'exécution Apache Beam pour vous assurer d'utiliser une version compatible de Java. Vous pouvez ensuite déployer un pipeline qui utilise le connecteur vers Dataflow, qui gère le provisionnement et la gestion des ressources, et contribue à la scalabilité et à la fiabilité du traitement des données de flux.

Pour en savoir plus sur le modèle de programmation Apache Beam, consultez la documentation Beam.

Regrouper des données sans heure d'événement

Les enregistrements de modifications de données diffusés en streaming à l'aide du connecteur Bigtable Beam ne sont pas compatibles avec les fonctions Dataflow qui dépendent des heures d'événement.

Comme expliqué dans Réplication et filigranes, un filigrane bas peut ne pas progresser si la réplication de la partition n'a pas rattrapé le reste de l'instance. Lorsqu'un watermark bas cesse de progresser, le flux de modifications peut se bloquer.

Pour éviter que le flux ne s'arrête, le connecteur Bigtable Beam génère toutes les données avec un code temporel de sortie égal à zéro. L'horodatage zéro permet à Dataflow de considérer tous les enregistrements de modification des données comme des données tardives. Par conséquent, les fonctionnalités Dataflow qui dépendent des heures d'événement ne sont pas compatibles avec les flux de modifications Bigtable. Plus précisément, vous ne pouvez pas utiliser les fonctions de fenêtrage, les déclencheurs basés sur l'heure de l'événement ni les timers basés sur l'heure de l'événement.

À la place, vous pouvez utiliser GlobalWindows avec des déclencheurs non liés à l'heure de l'événement pour regrouper ces données tardives dans des volets, comme illustré dans l'exemple du tutoriel. Pour en savoir plus sur les déclencheurs et les volets, consultez Déclencheurs dans le guide de programmation Beam.

Autoscaling

Le connecteur est compatible avec l'autoscaling Dataflow, qui est activé par défaut lorsque vous utilisez Runner v2 (obligatoire). L'algorithme d'autoscaling Dataflow prend en compte le backlog estimé du flux de modifications, qui peut être surveillé sur la page Surveillance Dataflow de la section Backlog. Utilisez l'option --maxNumWorkers lorsque vous déployez un job pour limiter le nombre de nœuds de calcul.

Pour effectuer le scaling manuel de votre pipeline au lieu d'utiliser l'autoscaling, consultez Effectuer le scaling manuel d'un pipeline de traitement en flux continu.

Limites

Avant d'utiliser le connecteur Bigtable Beam avec Dataflow, tenez compte des limites suivantes.

Exécuteur Dataflow v2

Le connecteur ne peut être exécuté qu'à l'aide de Dataflow Runner v2. Pour l'activer, spécifiez --experiments=use_runner_v2 dans vos arguments de ligne de commande. L'exécution avec l'exécuteur v1 entraîne l'échec de votre pipeline avec l'exception suivante :

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Instantanés

Le connecteur n'est pas compatible avec les instantanés Dataflow.

Doublons

Le connecteur Bigtable Beam diffuse les modifications pour chaque clé de ligne et chaque cluster dans l'ordre des codes temporels de validation. Toutefois, comme il redémarre parfois à partir d'heures antérieures dans le flux, il peut générer des doublons.

Redémarrages de pipeline

Si un pipeline Dataflow est arrêté depuis longtemps, les enregistrements de modifications de données peuvent être en retard par rapport à la limite de conservation. Lorsque le pipeline est repris, Bigtable le fait échouer afin que vous puissiez en démarrer un nouveau avec une nouvelle heure de début de requête qui se trouve dans la période de conservation. Bigtable procède ainsi, au lieu d'avancer silencieusement l'heure de la requête du pipeline d'origine, pour éviter la suppression involontaire des enregistrements de modification des données dont les codes temporels se situent en dehors de la période de conservation spécifiée.

Avant de commencer

Avant d'utiliser le connecteur, remplissez les conditions préalables suivantes.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez les Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.

    Installez Google Cloud CLI.

    Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Pour en savoir plus, consultez Set up authentication for a local development environment.

Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Set up Application Default Credentials for code running on Google Cloud.

Activer un flux de modifications

Vous devez activer un flux de modifications sur une table avant de pouvoir la lire. Vous pouvez également créer une table avec les flux de modifications activés.

Modifier la table des métadonnées du flux

Lorsque vous diffusez des modifications avec Dataflow, le connecteur Bigtable Beam crée une table de métadonnées nommée __change_stream_md_table par défaut. La table de métadonnées du flux de modifications gère l'état opérationnel du connecteur et stocke les métadonnées sur les enregistrements de modifications de données.

Par défaut, le connecteur crée la table dans la même instance que la table en cours de streaming. Pour que la table fonctionne correctement, le profil d'application de la table de métadonnées doit utiliser le routage à cluster unique et avoir activé les transactions à ligne unique.

Pour en savoir plus sur le streaming des modifications depuis Bigtable avec le connecteur Bigtable Beam, consultez la documentation BigtableIO.

Rôles requis

Pour obtenir les autorisations nécessaires pour lire un flux de modifications Bigtable à l'aide de Dataflow, demandez à votre administrateur de vous accorder les rôles IAM suivants.

Pour lire les modifications apportées à Bigtable, vous avez besoin du rôle suivant :

  • Administrateur Bigtable (roles/bigtable.admin) sur l'instance Bigtable contenant la table à partir de laquelle vous prévoyez de diffuser les modifications

Pour exécuter le job Dataflow, vous avez besoin des rôles suivants :

Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer les accès.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Ajouter le connecteur Bigtable Beam en tant que dépendance

Ajoutez à votre fichier Maven pom.xml un code semblable à la dépendance suivante. Vous devez disposer de la version 2.48.0 ou ultérieure.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Lire le flux de modifications

Pour créer un pipeline Dataflow permettant de lire vos enregistrements de modifications de données, vous devez configurer le connecteur, puis ajouter des transformations et des récepteurs. Vous utilisez ensuite le connecteur pour lire les objets ChangeStreamMutation dans un pipeline Beam.

Les exemples de code de cette section, écrits en Java, montrent comment créer un pipeline et l'utiliser pour convertir des paires clé/valeur en chaîne. Chaque paire se compose d'une clé de ligne et d'un objet ChangeStreamMutation. Le pipeline convertit les entrées de chaque objet en chaîne séparée par des virgules.

Créer le pipeline

Cet exemple de code Java montre comment créer le pipeline :

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Traiter les enregistrements de modification des données

Cet exemple montre comment parcourir toutes les entrées d'un enregistrement de modification de données pour une ligne et appeler une méthode de conversion en chaîne en fonction du type d'entrée.

Pour obtenir la liste des types d'entrées qu'un enregistrement de modification des données peut contenir, consultez Contenu d'un enregistrement de modification des données.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

Dans cet exemple, une entrée write est convertie :

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

Dans cet exemple, une entrée Suppression de cellules est convertie :

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

Dans cet exemple, une entrée suppression d'une famille de colonnes est convertie :


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Surveiller

Les ressources suivantes de la console Google Cloud vous permettent de surveiller vos ressourcesGoogle Cloud lorsque vous exécutez un pipeline Dataflow pour lire un flux de modifications Bigtable :

Vérifiez notamment les métriques suivantes :

  • Sur la page "Insights système Bigtable", vérifiez les métriques suivantes :
    • Données Utilisation du processeur par flux de modifications dans la métrique cpu_load_by_app_profile_by_method_by_table. Affiche l'impact du flux de modifications sur l'utilisation du processeur de votre cluster.
    • Flux de modifications de l'utilisation du stockage (octets) (change_stream_log_used_bytes).
  • Sur la page de surveillance Dataflow, vérifiez la fraîcheur des données. Cette métrique indique la différence entre l'heure actuelle et le filigrane (environ deux minutes), avec des pics occasionnels d'une ou deux minutes de plus. La fraîcheur des données n'indique pas si les enregistrements de modifications de données sont traités lentement. Pour assurer l'état et les performances continues de vos applications critiques, surveillez la métrique de fraîcheur des données Dataflow et effectuez les actions suivantes :

    • Si la métrique de fraîcheur des données est systématiquement supérieure au seuil, il est possible que votre pipeline manque de ressources. Nous vous recommandons d'ajouter des nœuds de calcul Dataflow.
    • Si les nœuds de calcul Dataflow sont correctement provisionnés, mais que la fraîcheur des données augmente ou est constamment élevée, contactez l'assistanceGoogle Cloud .
  • La métrique processing_delay_from_commit_timestamp_MEAN de Dataflow peut vous indiquer le temps de traitement moyen des enregistrements de modification des données pendant toute la durée de vie du job.

La métrique server/latencies Bigtable n'est pas utile lorsque vous surveillez un pipeline Dataflow qui lit un flux de modifications Bigtable, car elle reflète la durée de la requête de flux continu, et non la latence de traitement des enregistrements de modifications des données. Une latence élevée dans un flux de modifications ne signifie pas que les requêtes sont traitées lentement, mais que la connexion est restée ouverte pendant cette durée.

Étapes suivantes