Créer des connexions de flux de modifications avec Dataflow

Ce document explique comment créer des pipelines Dataflow qui intègrent et transfèrent les données de modification Cloud Spanner via des flux de modification. L'exemple de code de ce document peut être utilisé pour créer des pipelines personnalisés.

Concepts fondamentaux

Dataflow

Dataflow est un service sans serveur, rapide et économique, compatible avec le traitement par flux et par lot. Il permet la portabilité des tâches de traitement écrites à l'aide des bibliothèques Open Source Apache Beam, et automatise le provisionnement de l'infrastructure et la gestion des clusters.

Vous pouvez utiliser Dataflow pour utiliser des flux de modifications avec Spanner à l'aide du connecteur Spanner, qui permet d'interroger les flux de modification via l'API Spanner. Grâce à ce connecteur, vous n'avez pas besoin de gérer le cycle de vie de la partition des flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner. Le connecteur fournit un flux d'enregistrements de modifications des données afin que vous puissiez vous concentrer sur la logique d'application, et moins sur des détails d'API spécifiques et sur le partitionnement de flux de modification dynamique. Nous vous recommandons d'utiliser le connecteur SpannerIO plutôt que l'API Spanner dans la plupart des cas où vous devez lire les données de flux de modification.

Les modèles Dataflow sont des pipelines Dataflow prédéfinis faciles à utiliser qui implémentent des cas d'utilisation courants. Pour en savoir plus, consultez Modèles Dataflow.

Pipeline Dataflow

Un pipeline Dataflow de flux de modifications se compose de quatre parties principales:

  1. Une base de données Spanner avec un flux de modifications
  2. Connecteur SpannerIO
  3. Transformations et récepteurs définis par l'utilisateur
  4. Rédacteur d'E/S de récepteur

image

Chacune de ces fonctionnalités est décrite en détail ci-dessous.

Flux de modification de Spanner

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications.

Connecteur Apache Beam Spanner

Il s'agit du connecteur SpannerIO décrit plus tôt. Il s'agit d'un connecteur d'E/S source qui émet une collection d'enregistrements de modifications de données aux dernières étapes du pipeline. L'heure de l'événement pour chaque enregistrement de modification de données émises correspond à l'horodatage de commit. Notez que les enregistrements émis sont non classés et que le connecteur SpannerIO garantit qu'il n'y aura pas d'enregistrements tardifs.

Transformations définies par l'utilisateur

Une transformation définie par l'utilisateur permet à un utilisateur d'agréger, de transformer ou de modifier des données de traitement dans un pipeline Dataflow. Cas d'utilisation courants : la suppression des informations personnelles, le respect des exigences en termes de format de données en aval et le tri. Consultez la documentation officielle d'Apache Beam pour le guide de programmation sur les transformations.

Rédacteur d'E/S du récepteur Apache Beam

Apache Beam contient des transformations d'E/S intégrées qui peuvent être utilisées pour écrire des données à partir d'un pipeline Dataflow dans un récepteur de données tel que BigQuery. La plupart des récepteurs de données courants sont compatibles de façon native.

Modèles Dataflow

Les modèles Dataflow vous permettent de créer facilement des tâches Dataflow à partir d'images Docker prédéfinies pour des cas d'utilisation courants via Google Cloud Console, la CLI Google Cloud ou les appels d'API REST.

Pour les flux de modifications Spanner, nous proposons deux modèles flexibles Dataflow:

Créer un pipeline Dataflow

Cette section couvre la configuration initiale du connecteur et fournit des exemples d'intégrations courantes avec la fonctionnalité de flux de modifications de Spanner.

Pour suivre ces étapes, vous avez besoin d'un environnement de développement Java pour Dataflow. Pour en savoir plus, consultez la section Créer un pipeline Dataflow à l'aide de Java.

Créer un flux de modifications

Pour en savoir plus sur la création d'un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Ajouter le connecteur SpannerIO en tant que dépendance

Le connecteur Apache Beam SpannerIO encapsule la complexité de l'intégration des flux de modification directement via l'API Cloud Spanner, en émettant une PCollection d'enregistrements de données de flux de modifications aux étapes suivantes du pipeline.

