Änderungen mit Dataflow streamen

Mit dem Bigtable Beam-Connector können Sie mit Dataflow Bigtable-Datenänderungseinträge lesen, ohne Partitionsänderungen in Ihrem Code verfolgen oder verarbeiten zu müssen, da der Connector diese Logik für Sie übernimmt.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie die Übersicht über Änderungsstreams gelesen und mit Dataflow vertraut sein.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, können Sie eine der folgenden Optionen verwenden.

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus der Bigtable-Anleitung oder der Bigtable-Kurzanleitung als Ausgangspunkt für Ihren Code verwenden.

Achten Sie darauf, dass der von Ihnen generierte Code google cloud libraries-bom-Version 26.14.0 oder höher verwendet.

Connector-Details

Mit der Bigtable Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Stream von Datensätzen für Datenänderungen (ChangeStreamMutation) lesen, den Sie verarbeiten können. Der Bigtable Beam-Connector ist eine Komponente des GitHub-Repositorys für Apache Beam. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java.

Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Prüfen Sie in der Apache Beam-Laufzeitunterstützung, ob Sie eine unterstützte Java-Version verwenden. Anschließend können Sie eine Pipeline bereitstellen, die den Connector zu Dataflow verwendet. Dataflow übernimmt die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit der Streamdatenverarbeitung.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Daten ohne Ereigniszeiten gruppieren

Mit dem Bigtable Beam-Connector gestreamte Datenänderungseinträge sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängen.

Wie unter Replikation und Wasserzeichen erläutert, wird ein niedriges Wasserzeichen möglicherweise nicht fortgeführt, wenn die Replikation für die Partition nicht den Rest der Instanz erreicht hat. Wenn ein niedriges Watermark aufhört, kann dies dazu führen, dass der Änderungsstream ins Stocken gerät.

Um zu verhindern, dass der Stream angehalten wird, gibt der Bigtable Beam-Connector alle Daten mit einem Ausgabezeitstempel von null aus. Durch den Null-Zeitstempel betrachtet Dataflow alle Datensätze für Änderungen als verspätete Daten. Daher sind Dataflow-Features, die von Ereigniszeiten abhängen, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie keine Windowing-Funktionen, Ereigniszeit-Trigger oder Ereigniszeit-Timer verwenden.

Stattdessen können Sie GlobalWindows mit Triggern ohne Ereigniszeit verwenden, um diese verspäteten Daten in Bereichen zu gruppieren, wie im Beispiel aus der Anleitung gezeigt. Weitere Informationen zu Triggern und Bereichen finden Sie im Beam-Programmierleitfaden unter Trigger.

Autoscaling

Der Connector unterstützt Dataflow-Autoscaling, das bei Verwendung von Runner v2 standardmäßig aktiviert ist (erforderlich). Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand des Änderungsstreams, der auf der Seite Dataflow-Monitoring im Abschnitt Backlog überwacht werden kann. Verwenden Sie das Flag --maxNumWorkers, wenn Sie einen Job bereitstellen, um die Anzahl der Worker zu begrenzen.

Informationen zum manuellen Skalieren der Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren.

Beschränkungen

Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow-Runner V2

Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden. Geben Sie --experiments=use_runner_v2 in den Befehlszeilenargumenten an, um dies zu aktivieren. Die Ausführung mit Runner v1 führt zum Fehlschlagen Ihrer Pipeline mit der folgenden Ausnahme:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Dataflow-Snapshots werden vom Connector nicht unterstützt.

Duplikate

Der Bigtable Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden Cluster in der Reihenfolge des Commit-Zeitstempels. Da er jedoch manchmal von einem früheren Zeitpunkt im Stream neu startet, kann er zu Duplikaten führen.

Hinweise

Bevor Sie den Connector verwenden können, müssen folgende Voraussetzungen erfüllt sein:

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.
  2. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  3. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle mit aktivierten Änderungsstreams erstellen.

Tabelle mit Metadaten des Streams ändern

Wenn Sie Änderungen mit Dataflow streamen, erstellt der Bigtable Beam-Connector eine Metadatentabelle, die standardmäßig den Namen __change_stream_md_table hat. Die Metadatentabelle des Änderungsstreams verwaltet den Betriebsstatus des Connectors und speichert Metadaten zu Datenänderungseinträgen.

