Verbindung zu Änderungsstreams mit Dataflow erstellen

Auf dieser Seite wird gezeigt, wie Sie Dataflow-Pipelines erstellen, Spanner-Änderungsdaten mithilfe von Änderungsstreams Mit dem Beispielcode auf dieser Seite können Sie benutzerdefinierte Pipelines erstellen.

Wichtige Konzepte

Im Folgenden finden Sie einige Kernkonzepte für Dataflow-Pipelines für Änderungsstreams.

Dataflow

Dataflow ist ein serverloser, schneller und kostengünstiger Dienst, der sowohl Stream- als auch die Batchverarbeitung. Es bietet Portabilität mit Verarbeitungsjobs, die mit dem Open Source Apache Beam und automatisiert die Infrastrukturbereitstellung und die Clusterverwaltung. Dataflow bietet ein Streaming nahezu in Echtzeit beim Lesen aus Änderungsstreams.

Sie können Dataflow verwenden, um Spanner-Änderungen zu übernehmen Streams mit SpannerIO Connector, der eine Abstraktion ermöglicht, über die Spanner API zum Abfragen von Änderungsstreams. Mit Connectors erstellen, müssen Sie die Änderungsstreams des Partitionslebenszyklus, der erforderlich ist, wenn Sie die Spanner API direkt ausführen. Der Connector bietet Ihnen mit einem Strom von Änderungsprotokollen, damit Sie sich und weniger auf API-Details und dynamische Änderungsstreampartitionierung. Wir empfehlen, den SpannerIO-Connector zu verwenden, als bei der Spanner API, wenn Sie in den meisten Fällen Änderungsstreamdaten.

Dataflow-Vorlagen sind vordefinierte Dataflow-Pipelines, die gängige Anwendungsfälle implementieren. Weitere Informationen finden Sie unter Dataflow-Vorlagen

Dataflow-Pipeline

Eine Dataflow-Pipeline für Spanner-Änderungsstreams besteht aus vier Hauptteilen:

  1. Eine Spanner-Datenbank mit einem Änderungsstream
  2. Der SpannerIO-Connector
  3. Benutzerdefinierte Transformationen und Senken
  4. Ein Senken-E/A-Autor

Image

Diese werden im Folgenden ausführlicher erörtert.

Spanner-Änderungsstream

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen Änderungsstream.

Apache Beam SpannerIO-Connector

Dies ist der zuvor beschriebene SpannerIO-Connector. Es ist ein Quell-E/A-Connector, der PCollection an Änderungseinträgen von Daten ausgibt, spätere Phasen der Pipeline. Die Ereigniszeit für jeden Änderungseintrag der ausgegebenen Datensätze ist der Commit-Zeitstempel. Beachten Sie, dass die ausgegebenen Datensätze unordered ist und der SpannerIO-Connector garantiert, verspätete Einträge.

Bei der Arbeit mit Änderungsstreams verwendet Dataflow Prüfpunktausführung. Daher kann jeder Worker bis zum konfigurierten Prüfpunktintervall warten zum Zwischenspeichern von Änderungen, bevor sie zur weiteren Verarbeitung gesendet werden.

Benutzerdefinierte Transformationen

Eine benutzerdefinierte Transformation ermöglicht es einem Nutzer, Daten in einer Dataflow-Pipeline zu verarbeiten. Gängige Anwendungsfälle hierfür sind das Entfernen personenidentifizierbarer Informationen, die Erfüllung von Anforderungen an das Downstream-Datenformat und die Sortierung. In der offiziellen Apache Beam-Dokumentation finden Sie das Programmierhandbuch zu Transformationen.

Apache Beam-Senken-E/A-Autor

Apache Beam enthält integrierten E/A-Transformationen zum Schreiben aus einer Dataflow-Pipeline in eine Datensenke wie BigQuery. Die gängigsten Datensenken werden nativ unterstützt.

Dataflow-Vorlagen