Ces objets peuvent être utilisés à d'autres étapes du pipeline Dataflow de l'utilisateur. L'intégration du flux de modification fait partie du connecteur Spanner. Pour utiliser le connecteur SpannerIO, vous devez ajouter la dépendance à votre fichier pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Créer une base de données de métadonnées

Le connecteur doit effectuer le suivi de chaque partition lors de l'exécution du pipeline Apache Beam. Il conserve ces métadonnées dans une table Spanner créée par le connecteur lors de l'initialisation. Spécifiez la base de données dans laquelle cette table sera créée lors de la configuration du connecteur.

Comme décrit dans les bonnes pratiques, nous vous recommandons de créer une base de données à cette fin plutôt que d'autoriser le connecteur à utiliser la base de données de votre application pour stocker sa table de métadonnées.

Le propriétaire d'une tâche Dataflow qui utilise le connecteur SpannerIO doit définir les autorisations IAM suivantes avec cette base de données de métadonnées:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurer le connecteur

Le connecteur de flux de modifications Spanner peut être configuré comme suit:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id");

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   now.getSeconds() + (10 * 60),
   now.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime)

Dans l'exemple de configuration ci-dessus, cela vaut la peine de désenvelopper chaque option.

Configuration de Spanner (obligatoire)

Permet de configurer le projet, l'instance et la base de données où le flux de modifications a été créé et pour lequel il doit être interrogé.

Modifier le nom du flux (obligatoire)

Ce nom identifie de manière unique le flux de modifications. Le nom doit être identique à celui utilisé lors de sa création.

ID de l'instance de métadonnées (facultatif)

Il s'agit de l'instance où stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API Stream Change.

ID de la base de données des métadonnées (obligatoire)

Cette base de données contient les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API du flux de modification.

Nom de la table de métadonnées (facultatif)

Utilisez-la uniquement lors de la mise à jour d'un pipeline existant.

Nom de la table de métadonnées préexistante à utiliser par le connecteur. Il permet au connecteur de stocker les métadonnées pour contrôler la consommation des données de l'API du flux de modification. Si cette option est omise, Spanner crée une table avec un nom généré lors de l'initialisation du connecteur.

Priorité RPC (facultatif)

Priorité de requête à utiliser pour les requêtes de flux de modifications. Si ce paramètre est omis, le paramètre high priority sera utilisé.

InclusifStartAt (obligatoire)

Les modifications apportées à l'horodatage donné sont renvoyées à l'appelant.

InclusifEndAt (facultatif)

Les modifications apportées à l'horodatage spécifié sont renvoyées à l'appelant. Si ce paramètre est omis, les modifications seront émises indéfiniment.

Ajouter des transformations et des récepteurs pour traiter des données sur les changements

Une fois les étapes précédentes terminées, le connecteur SpannerIO configuré est prêt à émettre une collection PCollection d'objets DataChangeRecord. Consultez l'article Exemples de transformations et de récepteurs pour plusieurs exemples de configuration de pipeline traitant ces données en streaming de différentes manières.

Notez que les enregistrements de flux émis par le connecteur SpannerIO ne sont pas classés. En effet, les PCollections ne fournissent aucune garantie de tri. Si vous avez besoin d'un flux ordonné, vous devez regrouper et trier les enregistrements en tant que transformations dans vos pipelines. Pour ce faire, consultez Exemple : Trier par clé. Vous pouvez étendre cet exemple pour trier les enregistrements en fonction de tous les champs, par exemple, les ID de transaction.

Exemples de transformations et de récepteurs

Vous pouvez définir vos propres transformations et spécifier les récepteurs dans lesquels écrire les données. La documentation Apache Beam fournit de nombreuses transformations qui peuvent être appliquées, ainsi que des connecteurs d'E/S prêts à écrire les données dans les systèmes externes.

Exemple: Trier par clé

