Änderungen mit Dataflow streamen

Mit dem Bigtable Beam-Connector können Sie mit Dataflow Bigtable-Datenänderungseinträge lesen, ohne Prozess-partition Änderungen in Ihrem da der Connector diese Logik für Sie übernimmt.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie den Artikel Überblick über die Änderungen Streams und machen Sie sich mit Dataflow vertraut.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keinen eigenen Dataflow- haben, können Sie eine der folgenden Optionen verwenden.

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus der Bigtable-Anleitung verwenden oder Schnellstart als Ausgangspunkt für Ihren Code.

Stellen Sie sicher, dass in dem von Ihnen generierten Code google cloud libraries-bom-Version 26.14.0 oder höher.

Connector-Details

Mit der Bigtable-Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Datenstrom lesen Änderungseinträge (ChangeStreamMutation), die Sie verarbeiten können. Der Bigtable Beam-Connector ist ein Komponente des GitHub für Apache Beam GitHub . Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java

Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Prüfen Sie Apache Beam Laufzeitunterstützung, um sicherzustellen, Sie eine unterstützte Version von Java verwenden. Dann können Sie eine Pipeline bereitstellen, verwendet den Connector, um Dataflow für die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit von Streamdaten zu verarbeiten.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Dokumentation zu Beam

Daten ohne Ereigniszeiten gruppieren

Mit dem Bigtable Beam-Connector gestreamte Datenänderungseinträge sind nicht mit Dataflow kompatibel Funktionen, die von Ereigniszeiten abhängen.

Wie im Abschnitt Replikation und Wasserzeichen, ein Ein niedriges Wasserzeichen wird möglicherweise nicht fortgesetzt, wenn die Replikation für die Partition nicht erfasst wurde bis zum Rest der Instanz. Wenn ein niedriges Wasserzeichen nicht mehr weiterbewegt, wird der Änderungsstream angehalten.

Um zu verhindern, dass der Stream angehalten wird, gibt der Bigtable Beam-Connector alle Daten mit dem Ausgabezeitstempel aus Null. Durch den Zeitstempel null berücksichtigt Dataflow alle Daten, Datensätze ändern in verspätete Daten. Daher sind Dataflow-Funktionen, die von Ereigniszeiten abhängen, sind mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie nicht nutzen Fensterfunktionen Ereigniszeit-Trigger oder Ereigniszeit-Timer

Stattdessen können Sie GlobalWindows mit Triggern ohne Ereigniszeit, um diese späten Daten in Bereiche zu gruppieren, wie gezeigt. finden Sie im Beispiel aus der Anleitung. Details zu Triggern und Bereichen finden Sie unter Trigger im Beam-Programmierhandbuch.

Autoscaling

Der Connector unterstützt Dataflow-Autoscaling der standardmäßig aktiviert ist, wenn Sie Runner v2 (erforderlich). Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand beim Änderungsstream, der in der Dataflow-Monitoring im Abschnitt Backlog. Verwenden Sie das Flag --maxNumWorkers beim Bereitstellen einer die Anzahl der Worker begrenzen.

Informationen zum manuellen Skalieren der Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren

Beschränkungen

Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow-Runner V2

Der Connector kann nur mit Dataflow Runner v2. Geben Sie --experiments=use_runner_v2 in der Befehlszeile an, um diese Funktion zu aktivieren Argumente. Die Ausführung mit Runner v1 führt dazu, dass Ihre Pipeline mit dem folgende Ausnahme:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Der Connector unterstützt keine Dataflow-Snapshots.

Duplikate

Der Bigtable Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden dass der Cluster in der Commit-Zeitstempelreihenfolge neu startet, früheren Zeitpunkten im Stream, kann es zu Duplikaten kommen.

Hinweise

Bevor Sie den Connector verwenden können, müssen folgende Voraussetzungen erfüllt sein:

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create local authentication credentials for your user account:

    gcloud auth application-default login

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Du musst Änderungsstream aktivieren auf einem Tisch liegen, bevor Sie sie lesen können. Sie können auch Neue Tabelle erstellen mit aktivierten Änderungsstreams.

Tabelle mit Änderungsstreammetadaten

Wenn Sie Änderungen mit Dataflow streamen, Der Bigtable Beam-Connector erstellt eine Metadatentabelle mit folgendem Namen: Standardmäßig __change_stream_md_table. Die Metadatentabelle des Änderungsstreams verwaltet den Betriebsstatus des Connectors und speichert Metadaten zu Datenänderungen Datensätze.

