Créer des connexions de flux de modifications à l'aide de Dataflow

Cette page explique comment créer des pipelines Dataflow qui consomment et transfèrent des données de modification Spanner à l'aide de flux de modifications. Vous pouvez utiliser l'exemple de code de cette page pour créer des pipelines personnalisés.

Concepts fondamentaux

Voici quelques concepts clés des pipelines Dataflow pour les flux de modifications.

Dataflow

Dataflow est un service sans serveur, rapide et économique qui prend en charge le traitement par flux et par lot. Il offre la portabilité avec les tâches de traitement écrites à l'aide des bibliothèques Open Source Apache Beam et automatise le provisionnement de l'infrastructure et la gestion des clusters. Dataflow fournit un flux en quasi-temps réel lors de la lecture à partir de flux de modifications.

Vous pouvez utiliser Dataflow pour consommer des flux de modification Spanner avec le connecteur SpannerIO, qui offre une abstraction sur l'API Spanner pour interroger les flux de modification. Avec ce connecteur, vous n'avez pas besoin de gérer le cycle de vie des partitions de flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner. Le connecteur vous fournit un flux d'enregistrements de modification de données afin que vous puissiez vous concentrer davantage sur la logique de l'application, et moins sur les détails spécifiques de l'API et la partitionnement dynamique du flux de modifications. Dans la plupart des cas où vous devez lire des données de flux de modifications, nous vous recommandons d'utiliser le connecteur SpannerIO plutôt que l'API Spanner.

Les modèles Dataflow sont des pipelines Dataflow prédéfinis qui implémentent des cas d'utilisation courants. Pour en savoir plus, consultez la page Modèles Dataflow.

Pipeline Dataflow

Un pipeline Dataflow de flux de modifications Spanner se compose de quatre parties principales:

  1. Une base de données Spanner avec un flux de modifications
  2. Connecteur SpannerIO
  3. Transforms et sinks définis par l'utilisateur
  4. Un enregistreur d'E/S de l'émetteur Apache Beam

image

Flux de modifications Spanner

Pour savoir comment créer un flux de modifications, consultez la section Créer un flux de modifications.

Connecteur Apache Beam SpannerIO

Il s'agit du connecteur SpannerIO décrit dans la section Dataflow précédente. Il s'agit d'un connecteur d'E/S source qui émet un PCollection d'enregistrements de modification de données aux étapes ultérieures du pipeline. L'heure de l'événement pour chaque enregistrement de modification de données émis sera le code temporel de la validation. Notez que les enregistrements émis sont non triés et que le connecteur SpannerIO garantit qu'il n'y aura pas d'enregistrements tardifs.

Lorsque vous travaillez avec des flux de modifications, Dataflow utilise le point de contrôle. Par conséquent, chaque nœud de calcul peut attendre jusqu'à l'intervalle de point de contrôle configuré pour la mise en mémoire tampon des modifications avant de les envoyer pour un traitement ultérieur.

Transformations définies par l'utilisateur

Une transformation définie par l'utilisateur permet à l'utilisateur d'agréger, de transformer ou de modifier les données de traitement dans un pipeline Dataflow. Il s'agit par exemple de supprimer des informations permettant d'identifier personnellement l'utilisateur, de répondre aux exigences de format de données en aval et de trier les données. Consultez la documentation officielle d'Apache Beam pour le guide de programmation sur les transformations.

Écrivain d'E/S de l'émetteur Apache Beam

Apache Beam contient des connecteurs d'E/S intégrés qui peuvent être utilisés pour écrire à partir d'un pipeline Dataflow dans un récepteur de données tel que BigQuery. La plupart des points de terminaison de données courants sont compatibles en mode natif.

Modèles Dataflow

Les modèles Dataflow permettent de créer des tâches Dataflow basées sur des images Docker prédéfinies pour des cas d'utilisation courants à l'aide de la console Google Cloud, de la CLI Google Cloud ou des appels d'API REST.