Cet exemple de code émet des enregistrements de modification des données classés par horodatage de commit et regroupés par clés primaires à l'aide du connecteur Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id"))
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Cet exemple de code utilise les états et les minuteurs pour mettre en mémoire tampon les enregistrements de chaque clé et définit l'heure d'expiration du minuteur sur l'heure T configurée par l'utilisateur (définie dans la fonction BufferKeyUntilOutputTimestamp). Lorsque le filigrane Dataflow dépasse T, ce code supprime tous les enregistrements du tampon avec un horodatage inférieur à T, trie ces enregistrements par horodatage de commit et génère une paire clé-valeur:

  • La clé est la clé d'entrée, c'est-à-dire la clé primaire hachée dans un tableau de buckets de taille 1 000.
  • La valeur correspond aux enregistrements de modification des données qui ont été mis en mémoire tampon pour la clé.

Pour chaque clé, nous pouvons garantir les avantages suivants:

  • Les minuteurs peuvent se déclencher selon l'horodatage d'expiration.
  • Les étapes en aval reçoivent les éléments dans l'ordre dans lequel elles ont été produites.

Par exemple, imaginons que pour une clé avec la valeur 100, le minuteur se déclenche respectivement à T1 et T10, générant ainsi un groupe d'enregistrements de modifications à chaque horodatage. Étant donné que les enregistrements de modification de données générés à T1 ont été produits avant l'enregistrement des enregistrements de modification de données à T10, les enregistrements de modification de données renvoyés à T1 seront forcément reçus à l'étape suivante avant que les enregistrements de la mise à jour de données ne soient renvoyés à T10. Ce mécanisme nous permet de garantir un ordre d'horodatage de commit strict par clé primaire pour le traitement en aval.

Ce processus se répète jusqu'à la fin du pipeline et de l'ensemble des enregistrements de modification des données (ou se répète indéfiniment si aucune heure de fin n'est spécifiée).

Notez que cet exemple de code utilise des états et des minuteurs au lieu de fenêtres pour effectuer un tri par clé. Le motif est que le traitement des fenêtres n'est pas garanti. En d'autres termes, les anciennes fenêtres peuvent être traitées plus tard que les périodes plus récentes, ce qui peut entraîner un traitement dans l'ordre.

BreakRecordByModFn

Chaque enregistrement de modification de données peut contenir plusieurs mods. Chaque mod représente une insertion, une mise à jour ou une suppression d'une seule clé primaire. Cette fonction divise chaque enregistrement de modification de données en enregistrements de modification de données distincts, un par mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

CléIdIdFn

Cette fonction utilise un DataChangeRecord et génère un DataChangeRecord selon la clé primaire Spanner hachée en valeur entière.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Les minuteurs et les tampons sont par clé. Cette fonction met en mémoire tampon l'enregistrement de modification de données jusqu'à ce que le filigrane transmette l'horodatage auquel nous souhaitons générer les enregistrements.

Ce code utilise un minuteur en boucle pour déterminer quand vider le tampon:

  1. Lorsqu'un enregistrement de modifications de données est détecté pour une première fois, il définit le minuteur pour qu'il se déclenche lors de l'enregistrement de modifications associé à l'enregistrement de modifications en fonction de l'horodatage de commit + incrementIntervalSeconds (option configurable par l'utilisateur).
  2. Lorsque le minuteur se déclenche, il ajoute tous les enregistrements de modification de données dans le tampon avec un horodatage inférieur à l'heure d'expiration du minuteur (recordsToOutput). Si le tampon possède des enregistrements de modification de données dont l'horodatage est supérieur ou égal à l'heure d'expiration du minuteur, il ajoute ces enregistrements de modification dans le tampon au lieu de les afficher. Elle définit ensuite le minuteur suivant sur l'heure d'expiration du minuteur actuel, suivie de incrementIntervalInSeconds.
  3. Si recordsToOutput n'est pas vide, la fonction classe les enregistrements de modifications dans recordsToOutput par horodatage de validation et par ID de transaction, puis les génère.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (keyString == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Commander des transactions

Vous pouvez modifier ce pipeline selon l'ID de transaction et l'horodatage de commit. Pour ce faire, mettez en mémoire tampon les enregistrements pour chaque paire ID / transaction de code temporel au lieu de chaque clé Spanner. Cette opération nécessite la modification du code dans KeyByIdFn.