Standardmäßig wird die Tabelle vom Connector in derselben Instanz erstellt wie die Tabelle. das gestreamt wird. Damit die Tabelle richtig funktioniert, muss das Anwendungsprofil für den Metadatentabelle muss Single-Cluster-Routing verwenden und Transaktionen aktiviert.

Weitere Informationen zum Streamen von Änderungen aus Bigtable mit des Bigtable Beam-Connectors, siehe BigtableIO Dokumentation.

Erforderliche Rollen

Um die Berechtigungen zu erhalten, die Sie zum Lesen einer Bigtable-Änderung benötigen mit Dataflow streamen, bitten Sie Ihren Administrator, Ihnen folgenden IAM-Rollen.

Um die Änderungen aus Bigtable zu lesen, benötigen Sie diese Rolle:

  • Bigtable-Administrator (roles/bigtable.admin) auf der Bigtable-Instanz, die die Tabelle enthält, Streamänderungen von

Zum Ausführen des Dataflow-Jobs benötigen Sie folgende Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Möglicherweise erhalten Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierten Rollen hinzufügen.

Bigtable Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Code ähnlich der folgenden Abhängigkeit zu Ihrer Maven pom.xml-Datei hinzu. Die Version 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Um eine Dataflow-Pipeline zum Lesen Ihrer Änderungsdatensätze zu erstellen, konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Dann verwenden Sie den Connector zum Lesen von ChangeStreamMutation-Objekten in einer Beam-Pipeline.

Die Codebeispiele in diesem Abschnitt, die in Java geschrieben sind, zeigen, wie Sie eine und verwenden Sie sie zum Konvertieren von Schlüssel/Wert-Paaren in einen String. Jedes Paar besteht aus eines Zeilenschlüssels und eines ChangeStreamMutation-Objekts. Die Pipeline konvertiert jeden in einen durch Kommas getrennten String umwandeln.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datensätze zu Datenänderungen verarbeiten

Dieses Beispiel zeigt, wie alle Einträge in einem Datensatz für Datenänderungen durchlaufen werden können. für eine Zeile und rufen Sie eine Methode zum Umwandeln in einen String basierend auf dem Eintragstyp auf.

Eine Liste der Eintragstypen, die ein Datensatz für Datenänderung enthalten kann, finden Sie unter Was enthält ein Datensatz für Datenänderung?

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein write-Eintrag konvertiert:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In dieser Stichprobe wird ein Eintrag für das Löschen von Zellen konvertiert:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Löschen eines Spaltenfamilieneintrags konvertiert:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud Console können Sie Ihre Google Cloud-Ressourcen während der Ausführung einer Dataflow-Pipeline, Bigtable-Änderungsstreams lesen:

Überprüfen Sie insbesondere die folgenden Messwerte:

  • Prüfen Sie auf der Bigtable-Seite Monitoring Folgendes: Messwerte:
    • Daten zur CPU-Auslastung nach Änderungsstreams im Messwert cpu_load_by_app_profile_by_method_by_table Zeigt den Änderungsstream auf die CPU-Nutzung des Clusters auswirken.
    • Speicherauslastung des Änderungsstreams (Byte) (change_stream_log_used_bytes).
  • Prüfen Sie auf der Dataflow-Monitoringseite die Daten Aktualität: Hier sehen Sie den Unterschied zwischen der aktuellen Uhrzeit und der Wasserzeichen. Sie sollte etwa zwei Minuten dauern. Gelegentlich kann es zu einem Anstieg ein oder zwei Minuten länger. Wenn der Messwert für die Datenaktualität kontinuierlich höher ist, als dieser Grenzwert erreicht ist, sind die Ressourcen Ihrer Pipeline wahrscheinlich unzureichend weitere Dataflow-Worker hinzufügen sollten. Die Datenaktualität um anzuzeigen, ob Datenänderungsdatensätze langsam verarbeitet werden.
  • Die Dataflow-processing_delay_from_commit_timestamp_MEAN die durchschnittliche Verarbeitungszeit von Datenänderungsdatensätzen im Lebensdauer des Jobs.

Der Bigtable-Messwert server/latencies ist nicht hilfreich, wenn Sie Monitoring einer Dataflow-Pipeline, die eine Bigtable-Änderungsstream, da er die Streaminganfrage widerspiegelt und nicht die Verarbeitungslatenz des Data-Change-Records. Hohe Latenz in einem bedeutet nicht, dass Anfragen langsam verarbeitet werden. bedeutet das die Verbindung so lange geöffnet war.

Nächste Schritte