Pour les flux de modifications Spanner, nous proposons trois modèles Flex Dataflow:

Définir des autorisations IAM pour les modèles Dataflow

Avant de créer une tâche Dataflow avec les trois modèles flex listés, assurez-vous de disposer des autorisations IAM requises pour les comptes de service suivants:

Si vous ne disposez pas des autorisations IAM requises, vous devez spécifier un compte de service de nœud de calcul géré par l'utilisateur pour créer la tâche Dataflow. Pour en savoir plus, consultez la section Sécurité et autorisations pour Dataflow.

Lorsque vous essayez d'exécuter une tâche à partir d'un modèle Flex Dataflow sans toutes les autorisations requises, votre tâche peut échouer avec une erreur de lecture du fichier de résultats ou une erreur d'autorisation refusée sur la ressource. Pour en savoir plus, consultez la section Résoudre les problèmes liés aux modèles Flex.

Créer un pipeline Dataflow

Cette section décrit la configuration initiale du connecteur et fournit des exemples d'intégrations courantes avec la fonctionnalité de flux de modifications Spanner.

Pour suivre ces étapes, vous avez besoin d'un environnement de développement Java pour Dataflow. Pour en savoir plus, consultez la page Créer un pipeline Dataflow à l'aide de Java.

Créer un flux de modifications

Pour savoir comment créer un flux de modifications, consultez la section Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une base de données Spanner avec un flux de modifications configuré.

Accorder des droits de contrôle précis des accès

Si vous prévoyez que des utilisateurs de contrôle des accès ultraprécis exécuteront la tâche Dataflow, assurez-vous qu'ils sont autorisés à accéder à un rôle de base de données disposant du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de valeur de table du flux de modifications. Assurez-vous également que le principal spécifie le rôle de base de données dans la configuration SpannerIO ou dans le modèle Flex Dataflow.

Pour en savoir plus, consultez la page À propos du contrôle des accès ultraprécis.

Ajouter le connecteur SpannerIO en tant que dépendance

Le connecteur Apache Beam SpannerIO encapsule la complexité de la consommation des flux de modification directement à l'aide de l'API Cloud Spanner, en émettant une PCollection d'enregistrements de données de flux de modification aux étapes ultérieures du pipeline.

Ces objets peuvent être consommés à d'autres étapes du pipeline Dataflow de l'utilisateur. L'intégration du flux de modifications fait partie du connecteur SpannerIO. Pour pouvoir utiliser le connecteur SpannerIO, vous devez ajouter la dépendance à votre fichier pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Créer une base de données de métadonnées

Le connecteur doit suivre chaque partition lors de l'exécution du pipeline Apache Beam. Il conserve ces métadonnées dans une table Spanner créée par le connecteur lors de l'initialisation. Vous devez spécifier la base de données dans laquelle cette table sera créée lors de la configuration du connecteur.

Comme décrit dans les bonnes pratiques concernant les flux de modifications, nous vous recommandons de créer une base de données à cette fin plutôt que d'autoriser le connecteur à utiliser la base de données de votre application pour stocker sa table de métadonnées.

Le propriétaire d'une tâche Dataflow qui utilise le connecteur SpannerIO doit disposer des autorisations IAM suivantes définies avec cette base de données de métadonnées:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurer le connecteur

Le connecteur de flux de modifications Spanner peut être configuré comme suit:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Vous trouverez ci-dessous une description des options readChangeStream():

Configuration Spanner (obligatoire)

Permet de configurer le projet, l'instance et la base de données dans lesquels le flux de modifications a été créé et à partir desquels il doit être interrogé. Spécifie également le rôle de base de données à utiliser lorsque le compte principal IAM qui exécute l'ordre Dataflow est un utilisateur de contrôle d'accès précis. La tâche assume ce rôle de base de données pour accéder au flux de modifications. Pour en savoir plus, consultez la page À propos du contrôle des accès ultraprécis.