Exemple: assembler les transactions

Cet exemple de code lit les enregistrements de modification de données, assemble tous les enregistrements de modification appartenant à la même transaction dans un seul élément et génère cet élément. Notez que les transactions générées par cet exemple de code ne sont pas triées par horodatage de commit.

Cet exemple de code utilise des tampons pour assembler les transactions issues des enregistrements de modifications des données. Lorsque vous recevez pour la première fois un enregistrement de modification de données appartenant à une transaction, il lit le champ numberOfRecordsInTransaction de l'enregistrement de modification des données, qui décrit le nombre attendu d'enregistrements de modifications appartenant à cette transaction. Il met en mémoire tampon les enregistrements de modification de données appartenant à cette transaction jusqu'à ce que le nombre d'enregistrements en mémoire tampon corresponde à numberOfRecordsInTransaction, sur lequel il génère les enregistrements de modification de données groupées.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id"))
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

Clé_TransactionIdFn

Cette fonction prend une DataChangeRecord et génère un DataChangeRecord associé à l'ID de transaction.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionLimiteFn

TransactionBoundaryFn a mis en mémoire tampon des paires clé/valeur {TransactionId, DataChangeRecord} de KeyByTransactionIdFn et les a tamponnées dans des groupes basés sur TransactionId. Lorsque le nombre d'enregistrements mis en mémoire tampon est égal au nombre d'enregistrements contenus dans la transaction entière, cette fonction trie les objets DataChangeRecord du groupe par séquence d'enregistrements et génère une paire clé/valeur {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Ici, nous supposons que SortKey est une classe définie par l'utilisateur qui représente une paire {CommitTimestamp, TransactionId}. Consultez l'exemple de mise en œuvre pour SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Exemple: Récupérer la ligne complète

Cet exemple fonctionne avec une table Spanner nommée Singer qui a la définition suivante:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Sous le mode de capture de valeur OLD_AND_NEW_VALUES par défaut des flux de modifications, lorsqu'une ligne Spanner est mise à jour, l'enregistrement de modification des données reçu ne contient que les colonnes modifiées. Les colonnes suivies, mais inchangées, ne seront pas incluses dans l'enregistrement. La clé primaire du mod peut servir à effectuer un instantané Spanner en lecture de l'horodatage de commit de l'enregistrement de modification des données pour récupérer les colonnes inchangées, ou même récupérer la ligne complète.

Notez que la règle de conservation de la base de données peut avoir besoin d'être remplacée par une valeur supérieure ou égale à la règle de conservation du flux de modification pour que l'instantané puisse être lu.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id");

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

RangéeDeAccès direct

Cette transformation effectuera une lecture obsolète au moment de l'horodatage de commit de chaque enregistrement reçu et mappe la ligne complète au format JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Ce code crée un client de base de données Spanner pour effectuer la récupération complète des lignes et configure le pool de sessions pour qu'il comporte seulement quelques sessions. Il effectue des lectures dans une instance du ToFullRowJsonFn. Dataflow veille à générer de nombreuses instances de cette fonction, chacune avec son propre pool de clients.

Exemple: Spanner vers Pub/Sub

Dans ce scénario, l'appelant diffuse les enregistrements sur Pub/Sub le plus rapidement possible, sans regroupement ni agrégation. Cela convient pour déclencher un traitement en aval, par exemple pour diffuser en flux continu toutes les nouvelles lignes insérées dans une table Spanner vers Pub/Sub.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id"))
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Notez que le récepteur Pub/Sub peut être configuré pour garantir la sémantique exactement une fois.

Exemple: Spanner vers Cloud Storage

Dans ce scénario, l'appelant regroupe tous les enregistrements d'une fenêtre donnée et les enregistre dans des fichiers Cloud Storage distincts. Cette option convient à l'analyse et à l'archivage à un moment précis, indépendamment de la durée de conservation de Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id"))
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Notez que le récepteur Cloud Storage fournit une sémantique au moins une fois par défaut. Grâce à un traitement supplémentaire, il peut être modifié pour utiliser une sémantique "une seule fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation: voir Connecter les flux de modification à Cloud Storage.