Dataflow Vorlagen eine einfache Möglichkeit, Dataflow-Jobs basierend auf vordefinierten Docker-Anwendungen zu erstellen Bilder für häufige Anwendungsfälle über die Google Cloud Console, die Google Cloud CLI oder REST API aufrufen.

Für Spanner-Änderungsstreams bieten wir drei Flexible Dataflow-Vorlagen:

IAM-Berechtigungen für Dataflow-Vorlagen festlegen

Vor dem Erstellen eines Dataflow-Jobs mit den drei flexiblen Vorlagen müssen Sie sicherstellen, dass Sie Erforderliche IAM-Berechtigungen für die folgenden Dienstkonten:

Wenn Sie nicht die erforderlichen IAM-Berechtigungen haben, Nutzerverwaltetes Worker-Dienstkonto angeben um den Dataflow-Job zu erstellen. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow

Wenn Sie versuchen, einen Job aus einer Dataflow-Flex-Vorlage ohne alle erforderlichen Berechtigungen auszuführen, schlägt der Job möglicherweise mit dem Fehler Fehler beim Lesen der Ergebnisdatei oder Berechtigung für Ressource verweigert fehl. Weitere Informationen finden Sie unter Fehlerbehebung bei Flex-Vorlagen.

Dataflow-Pipeline erstellen

In diesem Abschnitt geht es um die Erstkonfiguration des Connectors und Beispiele für gängige Integrationen mit Spanner-Änderungsstreams Funktionalität.

Sie benötigen eine Java-Entwicklungsumgebung, um diese Schritte ausführen zu können. für Dataflow. Weitere Informationen finden Sie unter Erstellen Sie eine Dataflow-Pipeline mit Java.

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Sie benötigen eine Spanner-Datenbank, um mit den nächsten Schritten fortzufahren mit einem konfigurierten Änderungsstream.

Detaillierte Berechtigungen für die Zugriffssteuerung gewähren

Wenn Sie davon ausgehen, dass Nutzer mit detaillierten Zugriffssteuerungen den Dataflow-Job ausführen, sicherzustellen, dass den Nutzern Zugriff auf eine Datenbank gewährt wird Rolle mit der Berechtigung SELECT für den Änderungsstream und die Rolle EXECUTE Berechtigung für die Tabellenwertfunktion des Änderungsstreams. Stellen Sie außerdem sicher, Principal die Datenbankrolle in der SpannerIO-Konfiguration oder in die flexible Dataflow-Vorlage.

Weitere Informationen finden Sie unter Detaillierte Zugriffssteuerung.

SpannerIO-Connector als Abhängigkeit hinzufügen

Der Apache Beam SpannerIO-Connector kapselt die Komplexität der Verarbeitung der Änderungsstreams direkt über die Cloud Spanner API und gibt eine Kollektion von Änderungsstream-Datensätzen an spätere Phasen der Pipeline aus.

Diese Objekte können auch in anderen Phasen der Dataflow-Pipeline des Nutzers aufgenommen werden. Die Integration von Änderungsstreams ist Teil des SpannerIO-Connectors. Um den SpannerIO-Connector verwenden zu können, muss die Abhängigkeit zu Ihrer pom.xml-Datei hinzugefügt werden:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Metadatendatenbank erstellen

Der Connector muss beim Ausführen der Datei Apache Beam-Pipeline Diese Metadaten werden in einem Spanner- Tabelle, die vom Connector während der Initialisierung erstellt wurde. Sie geben die Datenbank, in der diese Tabelle beim Konfigurieren des Connector.

Wie in den Best Practices für Änderungsstreams beschrieben, empfehlen wir, zu diesem Zweck eine neue Datenbank zu erstellen, anstatt dem Connector zu erlauben, die Datenbank Ihrer Anwendung zum Speichern der Metadatentabelle zu verwenden.

Der Inhaber eines Dataflow-Jobs, der SpannerIO verwendet muss der Connector die folgenden IAM-Berechtigungen Berechtigungen, die für diese Metadatendatenbank festgelegt wurden:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Connector konfigurieren