Nom du flux de modifications (obligatoire)

Ce nom identifie de manière unique le flux de modifications. Le nom doit être identique à celui utilisé lors de sa création.

ID d'instance de métadonnées (facultatif)

Il s'agit de l'instance permettant de stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API de flux de modifications.

ID de la base de données de métadonnées (obligatoire)

Il s'agit de la base de données qui stocke les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API de flux de modification.

Nom de la table de métadonnées (facultatif)

Cette option ne doit être utilisée que pour mettre à jour un pipeline existant.

Il s'agit du nom de la table de métadonnées préexistante à utiliser par le connecteur. Le connecteur l'utilise pour stocker les métadonnées afin de contrôler la consommation des données de l'API de flux de modification. Si cette option est omise, Spanner crée une table avec un nom généré lors de l'initialisation du connecteur.

Priorité de la requête RPC (facultatif)

Priorité de la requête à utiliser pour les requêtes de flux de modifications. Si ce paramètre est omis, high priority sera utilisé.

InclusiveStartAt (obligatoire)

Les modifications apportées à partir du code temporel donné sont renvoyées à l'appelant.

InclusiveEndAt (facultatif)

Les modifications jusqu'au code temporel donné sont renvoyées à l'appelant. Si ce paramètre est omis, les modifications seront émises indéfiniment.

Ajouter des transformations et des sinks pour traiter les données de modification

Une fois les étapes précédentes terminées, le connecteur SpannerIO configuré est prêt à émettre une PCollection d'objets DataChangeRecord. Consultez Exemples de transformations et de sinks pour découvrir plusieurs exemples de configurations de pipeline qui traitent ces données en streaming de différentes manières.

Notez que les enregistrements de flux de modifications émis par le connecteur SpannerIO ne sont pas triés. En effet, les PCollections ne garantissent aucun ordre. Si vous avez besoin d'un flux ordonné, vous devez regrouper et trier les enregistrements en tant que transformations dans vos pipelines: consultez Exemple: Tri par clé. Vous pouvez étendre cet exemple pour trier les enregistrements en fonction de n'importe quel champ, par exemple en fonction des ID de transaction.

Exemples de transformations et de récepteurs

Vous pouvez définir vos propres transformations et spécifier des récepteurs dans lesquels écrire les données. La documentation Apache Beam fournit une multitude de transformations pouvant être appliquées, ainsi que des connecteurs d'E/S prêts à l'emploi pour écrire les données dans des systèmes externes.

Exemple: Trier par clé

Cet exemple de code émet des enregistrements de modification de données triés par code temporel de commit et regroupés par clés primaires à l'aide du connecteur Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Cet exemple de code utilise des états et des minuteurs pour mettre en mémoire tampon les enregistrements pour chaque clé, et définit l'heure d'expiration du minuteur sur une heure T configurée par l'utilisateur à l'avenir (définie dans la fonction BufferKeyUntilOutputTimestamp). Lorsque le filigrane Dataflow dépasse l'heure T, ce code vide tous les enregistrements du tampon dont le code temporel est inférieur à T, les trie par code temporel de validation et produit une paire clé-valeur où:

  • La clé est la clé d'entrée, c'est-à-dire la clé primaire hachée dans un tableau de buckets de 1 000 éléments.
  • La valeur correspond aux enregistrements de modification de données triés qui ont été mis en mémoire tampon pour la clé.

Pour chaque clé, nous garantissons les éléments suivants:

  • Les minuteurs se déclenchent toujours dans l'ordre de l'horodatage d'expiration.
  • Les étapes en aval reçoivent toujours les éléments dans l'ordre dans lequel ils ont été produits.

