Änderungen mit Dataflow streamen

Mit dem Bigtable Beam-Connector können Sie Bigtable-Datenänderungseinträge mit Dataflow lesen, ohne Partitionsänderungen in Ihrem Code erfassen oder verarbeiten zu müssen. Der Connector übernimmt diese Logik für Sie.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie sich die Übersicht über Änderungsstreams ansehen und mit Dataflow vertraut sein.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, haben Sie folgende Möglichkeiten:

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus der Bigtable-Anleitung oder dem Bigtable-Schnellstart als Ausgangspunkt für Ihren Code verwenden.

Achten Sie darauf, dass für den generierten Code die Version 26.14.0 oder höher von google cloud libraries-bom verwendet wird.

Connector-Details

Mit der Bigtable Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Stream von Datenänderungssätzen (ChangeStreamMutation) lesen und verarbeiten. Der Bigtable Beam-Connector ist eine Komponente des GitHub-Repositorys für Apache Beam. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java.

Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Sehen Sie unter Unterstützung von Apache Beam-Laufzeiten nach, ob Sie eine unterstützte Java-Version verwenden. Anschließend können Sie eine Pipeline bereitstellen, die den Connector für Dataflow verwendet. Dieser übernimmt die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit der Verarbeitung von Streaming-Daten.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Daten ohne Ereigniszeiten gruppieren

Datenänderungseinträge, die mit dem Bigtable Beam-Connector gestreamt werden, sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängen.

Wie unter Replikation und Markierungen erläutert, wird ein niedriger Wasserzeichenwert möglicherweise nicht erhöht, wenn die Replikation für die Partition noch nicht auf dem Stand des Rests der Instanz ist. Wenn ein niedriger Wasserzeichenwert nicht mehr fortschreitet, kann der Änderungsstream stecken bleiben.

Um ein Stottern des Streams zu verhindern, gibt der Bigtable Beam-Connector alle Daten mit einem Ausgabezeitstempel von null aus. Aufgrund des Zeitstempels „0“ werden alle Datenänderungseinträge in Dataflow als verzögerte Daten betrachtet. Daher sind Dataflow-Funktionen, die von Ereigniszeiten abhängen, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie keine Fensterfunktionen, Ereigniszeit-Trigger oder Ereigniszeit-Timer verwenden.

Stattdessen können Sie GlobalWindows mit Triggern verwenden, die nicht auf der Ereigniszeit basieren, um diese verspäteten Daten in Ansichten zu gruppieren, wie im Beispiel aus der Anleitung gezeigt. Weitere Informationen zu Triggern und Steuerfeldbereichen finden Sie im Beam-Programmieranleitung unter Trigger.

Autoscaling

Der Connector unterstützt das Dataflow-Autoscaling, das standardmäßig aktiviert ist, wenn Runner v2 verwendet wird (erforderlich). Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand des Änderungsstreams. Dieser kann auf der Seite Dataflow-Überwachung im Bereich Backlog beobachtet werden. Verwenden Sie das Flag --maxNumWorkers, wenn Sie einen Job bereitstellen, um die Anzahl der Worker zu begrenzen.

Informationen zum manuellen Skalieren der Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren.

Beschränkungen

Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow Runner v2

Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden. Geben Sie dazu --experiments=use_runner_v2 in Ihren Befehlszeilenargumenten an. Wenn Sie die Pipeline mit Runner v1 ausführen, schlägt sie mit der folgenden Ausnahme fehl:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Der Connector unterstützt keine Dataflow-Snapshots.

Duplikate

Der Bigtable Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden Cluster in der Reihenfolge des Commit-Zeitstempels. Da er jedoch manchmal von früheren Zeitpunkten im Stream aus startet, können Duplikate entstehen.

Hinweise

Bevor Sie den Connector verwenden können, müssen Sie die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle erstellen, für die Änderungsstreams aktiviert sind.

Tabelle mit Stream-Metadaten ändern

Wenn Sie Änderungen mit Dataflow streamen, erstellt der Bigtable Beam-Connector eine Metadatentabelle, die standardmäßig den Namen __change_stream_md_table hat. In der Metadatentabelle des Änderungsstreams wird der Betriebsstatus des Connectors verwaltet und Metadaten zu Datenänderungseinträgen gespeichert.