Der Connector für Spanner-Änderungsstreams kann so konfiguriert werden:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Im Folgenden werden die readChangeStream()-Optionen beschrieben:

Spanner-Konfiguration (erforderlich)

Wird zum Konfigurieren des Projekts, der Instanz und der Datenbank verwendet, in denen der Änderungsstream erstellt wurde und von dem abgefragt werden soll. Gibt optional auch die Datenbankrolle an, die verwendet werden soll, wenn die IAM-Rolle Das Hauptkonto, das den Dataflow-Job ausführt, ist ein Nutzer mit fein abgestimmter Zugriffssteuerung. Der Job übernimmt diese Datenbankrolle für den Zugriff auf den Änderungsstream. Für Weitere Informationen finden Sie unter Über die detaillierte Zugriffssteuerung.

Name des Streams ändern (erforderlich)

Dieser Name identifiziert den Änderungsstream eindeutig. Der hier angegebene Name muss mit dem Namen übereinstimmen, der bei der Erstellung verwendet wurde.

Metadateninstanz-ID (optional)

Dies ist die Instanz zum Speichern der Metadaten, die vom Connector verwendet werden, um die Nutzung der API-Daten des Änderungsstreams zu steuern.

Datenbank-ID für Metadaten (erforderlich)

Dies ist die Datenbank zum Speichern der Metadaten, die vom Connector verwendet werden, um die Nutzung der API-Daten des Änderungsstreams zu steuern.

Name der Metadatentabelle (optional)

Sollte nur beim Aktualisieren einer vorhandenen Pipeline verwendet werden.

Dies ist der bereits vorhandene Name der Metadatentabelle, der vom Connector. Er wird vom Connector verwendet, um die Metadaten den Verbrauch der API-Daten des Änderungsstreams zu steuern. Wenn diese Option ausgelassen, erstellt Spanner eine neue Tabelle mit einer generierten bei der Connector-Initialisierung.

RPC-Priorität (optional)

Die Anfrage die für die Abfragen des Änderungsstreams verwendet werden. Wenn Sie diesen Parameter nicht angeben, wird high priority verwendet.

InclusiveStartAt (erforderlich)

Änderungen ab dem angegebenen Zeitstempel werden an den Aufrufer zurückgegeben.

InclusiveEndAt (optional)

Änderungen bis zum angegebenen Zeitstempel werden an den Aufrufer zurückgegeben. Wenn Sie diesen Parameter nicht angeben, werden Änderungen auf unbestimmte Zeit ausgegeben.

Transformationen und Senken hinzufügen, um Änderungsdaten zu verarbeiten

Nachdem Sie die vorherigen Schritte ausgeführt haben, kann der konfigurierte SpannerIO-Connector eine PCollection von DataChangeRecord-Objekten ausgeben. Weitere Beispiele finden Sie unter Beispieltransformationen und -senken. Pipelinekonfigurationen, die diese gestreamten Daten auf unterschiedliche Weise verarbeiten.

Beachten Sie, dass die vom SpannerIO-Connector ausgegebenen Änderungsstreameinträge ungeordnet sind. Das liegt daran, dass PCollections keine Bestellgarantien bieten. Wenn Sie einen geordneten Stream benötigen, müssen Sie die Datensätze in Ihren Pipelines als Transformationen gruppieren und sortieren. Siehe Beispiel: Nach Schlüssel sortieren. Sie können dieses Beispiel erweitern, um die Datensätze nach beliebigen Feldern der Datensätze zu sortieren, z. B. nach Transaktions-IDs.

Beispieltransformationen und -senken

Sie können eigene Transformationen definieren und Senken angeben, in die die Daten geschrieben werden. Die Apache Beam-Dokumentation enthält eine Vielzahl von Transformationen, die angewendet werden können, sowie vorkonfigurierte E/A-Connectors zum Schreiben der Daten in externe Systeme.