Exemple: Spanner vers BigQuery (table léger)

Ici, le flux de l'appelant modifie les enregistrements dans BigQuery. Chaque enregistrement de modification de données est représenté par une ligne dans BigQuery. Cette option est adaptée aux analyses. Ce code utilise les fonctions définies précédemment, dans la section Fetch full row (Récupérer la ligne complète), pour récupérer la ligne complète de l'enregistrement et l'écrire dans BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id");

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Notez que le récepteur BigQuery fournit une sémantique au moins une fois par défaut. Grâce à un traitement supplémentaire, il peut être modifié pour utiliser une sémantique "une seule fois".

Nous fournissons également un modèle Dataflow pour ce cas d'utilisation. Consultez Connecter des flux de modification à BigQuery.

Surveiller un pipeline

Deux classes de métriques sont disponibles pour surveiller un pipeline Dataflow de flux de modification.

Métriques Dataflow standards

Dataflow fournit plusieurs métriques pour assurer la qualité de votre tâche, comme la fraîcheur des données, le retard du système, le débit des tâches, l'utilisation du processeur par le nœud de calcul, etc. Pour plus d'informations, consultez Utiliser Monitoring pour les pipelines Dataflow.

Pour les pipelines de flux de modifications, vous devez tenir compte de deux métriques principales: la latence du système et la fraîcheur des données.

La latence du système indique la durée maximale actuelle (en secondes) pendant laquelle un élément de données est traité ou en attente de traitement.

La fraîcheur des données indique le temps écoulé entre le flux en temps réel et le filigrane de sortie. Le filigrane de sortie pour la date T indique que tous les éléments associés à une heure d'événement (strictement) ont été traités avant le calcul de T. En d'autres termes, la fraîcheur des données mesure la fréquence de mise à jour du pipeline.

Si le pipeline est sous ressources, vous pouvez constater cet effet dans ces deux métriques. La latence du système augmente, car les éléments doivent attendre plus longtemps avant d'être traités. La fraîcheur des données va également augmenter, car le pipeline ne sera plus en mesure de suivre la quantité de données reçues.

Métriques de flux de modifications personnalisées

Ces métriques sont exposées dans Cloud Monitoring et incluent:

  • Latence cumulée (histogramme) entre un enregistrement en cours d'exécution dans Spanner et son émission dans une collection PCollection par le connecteur. Vous pouvez utiliser cette métrique pour identifier les problèmes de performances (latence) du pipeline.
  • Nombre total d'enregistrements de données lus. Il s'agit d'une indication générale du nombre d'enregistrements émis par le connecteur. Ce nombre ne cesse de croître, car il reflète la tendance des écritures dans la base de données Spanner sous-jacente.
  • Nombre de partitions en cours de lecture. Des partitions doivent toujours être lues. Si ce nombre est égal à zéro, cela indique qu'une erreur s'est produite dans le pipeline.
  • Nombre total de requêtes émises lors de l'exécution du connecteur Il s'agit d'une indication globale des requêtes de flux de modifications apportées à l'instance Spanner pendant l'exécution du pipeline. Permet d'obtenir une estimation de la charge du connecteur vers la base de données Spanner.

Mettre à jour un pipeline existant

Il est possible de mettre à jour un pipeline en cours d'exécution qui utilise le connecteur SpannerIO pour traiter les flux de modification si les vérifications de compatibilité des tâches réussissent. Pour ce faire, vous devez définir explicitement le paramètre de nom de table de métadonnées de la nouvelle tâche lors de la mise à jour. Utilisez la valeur de l'option de pipeline metadataTable pour la tâche que vous mettez à jour.

Vous pouvez modifier votre tâche existante pour utiliser explicitement la table de métadonnées à l'aide de la méthode withMetadataTable(your-metadata-table-name) dans la configuration du connecteur. Ensuite, vous pouvez suivre les instructions de la section Lancer votre tâche de remplacement à partir de la documentation Dataflow pour mettre à jour une tâche en cours d'exécution.

Bonnes pratiques