Par exemple, avec une clé de valeur 100, le minuteur se déclenche à T1 et T10 respectivement, produisant un lot d'enregistrements de modification de données à chaque code temporel. Étant donné que les enregistrements de modification de données générés à T1 ont été créés avant ceux générés à T10, il est également garanti que les enregistrements de modification de données générés à T1 seront reçus par l'étape suivante avant ceux générés à T10. Ce mécanisme nous permet de garantir un ordre strict des horodatages de validation par clé primaire pour le traitement en aval.

Ce processus se répète jusqu'à la fin du pipeline et au traitement de tous les enregistrements de modification de données (ou indéfiniment si aucune heure de fin n'est spécifiée).

Notez que cet exemple de code utilise des états et des minuteurs, au lieu de fenêtres, pour effectuer l'ordonnancement par clé. En effet, il n'est pas garanti que les fenêtres soient traitées dans l'ordre. Cela signifie que les fenêtres plus anciennes peuvent être traitées plus tard que les fenêtres plus récentes, ce qui peut entraîner un traitement dans le désordre.

BreakRecordByModFn

Chaque enregistrement de modification de données peut contenir plusieurs modifications. Chaque modification représente une insertion, une mise à jour ou une suppression d'une seule valeur de clé primaire. Cette fonction divise chaque enregistrement de modification de données en enregistrements de modification de données distincts, un par modification.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Cette fonction reçoit un DataChangeRecord et renvoie un DataChangeRecord associé à la clé primaire Spanner hachée en valeur entière.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Les minuteurs et les tampons sont définis par clé. Cette fonction met en mémoire tampon chaque enregistrement de modification de données jusqu'à ce que le filigrane dépasse l'horodatage au moment duquel vous souhaitez générer les enregistrements de modification de données mis en mémoire tampon.

Ce code utilise un minuteur en boucle pour déterminer quand vider le tampon:

  1. Lorsqu'il voit un enregistrement de modification de données pour une clé pour la première fois, il définit le minuteur pour qu'il se déclenche au moment du code temporel de validation de l'enregistrement de modification de données + incrementIntervalSeconds (une option configurable par l'utilisateur).
  2. Lorsque le minuteur se déclenche, il ajoute tous les enregistrements de modification de données dans le tampon dont l'horodatage est inférieur au délai d'expiration du minuteur à recordsToOutput. Si le tampon contient des enregistrements de modification de données dont le code temporel est supérieur ou égal à la date d'expiration du minuteur, il les ajoute à nouveau dans le tampon au lieu de les afficher. Il définit ensuite le minuteur suivant sur le délai d'expiration du minuteur actuel plus incrementIntervalInSeconds.
  3. Si recordsToOutput n'est pas vide, la fonction trie les enregistrements de modification des données dans recordsToOutput par code temporel de validation et ID de transaction, puis les affiche.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Trier les transactions

Vous pouvez modifier ce pipeline pour qu'il s'affiche par ordre d'ID de transaction et d'horodatage de validation. Pour ce faire, mettez en mémoire tampon les enregistrements pour chaque paire d'ID de transaction / code temporel de commit, au lieu de le faire pour chaque clé Spanner. Pour ce faire, vous devez modifier le code dans KeyByIdFn.

Exemple: Assembler des transactions

Cet exemple de code lit les enregistrements de modification des données, assemble tous les enregistrements de modification des données appartenant à la même transaction dans un seul élément et renvoie cet élément. Notez que les transactions générées par cet exemple de code ne sont pas triées par code temporel de validation.