Standardmäßig erstellt der Connector die Tabelle in derselben Instanz wie die gestreamte Tabelle. Damit die Tabelle korrekt funktioniert, muss das Anwendungsprofil für die Metadatentabelle Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen aktivieren.

Weitere Informationen zum Streamen von Änderungen aus Bigtable mit dem Bigtable Beam-Connector finden Sie in der Dokumentation zu BigtableIO.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mithilfe von Dataflow benötigen.

Um die Änderungen aus Bigtable zu lesen, benötigen Sie diese Rolle:

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Zum Ausführen des Dataflow-Jobs benötigen Sie folgende Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Möglicherweise können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bigtable Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Code ähnlich der folgenden Abhängigkeit zu Ihrer Maven pom.xml-Datei hinzu. Die Version muss 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Um eine Dataflow-Pipeline zum Lesen Ihrer Datensätze zu Datenänderungen zu erstellen, konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Anschließend verwenden Sie den Connector, um ChangeStreamMutation-Objekte in einer Beam-Pipeline zu lesen.

Die Codebeispiele in diesem Abschnitt, die in Java geschrieben sind, zeigen, wie Sie eine Pipeline erstellen und damit Schlüssel/Wert-Paare in einen String konvertieren. Jedes Paar besteht aus einem Zeilenschlüssel und einem ChangeStreamMutation-Objekt. Die Pipeline konvertiert die Einträge jedes Objekts in einen durch Kommas getrennten String.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datensätze zu Datenänderungen verarbeiten

Dieses Beispiel zeigt, wie Sie alle Einträge in einem Datensatz für Datenänderungen für eine Zeile durchgehen und eine Konvertierungsmethode in String aufrufen, die auf dem Eintragstyp basiert.

Eine Liste der Eintragstypen, die ein Datensatz für Datenänderung enthalten kann, finden Sie unter Was enthält ein Datensatz für Datenänderung?.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein write-Eintrag konvertiert:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In dieser Stichprobe wird ein Eintrag für das Löschen von Zellen konvertiert:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Löschen eines Spaltenfamilieneintrags konvertiert:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud-Ressourcen überwachen, während Sie eine Dataflow-Pipeline ausführen, um einen Bigtable-Änderungsstream zu lesen:

Überprüfen Sie insbesondere die folgenden Messwerte:

  • Prüfen Sie auf der Bigtable-Seite Monitoring diese metrics:
    • Daten zur CPU-Auslastung nach Änderungsstreams im Messwert cpu_load_by_app_profile_by_method_by_table. Zeigt die Auswirkungen des Änderungsstreams auf die CPU-Nutzung Ihres Clusters an.
    • Speicherauslastung des Änderungsstreams (Byte) (change_stream_log_used_bytes)
  • Prüfen Sie auf der Dataflow-Monitoringseite die Datenaktualität. Hier wird der Unterschied zwischen der aktuellen Uhrzeit und dem Wasserzeichen angezeigt. Sie sollte etwa zwei Minuten dauern. Gelegentlich kann es ein bis zwei Minuten länger dauern. Wenn der Messwert für die Datenaktualität ständig über diesem Grenzwert liegt, verfügt Ihre Pipeline wahrscheinlich über unzureichende Ressourcen und Sie sollten weitere Dataflow-Worker hinzufügen. Die Datenaktualität gibt keinen Hinweis darauf, ob Datensätze zur Änderung von Daten langsam verarbeitet werden.
  • Der Dataflow-Messwert processing_delay_from_commit_timestamp_MEAN gibt die durchschnittliche Verarbeitungszeit von Datensätzen zu Datenänderungen während der Lebensdauer des Jobs an.

Der Bigtable-Messwert server/latencies ist nicht nützlich, wenn Sie eine Dataflow-Pipeline beobachten, die einen Bigtable-Änderungsstream liest, da er die Dauer der Streaminganfrage und nicht die Verarbeitungslatenz des Datenänderungseintrags widerspiegelt. Eine hohe Latenz in einem Änderungsstream bedeutet nicht, dass die Anfragen langsam verarbeitet werden. Es bedeutet, dass die Verbindung so lange geöffnet war.

Nächste Schritte