Beispiel: Sortieren nach Schlüssel

In diesem Codebeispiel werden mithilfe des Dataflow-Connectors Datenänderungseinträge ausgegeben, die nach Commit-Zeitstempel sortiert und nach Primärschlüsseln gruppiert sind.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Dieses Codebeispiel verwendet Status und Timer, um Datensätze für jeden Schlüssel zu puffern, und legt die Ablaufzeit des Timers auf eine vom Nutzer konfigurierte Zeit T in der Zukunft fest (definiert in der Funktion BufferKeyUntilOutputTimestamp). Wenn das Dataflow-Wasserzeichen die Zeit T durchläuft, leert dieser Code alle Datensätze im Zwischenspeicher mit einem Zeitstempel kleiner als T, ordnet diese Datensätze nach dem Commit-Zeitstempel und gibt ein Schlüssel/Wert-Paar aus, wobei Folgendes gilt:

  • Der Schlüssel ist der Eingabeschlüssel, also der Primärschlüssel, der in einem Bucket-Array der Größe 1.000 gehasht ist.
  • Der Wert sind die sortierten Datenänderungseinträge, die für den Schlüssel zwischengespeichert wurden.

Für jeden Schlüssel gibt es folgende Garantien:

  • Timer werden garantiert in der Reihenfolge des Ablaufzeitstempels ausgelöst.
  • Nachgelagerte Phasen erhalten die Elemente garantiert in derselben Reihenfolge, in der sie produziert wurden.

Angenommen, für einen Schlüssel mit dem Wert 100 wird der Timer jeweils bei T1 und T10 ausgelöst. Bei jedem Zeitstempel wird dann ein Datenänderungssatz erstellt. Da die am T1 ausgegebenen Datenänderungseinträge vor den am T10 ausgegebenen Datenänderungseinträgen erstellt wurden, werden die am T1 ausgegebenen Datenänderungseinträge auch von der nächsten Phase vor den an T10 ausgegebenen Datenänderungseinträgen empfangen. Dieser Mechanismus hilft uns dabei, für die nachgelagerte Verarbeitung eine strikte Reihenfolge der Commit-Zeitstempel nach Primärschlüssel zu garantieren.

Dieser Vorgang wird so lange wiederholt, bis die Pipeline endet und alle Datensätze für Änderungen verarbeitet wurden. Wenn kein Ende angegeben ist, wird der Vorgang auf unbestimmte Zeit wiederholt.

Beachten Sie, dass in diesem Codebeispiel Status und Timer anstelle von Fenstern verwendet werden, um eine Sortierung nach Schlüssel durchzuführen. Die Begründung ist, dass Zeitfenster nicht garantiert sind. in der sie verarbeitet werden müssen. Dies bedeutet, dass ältere Fenster später verarbeitet werden können als neuere Fenster, was zu einer Bearbeitung außerhalb der Reihenfolge führen kann.

BreakRecordByModFn

Jeder Datensatz für eine Datenänderung kann mehrere Mods enthalten. Jeder Mod steht für das Einfügen, Aktualisieren oder Löschen eines einzelnen Primärschlüsselwerts. Diese Funktion teilt jeden Datensatz für Datenänderungen in separate Datensätze auf, einen pro Mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Diese Funktion nimmt eine DataChangeRecord an und gibt eine DataChangeRecord aus, die durch den Spanner-Primärschlüssel verschlüsselt ist und als Ganzzahlwert gehasht ist.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer und Puffer sind pro Schlüssel. Diese Funktion puffert jeden Datenänderungseintrag, bis das Wasserzeichen den Zeitstempel passiert, bei dem die zwischengespeicherten Datenänderungseinträge ausgegeben werden sollen.