Cet exemple de code utilise des tampons pour assembler les transactions à partir des enregistrements de modification des données. Lorsqu'il reçoit pour la première fois un enregistrement de modification de données appartenant à une transaction, il lit le champ numberOfRecordsInTransaction de l'enregistrement de modification de données, qui décrit le nombre d'enregistrements de modification de données attendus pour cette transaction. Il met en mémoire tampon les enregistrements de modification de données appartenant à cette transaction jusqu'à ce que le nombre d'enregistrements mis en mémoire tampon corresponde à numberOfRecordsInTransaction, puis il génère les enregistrements de modification de données groupés.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Cette fonction reçoit un DataChangeRecord et renvoie un DataChangeRecord associé à l'ID de transaction.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn met en mémoire tampon les paires clé-valeur reçues de {TransactionId, DataChangeRecord} à partir de KeyByTransactionIdFn et les met en mémoire tampon par groupes en fonction de TransactionId. Lorsque le nombre d'enregistrements mis en mémoire tampon est égal au nombre d'enregistrements contenus dans l'ensemble de la transaction, cette fonction trie les objets DataChangeRecord du groupe par séquence d'enregistrements et produit une paire clé-valeur de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Ici, nous supposons que SortKey est une classe définie par l'utilisateur qui représente une paire {CommitTimestamp, TransactionId}. Pour en savoir plus sur SortKey, consultez l'exemple d'implémentation.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Exemple: Filtrer par tag de transaction

Lorsqu'une transaction modifiant les données utilisateur est taguée, la balise correspondante et son type sont stockées dans DataChangeRecord. Ces exemples montrent comment filtrer les enregistrements du flux de modifications en fonction de tags de transaction définis par l'utilisateur et de tags système:

Filtrage des tags définis par l'utilisateur pour my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtrage des tags système/Audit de la valeur TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Exemple: Récupérer la ligne complète

Cet exemple fonctionne avec une table Spanner nommée Singer dont la définition est la suivante:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Dans le mode de capture de valeur OLD_AND_NEW_VALUES par défaut des flux de modifications, lorsqu'une ligne Spanner est mise à jour, l'enregistrement de modification de données reçu ne contient que les colonnes modifiées. Les colonnes suivies, mais non modifiées, ne seront pas incluses dans l'enregistrement. La clé primaire du mod peut être utilisée pour effectuer une lecture d'instantané Spanner à l'horodatage de commit de l'enregistrement de modification de données afin de récupérer les colonnes non modifiées ou même la ligne complète.

Notez que la règle de conservation de la base de données peut devoir être définie sur une valeur supérieure ou égale à la règle de conservation du flux de modifications pour que la lecture de l'instantané aboutisse.

Notez également que l'utilisation du type de capture de valeur NEW_ROW est la méthode recommandée et la plus efficace pour ce faire, car il renvoie par défaut toutes les colonnes suivies de la ligne et ne nécessite pas de lecture d'instantané supplémentaire dans Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Cette transformation effectue une lecture obsolète au code temporel de validation de chaque enregistrement reçu et mappe la ligne complète en JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Ce code crée un client de base de données Spanner pour effectuer la récupération de la ligne complète et configure le pool de sessions pour qu'il ne comporte que quelques sessions, effectuant des lectures de manière séquentielle dans une instance de ToFullReowJsonFn. Dataflow s'assure de générer de nombreuses instances de cette fonction, chacune avec son propre pool de clients.

Exemple: Spanner vers Pub/Sub

Dans ce scénario, l'appelant diffuse des enregistrements vers Pub/Sub aussi rapidement que possible, sans regroupement ni agrégation. Cette approche est adaptée au déclenchement du traitement en aval, ainsi qu'au streaming de toutes les nouvelles lignes insérées dans une table Spanner vers Pub/Sub pour un traitement ultérieur.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Notez que le récepteur Pub/Sub peut être configuré pour assurer une sémantique de type exactement une fois.

Exemple: Spanner vers Cloud Storage

Dans ce scénario, l'appelant regroupe tous les enregistrements d'une période donnée et les enregistre dans des fichiers Cloud Storage distincts. Cette option convient bien aux analyses et à l'archivage à un moment précis, qui est indépendant de la période de conservation de Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Notez que le sink Cloud Storage fournit une sémantique de type "au moins une fois" par défaut. Avec un traitement supplémentaire, il peut être modifié pour obtenir une sémantique de type "exactement une fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation: consultez Connecter des flux de modifications à Cloud Storage.