Toujours utiliser une base de données de métadonnées

Nous vous recommandons de créer une base de données distincte pour le connecteur SpannerIO pour le stockage de métadonnées, plutôt que de la configurer pour utiliser votre base de données d'application.

Le connecteur de flux de modifications Spanner nécessite des autorisations de lecture/écriture sur la base de données de métadonnées. Vous n'avez pas besoin de préparer cette base de données avec un schéma. Le connecteur s'en charge.

L'utilisation d'une base de données de métadonnées distincte élimine les complexités liées au problème d'écriture du connecteur directement dans la base de données de votre application:

  • En séparant la base de données de métadonnées de la base de données de production et le flux de modifications, le connecteur n'a besoin que d'autorisations en lecture sur la base de données de production.

  • En limitant le trafic du connecteur vers une base de données de métadonnées distincte, les écritures effectuées par le connecteur lui-même ne seront pas incluses dans les flux de modification de production. Cela est particulièrement pertinent pour les flux de modification qui surveillent l'intégralité de la base de données.

Si aucune base de données distincte n'est utilisée pour stocker les métadonnées, nous vous recommandons de surveiller l'impact du processeur du connecteur de flux de modifications sur leurs instances.

Taille de votre cluster

En règle générale,il est nécessaire d'utiliser un nombre initial de nœuds de calcul dans une tâche de flux de modification Spanner pour un nœud de calcul pour 1 000 opérations d'écriture par seconde dans Spanner. Notez que cette estimation peut varier en fonction de plusieurs facteurs, comme la taille de chaque transaction, le nombre d'enregistrements de flux de modifications générés à partir d'une seule transaction, ainsi que d'autres transformations, agrégations ou récepteurs utilisés dans le pipeline.

Après le provisionnement initial, il est important de suivre les métriques mentionnées dans la section Surveiller un pipeline pour vous assurer qu'il est opérationnel. Nous vous recommandons de tester une taille initiale de pool de nœuds de calcul et de surveiller la manière dont votre pipeline gère la charge, en augmentant le nombre de nœuds si nécessaire. L'utilisation du processeur est une métrique clé qui permet de vérifier si la charge est correcte et qu'il faut ajouter plus de nœuds.

Limitations connues

Autoscaling

L'autoscaling est compatible avec tous les pipelines incluant SpannerIO.readChangeStream, qui nécessite Apache Beam 2.39.0 ou une version ultérieure.

Si vous utilisez une version d'Apache Beam antérieure à 2.39.0, les pipelines qui incluent SpannerIO.readChangeStream doivent spécifier explicitement l'algorithme d'autoscaling en tant que NONE, comme décrit dans la section Autoscaling horizontal.

Pour effectuer un scaling manuel d'un pipeline Dataflow au lieu d'utiliser l'autoscaling, consultez la section Effectuer un scaling manuel d'un pipeline de traitement par flux.

Runner V2

Le connecteur de flux de modifications Spanner nécessite Dataflow Runner V2. Vous devez les spécifier manuellement lors de l'exécution, sans quoi une erreur sera générée. Vous pouvez spécifier Runner V2 en configurant votre tâche avec --experiments=use_unified_worker,use_runner_v2.

Instantané

Le connecteur de flux de modifications Spanner n'est pas compatible avec les instantanés Dataflow.

En cours de drainage

Le connecteur de flux de modifications Spanner ne permet pas de drainer une tâche. Vous pouvez uniquement annuler une tâche existante.

Vous pouvez également mettre à jour un pipeline existant sans avoir à l'arrêter.

OpenCensus

Pour surveiller votre pipeline avec OpenCensus, spécifiez la version 0.28.3 ou une version ultérieure.

NullPointerException au démarrage du pipeline

Un bug dans la version 2.38.0 d'Apache Beam peut entraîner un NullPointerException lors du démarrage du pipeline dans certaines conditions. Cette action empêche votre tâche de démarrer et affiche ce message d'erreur à la place:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Pour résoudre ce problème, utilisez Apache Beam version 2.39.0 ou ultérieure, ou spécifiez manuellement la version de beam-sdks-java-core avec la valeur 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>