Standardmäßig erstellt der Connector die Tabelle in derselben Instanz wie die gestreamte Tabelle. Damit die Tabelle richtig funktioniert, muss das Anwendungsprofil für die Metadatentabelle Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen aktiviert haben.

Weitere Informationen zum Streamen von Änderungen aus Bigtable mit dem Bigtable Beam-Connector finden Sie in der BigtableIO-Dokumentation.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mit Dataflow benötigen.

Um die Änderungen aus Bigtable zu lesen, benötigen Sie diese Rolle:

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Zum Ausführen des Dataflow-Jobs benötigen Sie die folgenden Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bigtable Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-pom.xml-Datei Code ähnlich der folgenden Abhängigkeit hinzu. Die Version muss 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Wenn Sie eine Dataflow-Pipeline zum Lesen Ihrer Datenänderungseinträge erstellen möchten, konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Anschließend verwenden Sie den Connector, um ChangeStreamMutation-Objekte in einer Beam-Pipeline zu lesen.

Die in Java geschriebenen Codebeispiele in diesem Abschnitt zeigen, wie Sie eine Pipeline erstellen und damit Schlüssel/Wert-Paare in einen String konvertieren. Jedes Paar besteht aus einem Zeilenschlüssel und einem ChangeStreamMutation-Objekt. Die Pipeline wandelt die Einträge jedes Objekts in einen durch Kommas getrennten String um.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datenänderungseinträge verarbeiten

In diesem Beispiel wird veranschaulicht, wie Sie alle Einträge in einem Datenänderungseintrag für eine Zeile durchlaufen und je nach Eintragstyp eine Methode zum Umwandeln in eine Zeichenfolge aufrufen.

Eine Liste der Eintragstypen, die ein Datenänderungseintrag enthalten kann, finden Sie unter Inhalt eines Datenänderungseintrags.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein write-Eintrag konvertiert:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag vom Typ Zellen löschen konvertiert:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag zum Löschen einer Spaltenfamilie konvertiert:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud-Ressourcen überwachen, während Sie eine Dataflow-Pipeline ausführen, um einen Bigtable-Änderungsstream zu lesen:

Achten Sie insbesondere auf die folgenden Messwerte:

  • Prüfen Sie auf der Seite Bigtable-Monitoring die folgenden Messwerte:
    • Daten zur CPU-Auslastung nach Änderungsstreams im Messwert cpu_load_by_app_profile_by_method_by_table Die Auswirkungen des Änderungsstreams auf die CPU-Auslastung Ihres Clusters.
    • Änderungsstream-Speicherauslastung (Byte) (change_stream_log_used_bytes).
  • Aktivieren Sie auf der Seite „Dataflow-Monitoring“ die Option Datenaktualität. Hier sehen Sie den Unterschied zwischen der aktuellen Uhrzeit und dem Wasserzeichen. Sie sollte etwa zwei Minuten dauern, mit gelegentlichen Spitzen, die ein bis zwei Minuten länger sind. Wenn der Messwert für die Datenaktualität über diesem Grenzwert liegt, sind Ihre Ressourcen für die Pipeline wahrscheinlich zu gering. Sie sollten dann mehr Dataflow-Worker hinzufügen. Die Datenaktualität gibt nicht an, ob Datenänderungseinträge langsam verarbeitet werden.
  • Mit dem Dataflow-Messwert processing_delay_from_commit_timestamp_MEAN können Sie die durchschnittliche Verarbeitungszeit von Datenänderungseinträgen während der Lebensdauer des Jobs ermitteln.

Der Bigtable-Messwert server/latencies ist nicht nützlich, wenn Sie eine Dataflow-Pipeline überwachen, die einen Bigtable-Änderungsstream liest, da er die Dauer der Streaminganfrage widerspiegelt, nicht die Latenz bei der Verarbeitung von Datenänderungseinträgen. Eine hohe Latenz in einem Änderungsstream bedeutet nicht, dass die Anfragen langsam verarbeitet werden, sondern dass die Verbindung so lange offen war.

Nächste Schritte