Exemple: Spanner vers BigQuery (table de grand livre)

Ici, l'appelant diffuse les enregistrements de modification dans BigQuery. Chaque enregistrement de modification de données est représenté par une ligne dans BigQuery. Cette approche est adaptée aux analyses. Ce code utilise les fonctions définies précédemment, dans la section Récupérer la ligne complète, pour récupérer la ligne complète de l'enregistrement et l'écrire dans BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Notez que le sink BigQuery fournit une sémantique de type "au moins une fois" par défaut. Avec un traitement supplémentaire, il peut être modifié pour obtenir une sémantique de type "exactement une fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation. Consultez Connecter des flux de modifications à BigQuery.

Surveiller un pipeline

Il existe deux classes de métriques disponibles pour surveiller un pipeline Dataflow de flux de modifications.

Métriques Dataflow standards

Dataflow fournit plusieurs métriques pour vous assurer que votre tâche est en bon état, telles que la fraîcheur des données, le retard système, le débit de la tâche, l'utilisation du processeur des nœuds de calcul, etc. Pour en savoir plus, consultez Utiliser Monitoring pour les pipelines Dataflow.

Pour les pipelines de flux de modifications, deux métriques principales doivent être prises en compte: la latence du système et la fraîcheur des données.

La latence système indique la durée maximale actuelle (en secondes) pendant laquelle un élément de données est en cours de traitement ou en attente.

La fraîcheur des données indique le temps écoulé entre le moment présent (temps réel) et la marque de sortie. La marque de sortie de l'heure T indique que tous les éléments avec une heure d'événement (strictement) antérieure à T ont été traités pour le calcul. En d'autres termes, la fraîcheur des données mesure le degré de mise à jour du pipeline en ce qui concerne le traitement des événements qu'il a reçus.

Si le pipeline manque de ressources, vous pouvez voir cet effet dans ces deux métriques. La latence du système augmente, car les éléments doivent attendre plus longtemps avant d'être traités. La fraîcheur des données augmentera également, car le pipeline ne pourra pas suivre la quantité de données reçues.

Métriques personnalisées pour le flux de modifications

Ces métriques sont exposées dans Cloud Monitoring et incluent les suivantes:

  • Latence bucketée (histogramme) entre l'enregistrement d'un enregistrement dans Spanner et son émission dans une PCollection par le connecteur. Cette métrique permet de détecter les problèmes de performances (latence) du pipeline.
  • Nombre total d'enregistrements de données lus. Il s'agit d'une indication globale du nombre d'enregistrements émis par le connecteur. Ce nombre doit augmenter en permanence, reflétant la tendance des écritures dans la base de données Spanner sous-jacente.
  • Nombre de partitions en cours de lecture. Des partitions doivent toujours être lues. Si ce nombre est nul, cela signifie qu'une erreur s'est produite dans le pipeline.
  • Nombre total de requêtes émises lors de l'exécution du connecteur. Il s'agit d'une indication globale des requêtes de flux de modifications envoyées à l'instance Spanner tout au long de l'exécution du pipeline. Vous pouvez vous en servir pour obtenir une estimation de la charge du connecteur vers la base de données Spanner.

Mettre à jour un pipeline existant

Vous pouvez mettre à jour un pipeline en cours d'exécution qui utilise le connecteur SpannerIO pour traiter les flux de modifications si les vérifications de compatibilité des jobs sont réussies. Pour ce faire, vous devez définir explicitement le paramètre de nom de table de métadonnées de la nouvelle tâche lors de sa mise à jour. Utilisez la valeur de l'option de pipeline metadataTable du job que vous mettez à jour.

