Änderungen mit Dataflow streamen

Mit dem Bigtable Beam-Connector können Sie Dataflow verwenden, um Bigtable-Datenänderungseinträge zu lesen, ohne Partitionsänderungen in Ihrem Code verfolgen oder verarbeiten zu müssen, da der Connector diese Logik für Sie übernimmt.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie die Übersicht über Änderungsstreams lesen und mit Dataflow vertraut sein.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, können Sie eine der folgenden Optionen verwenden.

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus der Bigtable-Anleitung oder der Kurzanleitung als Ausgangspunkt für Ihren Code verwenden.

Der generierte Code muss die google cloud libraries-bom-Version 26.14.0 oder höher verwenden.

Connector-Details

Mit der Bigtable-Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Stream von Änderungseinträgen (ChangeStreamMutation) lesen, den Sie verarbeiten können. Der Bigtable Beam-Connector ist eine Komponente des Apache Beam-GitHub-Repositorys. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java.

Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Prüfen Sie unter Apache Beam-Laufzeitunterstützung, ob Sie eine unterstützte Java-Version verwenden. Anschließend können Sie eine Pipeline bereitstellen, die den Connector für Dataflow verwendet, der die Bereitstellung und Verwaltung von Ressourcen übernimmt und die Skalierbarkeit und Zuverlässigkeit der Streamdatenverarbeitung unterstützt.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Daten ohne Ereigniszeiten gruppieren

Mit dem Bigtable Beam-Connector gestreamte Datenänderungseinträge sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängig sind.

Wie unter Replikation und Wasserzeichen erläutert, steigt ein niedriges Wasserzeichen möglicherweise nicht weiter, wenn die Replikation für die Partition den Rest der Instanz nicht erreicht hat. Wenn ein zu niedriges Wasserzeichen aufhört, sich weiterzuentwickeln, kann dies dazu führen, dass der Änderungsstream stockt.

Damit der Stream nicht stört, gibt der Bigtable-Beam-Connector alle Daten mit einem Ausgabezeitstempel von null aus. Der Zeitstempel Null sorgt dafür, dass Dataflow alle Datenänderungseinträge als verspätete Daten betrachtet. Daher sind Dataflow-Features, die von Ereigniszeiten abhängig sind, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie keine Fensterfunktionen, Ereigniszeit-Trigger oder Ereigniszeit-Timer verwenden.

Stattdessen können Sie GlobalWindows mit Triggern ohne Ereigniszeit verwenden, um diese späten Daten in Bereichen zu gruppieren, wie im Beispiel aus der Anleitung gezeigt. Weitere Informationen zu Triggern und Bereichen finden Sie in der Beam-Programmieranleitung unter Trigger.

Autoscaling

Der Connector unterstützt Dataflow-Autoscaling, das bei Verwendung von Runner v2 standardmäßig aktiviert ist (erforderlich). Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand des Änderungsstreams, der auf der Seite Dataflow-Monitoring im Abschnitt Backlog überwacht werden kann. Verwenden Sie bei der Bereitstellung eines Jobs das Flag --maxNumWorkers, um die Anzahl der Worker zu begrenzen.

Informationen zum manuellen Skalieren einer Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren.

Beschränkungen

Beachten Sie die folgenden Einschränkungen, wenn Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow-Runner V2

Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden. Geben Sie dazu in den Befehlszeilenargumenten --experiments=use_runner_v2 an. Wenn sie mit Runner v1 ausgeführt wird, schlägt die Pipeline mit der folgenden Ausnahme fehl:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Der Connector unterstützt keine Dataflow-Snapshots.

Hinweise

Bevor Sie den Connector verwenden können, müssen die folgenden Voraussetzungen erfüllt sein.

Authentifizierung einrichten

Wenn Sie die Java-Beispiele auf dieser Seite aus einer lokalen Entwicklungsumgebung heraus verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.
  2. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  3. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Weitere Informationen: Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Standardanmeldedaten für Anwendungen für Code einrichten, der in Google Cloud ausgeführt wird.

Änderungsstream aktivieren

Sie müssen für eine Tabelle einen Änderungsstream aktivieren, damit Sie ihn lesen können. Sie können auch eine neue Tabelle erstellen, bei der Änderungsstreams aktiviert sind.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zu gewähren, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mit Dataflow benötigen.

Sie benötigen diese Rolle, um die Änderungen aus Bigtable zu lesen:

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Zum Ausführen des Dataflow-Jobs benötigen Sie diese Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bigtable-Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei pom.xml Code hinzu, der in etwa der folgenden Abhängigkeit entspricht. Die Version muss 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Zum Erstellen einer Dataflow-Pipeline zum Lesen Ihrer Datenänderungseinträge konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Anschließend verwenden Sie den Connector, um ChangeStreamMutation-Objekte in einer Beam-Pipeline zu lesen.

Die in Java geschriebenen Codebeispiele in diesem Abschnitt zeigen, wie Sie eine Pipeline erstellen und mithilfe dieser Schlüssel/Wert-Paare in einen String konvertieren. Jedes Paar besteht aus einem Zeilenschlüssel und einem ChangeStreamMutation-Objekt. Die Pipeline wandelt die Einträge jedes Objekts in einen kommagetrennten String um.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datenänderungseinträge verarbeiten

In diesem Beispiel wird gezeigt, wie Sie in einem Datenänderungseintrag eine Schleife für eine Zeile durchlaufen und eine Convert-to-String-Methode basierend auf dem Eintragstyp aufrufen.

Eine Liste der Eintragstypen, die ein Datenänderungseintrag enthalten kann, finden Sie unter Inhalt eines Datenänderungseintrags.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein write-Eintrag konvertiert:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag zum Löschen von Zellen konvertiert:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag zum Löschen einer Spaltenfamilie konvertiert:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud-Ressourcen überwachen, während Sie eine Dataflow-Pipeline ausführen, um einen Bigtable-Änderungsstream zu lesen:

Überprüfen Sie insbesondere die folgenden Messwerte:

  • Prüfen Sie auf der Bigtable-Seite Monitoring die folgenden metrics:
    • Daten zur CPU-Auslastung durch Änderungsstreams im Messwert cpu_load_by_app_profile_by_method_by_table. Zeigt die Auswirkungen des Änderungsstreams auf die CPU-Nutzung des Clusters.
    • Änderungsstream-Speicherauslastung (Byte) (change_stream_log_used_bytes).
  • Prüfen Sie auf der Dataflow-Monitoringseite die Datenaktualität. Dies zeigt den Unterschied zwischen der aktuellen Zeit und dem Wasserzeichen. Sie sollte etwa zwei Minuten dauern, mit gelegentlichen Spitzen ein bis zwei Minuten länger. Wenn der Messwert für die Datenaktualität konstant über diesem Schwellenwert liegt, verfügt Ihre Pipeline wahrscheinlich nicht über genügend Ressourcen und Sie sollten mehr Dataflow-Worker hinzufügen.

Nächste Schritte