Dieser Code verwendet einen Timer in einer Schleife, um zu bestimmen, wann der Zwischenspeicher geleert werden soll:

  1. Wenn zum ersten Mal ein Datenänderungseintrag für einen Schlüssel erkannt wird, wird der Timer so eingestellt, dass er nach dem Commit-Zeitstempel des Datenänderungseintrags plus incrementIntervalSeconds (eine vom Nutzer konfigurierbare Option) ausgelöst wird.
  2. Wenn der Timer ausgelöst wird, werden im Zwischenspeicher alle Datensätze für Datenänderungen hinzugefügt, deren Zeitstempel kleiner als die Ablaufzeit des Timers ist (recordsToOutput). Wenn der Puffer über Datenänderungseinträge verfügt, deren Zeitstempel größer oder gleich der Ablaufzeit des Timers ist, fügt er diese Datenänderungseinträge wieder in den Puffer hinzu, anstatt sie auszugeben. Anschließend wird der nächste Timer auf die Ablaufzeit des aktuellen Timers plus incrementIntervalInSeconds gesetzt.
  3. Wenn recordsToOutput nicht leer ist, ordnet die Funktion die Datensätze für Datenänderungen in recordsToOutput nach dem Commit-Zeitstempel und der Transaktions-ID und gibt sie dann aus.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Bestelltransaktionen

Diese Pipeline kann nach Transaktions-ID und Commit-Zeitstempel sortiert werden. Zwischenspeichern Sie dazu Einträge für jedes Transaktions-ID-/Commit-Zeitstempelpaar und nicht für jeden Spanner-Schlüssel. Hierfür muss der Code in KeyByIdFn geändert werden.

Beispiel: Transaktionen zusammenführen

Dieses Codebeispiel liest Datenänderungseinträge, fasst alle Datenänderungseinträge derselben Transaktion in einem einzigen Element zusammen und gibt dieses Element aus. Beachten Sie, dass die von diesem Beispielcode ausgegebenen Transaktionen nicht nach dem Commit-Zeitstempel sortiert sind.