Si vous utilisez un modèle Dataflow fourni par Google, définissez le nom de la table à l'aide du paramètre spannerMetadataTableName. Vous pouvez également modifier votre tâche existante pour utiliser explicitement la table de métadonnées avec la méthode withMetadataTable(your-metadata-table-name) dans la configuration du connecteur. Une fois cela fait, vous pouvez suivre les instructions de la section Lancer votre tâche de remplacement de la documentation Dataflow pour mettre à jour une tâche en cours d'exécution.

Bonnes pratiques concernant les flux de modification et Dataflow

Vous trouverez ci-dessous quelques bonnes pratiques pour créer des connexions de flux de modifications à l'aide de Dataflow.

Utiliser une base de données de métadonnées distincte

Nous vous recommandons de créer une base de données distincte pour le connecteur SpannerIO à utiliser pour le stockage des métadonnées, plutôt que de le configurer pour qu'il utilise votre base de données d'application.

Pour en savoir plus, consultez la section Envisager une base de données de métadonnées distincte.

Dimensionner votre cluster

En règle générale,le nombre initial de nœuds de calcul dans une tâche de flux de modifications Spanner est d'un nœud de calcul pour 1 000 écritures par seconde. Notez que cette estimation peut varier en fonction de plusieurs facteurs, tels que la taille de chaque transaction, le nombre d'enregistrements de flux de modifications générés à partir d'une seule transaction et d'autres transformations, agrégations ou sinks utilisés dans le pipeline.

Après la réaffectation initiale, il est important de suivre les métriques mentionnées dans Surveiller un pipeline pour vous assurer que le pipeline est en bon état. Nous vous recommandons d'essayer une taille de pool de travailleurs initiale et de surveiller la façon dont votre pipeline gère la charge, en augmentant le nombre de nœuds si nécessaire. L'utilisation du processeur est une métrique clé pour vérifier si la charge est correcte et si d'autres nœuds sont nécessaires.

Limitations connues

Certaines limites sont connues lors de l'utilisation de flux de modifications Spanner avec Dataflow:

Autoscaling

La prise en charge de l'autoscaling pour tous les pipelines qui incluent SpannerIO.readChangeStream nécessite Apache Beam 2.39.0 ou version ultérieure.

Si vous utilisez une version d'Apache Beam antérieure à 2.39.0, les pipelines qui incluent SpannerIO.readChangeStream doivent spécifier explicitement l'algorithme d'autoscaling comme NONE, comme décrit dans la section Autoscaling horizontal.

Pour effectuer le scaling manuel d'un pipeline Dataflow au lieu d'utiliser l'autoscaling, consultez la section Effectuer le scaling manuel d'un pipeline de traitement en flux continu.

Exécuteur V2

Le connecteur de flux de modification Spanner nécessite Dataflow Runner v2. Cette valeur doit être spécifiée manuellement lors de l'exécution, sinon une erreur sera générée. Vous pouvez spécifier Runner V2 en configurant votre tâche avec --experiments=use_unified_worker,use_runner_v2.

Instantané

Le connecteur de flux de modifications Spanner n'est pas compatible avec les instantanés Dataflow.

En cours de drainage

Le connecteur de flux de modifications Spanner n'est pas compatible avec le drainage d'une tâche. Vous ne pouvez annuler qu'une tâche existante.

Vous pouvez également mettre à jour un pipeline existant sans avoir à l'arrêter.

OpenCensus

Pour utiliser OpenCensus pour surveiller votre pipeline, spécifiez la version 0.28.3 ou ultérieure.

NullPointerException au début du pipeline

Un bug dans la version 2.38.0 d'Apache Beam peut provoquer une erreur NullPointerException lors du démarrage du pipeline dans certaines conditions. Cela empêcherait le démarrage de votre tâche et afficherait le message d'erreur suivant:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Pour résoudre ce problème, utilisez la version 2.39.0 d'Apache Beam ou une version ultérieure, ou spécifiez manuellement la version de beam-sdks-java-core comme 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

En savoir plus