In diesem Codebeispiel werden Puffer verwendet, um Transaktionen aus Datenänderungseinträgen zusammenzustellen. Nach Erhalt eines Datenänderungseintrags, der zum ersten Mal zu einer Transaktion gehört, liest er das Feld numberOfRecordsInTransaction im Datenänderungseintrag, das die erwartete Anzahl von Datenänderungseinträgen beschreibt, die zu dieser Transaktion gehören. Sie puffert die zu dieser Transaktion gehörenden Datenänderungseinträge, bis die Anzahl der zwischengespeicherten Datensätze mit numberOfRecordsInTransaction übereinstimmt. Anschließend werden die gebündelten Datensätze zu Datenänderungen ausgegeben.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Diese Funktion nimmt eine DataChangeRecord an und gibt eine DataChangeRecord aus, die durch die Transaktions-ID verschlüsselt ist.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn Zwischenspeicher haben Schlüssel/Wert-Paare empfangen mit {TransactionId, DataChangeRecord} von KeyByTransactionIdFn und speichert sie in Gruppen basierend auf TransactionId. Wenn die Anzahl der gepufferten Datensätzen gleich der Anzahl der Datensätze im gesamte Transaktion sortiert, sortiert diese Funktion die DataChangeRecord-Objekte in der Group by Record-Sequenz und gibt ein Schlüssel/Wert-Paar aus {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Hier gehen wir davon aus, dass SortKey eine benutzerdefinierte Klasse ist, die ein {CommitTimestamp, TransactionId}-Paar. Siehe Implementierungsbeispiel für SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Beispiel: Nach Transaktions-Tag filtern

Wenn eine Transaktion, die Nutzerdaten ändert, getaggt wird, werden das entsprechende Tag und sein Typ als Teil von DataChangeRecord gespeichert. Die folgenden Beispiele veranschaulichen, wie Änderungsstream-Datensätze anhand von benutzerdefinierten Transaktions-Tags und System-Tags gefiltert werden:

Benutzerdefinierte Tag-Filterung für my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

System-Tag-Filterung/TTL-Prüfung:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Beispiel: Vollständige Zeile abrufen

Dieses Beispiel funktioniert mit einer Spanner-Tabelle namens Singer mit der folgenden Definition:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Im standardmäßigen OLD_AND_NEW_VALUES-Werterfassungsmodus von Änderungsstreams enthält der empfangene Datenänderungseintrag bei einer Aktualisierung einer Spanner-Zeile nur die geänderten Spalten. Nachverfolgte, aber unveränderte Spalten werden nicht in den Datensatz aufgenommen. Mit dem Primärschlüssel des Mods kann am Commit-Zeitstempel des Datenänderungseintrags ein Spanner-Snapshot gelesen werden, um die unveränderten Spalten oder sogar die vollständige Zeile abzurufen.

Beachten Sie, dass die Datenbank-Aufbewahrungsrichtlinie möglicherweise in einen Wert geändert werden muss, der größer oder gleich der Aufbewahrungsrichtlinie des Änderungsstreams ist, damit der Snapshot-Lesevorgang erfolgreich ist.

Außerdem ist die Verwendung des Werterfassungstyps NEW_ROW die empfohlene und effizientere Methode, da standardmäßig alle erfassten Spalten der Zeile zurückgegeben werden und kein zusätzlicher Snapshot-Lesevorgang in Spanner erforderlich ist.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Diese Transformation führt am Commit-Zeitstempel jedes empfangenen Eintrags einen veralteten Lesevorgang durch und ordnet die vollständige Zeile JSON zu.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Mit diesem Code wird ein Spanner-Datenbankclient erstellt, um den gesamten Zeilenabruf durchzuführen. Außerdem wird der Sitzungspool so konfiguriert, dass er nur wenige Sitzungen hat und Lesevorgänge in einer Instanz des ToFullRowJsonFn sequenziell ausführen. Dataflow erzeugt viele Instanzen dieser Funktion mit jeweils einem eigenen Clientpool.

Beispiel: Spanner zu Pub/Sub

In diesem Szenario streamt der Aufrufer Einträge an Pub/Sub als und zwar ohne Gruppierung oder Aggregation. Dies ist eine gute zum Auslösen einer Downstream-Verarbeitung geeignet, z. B. zum Streamen aller neuen Zeilen in eine Spanner-Tabelle eingefügt, Pub/Sub für weitere zu verarbeiten.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Die Pub/Sub-Senke kann so konfiguriert werden, dass die Semantik genau einmal sichergestellt ist.

Beispiel: Spanner für Cloud Storage

In diesem Szenario gruppiert der Aufrufer alle Datensätze innerhalb eines bestimmten Fensters und speichert die Gruppe in separaten Cloud Storage-Dateien. Dies eignet sich gut für Analysen und die Archivierung zu einem bestimmten Zeitpunkt, die unabhängig von der Aufbewahrungsdauer von Spanner ist.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Die Cloud Storage-Senke bietet standardmäßig mindestens einmal Semantik. Mit zusätzlichem Verarbeitungsaufwand kann er so geändert werden, dass er eine genau einmalige Semantik hat.

Wir stellen auch eine Dataflow-Vorlage für diesen Anwendungsfall zur Verfügung: siehe Änderungsstreams mit Cloud Storage verbinden.

Beispiel: Spanner für BigQuery (Haupttabelle)

Hier streamt der Aufrufer Änderungseinträge in BigQuery. Jeder Datensatz zur Datenänderung wird in BigQuery als eine Zeile dargestellt. Das eignet sich gut für Analysen. Dieser Code verwendet die zuvor im Abschnitt Vollständige Zeile abrufen definierten Funktionen, um die vollständige Zeile des Eintrags abzurufen und in BigQuery zu schreiben.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Die BigQuery-Senke bietet standardmäßig mindestens einmal Semantik. Mit zusätzlichem Verarbeitungsaufwand kann er so geändert werden, dass er eine genau einmalige Semantik hat.

Wir stellen auch eine Dataflow-Vorlage für diesen Anwendungsfall zur Verfügung. Siehe Änderungsstreams mit BigQuery verbinden.

Pipeline überwachen

Zum Überwachen einer Dataflow-Pipeline für Änderungsstreams stehen zwei Klassen von Messwerten zur Verfügung.

Dataflow-Standardmesswerte

Dataflow bietet verschiedene Messwerte, um dafür zu sorgen, dass Ihr Job fehlerfrei ist, z. B. Datenaktualität, Systemverzögerung, Jobdurchsatz und Worker-CPU-Auslastung. Weitere Informationen finden Sie unter Monitoring für Dataflow-Pipelines verwenden.

Bei Pipelines für Änderungsstreams sind zwei Hauptmesswerte zu berücksichtigen: die Systemlatenz und die Datenaktualität.

Die Systemlatenz gibt Ihnen die aktuelle maximale Dauer (in Sekunden) an, für die ein Datenelement verarbeitet wird oder auf die Verarbeitung wartet.

Die Datenaktualität zeigt Ihnen die Zeitspanne zwischen jetzt (Echtzeit) und dem Ausgabewasserzeichen. Das Ausgabe-Wasserzeichen der Zeit T gibt an, dass alle Elemente mit einer Ereigniszeit (streng) vor T für die Berechnung verarbeitet wurden. Mit anderen Worten: Mit der Datenaktualität wird gemessen, wie aktuell die Pipeline im Hinblick auf die Verarbeitung der empfangenen Ereignisse ist.

Wenn für die Pipeline nicht genügend Ressourcen vorhanden sind, können Sie dies an diesen beiden Messwerten ablesen. Die Systemlatenz nimmt zu, da Elemente länger warten müssen, bevor sie verarbeitet werden können. Die Datenaktualität erhöht sich ebenfalls, da die Pipeline nicht mit der empfangenen Datenmenge Schritt halten kann.

Benutzerdefinierte Messwerte für Änderungsstreams

Diese Messwerte werden in Cloud Monitoring bereitgestellt und umfassen:

  • Bucket-Latenz (Histogramm) zwischen einem Datensatz, der in Spanner per Commit übergeben wird, bis zu dem vom Connector an eine PCollection ausgegeben wird. Dieser Messwert kann verwendet werden, um alle Leistungsprobleme (Latenz) mit der Pipeline anzusehen.
  • Gesamtzahl der gelesenen Datensätze. Dieser Wert gibt an, wie viele Datensätze der Connector insgesamt ausgegeben hat. Diese Zahl sollte kontinuierlich steigen und den Trend der Schreibvorgänge in der zugrunde liegenden Spanner-Datenbank widerspiegeln.
  • Anzahl der Partitionen, die gerade gelesen werden. Es sollten immer Partitionen sein, die gelesen werden. Wenn diese Zahl null ist, ist ein Fehler in der Pipeline aufgetreten.
  • Gesamtzahl der Abfragen, die während der Ausführung des Connectors ausgeführt werden. Dies ist ein allgemeiner Hinweis auf Änderungsstreamabfragen, die während der Ausführung der Pipeline an die Spanner-Instanz gestellt werden. Damit lässt sich eine Schätzung der Last vom Connector auf die Spanner-Datenbank abrufen.

Vorhandene Pipeline aktualisieren

Eine laufende Pipeline, in der der SpannerIO-Connector zum Verarbeiten von Änderungsstreams verwendet wird, kann aktualisiert werden, wenn die Jobkompatibilitätsprüfungen bestanden werden. Aufgabe müssen Sie den Parameter für den Namen der Metadatentabelle wenn Sie den neuen Job aktualisieren. Verwenden Sie den Wert von metadataTable. Pipeline-Option des zu aktualisierenden Jobs.

Wenn Sie eine von Google bereitgestellte Dataflow-Vorlage verwenden, legen Sie mit dem Parameter spannerMetadataTableName. Sie können auch Ihren vorhandenen Job, um die Metadatentabelle explizit mit der Methode zu verwenden. withMetadataTable(your-metadata-table-name) Zoll die Connector-Konfiguration. Danach können Sie die Anleitung unter Initiieren des Ersatzgerätes Job von der Dataflow-Dokumentation zum Aktualisieren eines laufenden Jobs.

Best Practices für Änderungsstreams und Dataflow

Im Folgenden finden Sie einige Best Practices zum Erstellen von Verbindungen zu Änderungsstreams mit Dataflow.

Separate Metadatendatenbank verwenden

Wir empfehlen, eine separate Datenbank für den SpannerIO-Connector zu erstellen, Metadatenspeicher statt, anstatt ihn für die Verwendung Ihrer Anwendungsdatenbank zu konfigurieren.

Weitere Informationen finden Sie unter Ziehen Sie eine separate Metadatendatenbank in Betracht.

Größe des Clusters anpassen

Eine Faustregel für die anfängliche Anzahl von Workern Job für Spanner-Änderungsstreams ist ein Worker pro 1.000 Schreibvorgänge pro 2. Diese Schätzung kann je nach verschiedene Faktoren, z. B. den Umfang der einzelnen Transaktionen, die Anzahl der aus einer einzelnen Transaktion und anderen Transformationen, Aggregationen oder Senken an, die in der Pipeline verwendet werden.

Nach der ersten Ressourcenbeschaffung ist es wichtig, die genannten Metriken in Pipeline überwachen um sicherzustellen, dass die Pipeline fehlerfrei ist. Wir empfehlen, mit einer die Größe des Worker-Pools verarbeitet die Last und erhöht bei Bedarf die Anzahl der Knoten. Die CPU-Auslastung ist ein wichtiger Messwert, um zu prüfen, ob die Auslastung angemessen ist und ob weitere Knoten erforderlich sind.

Bekannte Einschränkungen

Autoscaling

Autoscaling-Unterstützung für alle Pipelines, die SpannerIO.readChangeStream erfordert Apache Beam 2.39.0 oder oben.

Wenn Sie eine Apache Beam-Version vor 2.39.0 verwenden, muss der Autoscaling-Algorithmus in Pipelines, die SpannerIO.readChangeStream enthalten, explizit als NONE angegeben werden, wie unter Horizontales Autoscaling beschrieben.

Bis eine Dataflow-Pipeline manuell zu skalieren, statt Autoscaling zu verwenden. Streamingpipeline manuell skalieren

Runner V2

Der Connector für Spanner-Änderungsstreams benötigt Dataflow Runner V2. Diese Angabe muss bei der Ausführung manuell angegeben werden. Andernfalls wird ein Fehler ausgegeben. geworfen werden. Sie können Runner V2 angeben, indem Sie Ihren Job mit --experiments=use_unified_worker,use_runner_v2 konfigurieren.

Snapshot

Der Spanner-Connector für Änderungsstreams unterstützt keine Dataflow-Snapshots.

Ausgleichen

Der Connector für Spanner-Änderungsstreams unterstützt nicht einen Job überflüssig machen. Nur vorhandene Jobs können abgebrochen werden.

Sie können auch eine vorhandene Pipeline aktualisieren. ohne aufhören zu müssen.

OpenCensus

Um OpenCensus zum Überwachen der Pipeline Version angeben 0.28.3 oder höher.

NullPointerException beim Start der Pipeline

Ein Fehler in Apache Beam-Version 2.38.0 kann unter bestimmten Bedingungen beim Starten der Pipeline zu einer NullPointerException führen. Dies würde verhindern, dass Ihr Job gestartet wird, und stattdessen diese Fehlermeldung angezeigt:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Um dieses Problem zu beheben, verwenden Sie entweder die Apache Beam-Version 2.39.0 oder höher oder geben Sie die Version von beam-sdks-java-core als 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Weitere Informationen