Verbindungen zu Änderungsstreams mit Dataflow erstellen

Auf dieser Seite wird gezeigt, wie Sie Dataflow-Pipelines erstellen, die Spanner-Änderungsdaten mithilfe von Änderungsstreams verarbeiten und weiterleiten. Sie können den Beispielcode auf dieser Seite verwenden, um benutzerdefinierte Pipelines zu erstellen.

Wichtige Konzepte

Im Folgenden finden Sie einige Kernkonzepte für Dataflow-Pipelines für Änderungsstreams.

Dataflow

Dataflow ist ein serverloser, schneller und kostengünstiger Dienst, der sowohl Stream- als auch Batchverarbeitung unterstützt. Er bietet Portabilität mit Verarbeitungsjobs, die mithilfe der Open-Source-Apache Beam-Bibliotheken geschrieben wurden, und automatisiert die Infrastrukturbereitstellung und Clusterverwaltung. Dataflow bietet Streaming nahezu in Echtzeit beim Lesen aus Änderungsstreams.

Sie können Spanner-Änderungsstreams mithilfe von Dataflow mit dem SpannerIO-Connector nutzen. Dieser bietet eine Abstraktion über die Spanner API zum Abfragen von Änderungsstreams. Mit diesem Connector müssen Sie den Partitionslebenszyklus für Änderungsstreams nicht verwalten, was erforderlich ist, wenn Sie die Spanner API direkt verwenden. Der Connector stellt Ihnen einen Stream von Datenänderungseinträgen zur Verfügung, sodass Sie sich mehr auf die Anwendungslogik konzentrieren können, anstatt sich mit bestimmten API-Details und der dynamischen Partitionierung des Änderungsstreams zu befassen. In den meisten Fällen, in denen Sie Änderungsstreamdaten lesen müssen, empfehlen wir die Verwendung des SpannerIO-Connectors anstelle der Spanner API.

Dataflow-Vorlagen sind vordefinierte Dataflow-Pipelines, die gängige Anwendungsfälle implementieren. Eine Übersicht finden Sie unter Dataflow-Vorlagen.

Dataflow-Pipeline

Eine Dataflow-Pipeline für Spanner-Änderungsstreams besteht aus vier Hauptteilen:

  1. Eine Spanner-Datenbank mit einem Änderungsstream
  2. SpannerIO-Connector
  3. Benutzerdefinierte Transformationen und Senken
  4. Ein Senken-I/O-Autor

Image

Diese werden im Folgenden näher erläutert.

Spanner-Änderungsstream

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen.

SpannerIO-Connector für Apache Beam

Dies ist der zuvor beschriebene SpannerIO-Connector. Er ist ein E/A-Quell-Connector, der PCollection mit Datenänderungseinträgen an spätere Phasen der Pipeline ausgibt. Die Ereigniszeit für jede ausgegebene Datenänderung ist der Commit-Zeitstempel. Die ausgegebenen Datensätze sind ungeordnet und der SpannerIO-Connector garantiert, dass es keine verspäteten Datensätze gibt.

Bei der Arbeit mit Änderungsstreams verwendet Dataflow Prüfpunktausführung. Daher kann es sein, dass jeder Worker bis zum konfigurierten Prüfpunktintervall für die Zwischenspeicherung von Änderungen wartet, bevor er die Änderungen zur weiteren Verarbeitung sendet.

Benutzerdefinierte Transformationen

Mit einer benutzerdefinierten Transformation können Nutzer Verarbeitungsdaten in einer Dataflow-Pipeline aggregieren, umwandeln oder ändern. Häufige Anwendungsfälle dafür sind das Entfernen personenidentifizierbarer Informationen, das Erfüllen der Anforderungen an nachgelagerte Datenformate und das Sortieren. Die Programmieranleitung zu transforms finden Sie in der offiziellen Apache Beam-Dokumentation.

E/A-Autor von Apache Beam-Senken

Apache Beam enthält integrierte E/A-Transformationen, mit denen aus einer Dataflow-Pipeline in eine Datensenke wie BigQuery geschrieben werden kann. Die meisten gängigen Datensenken werden nativ unterstützt.

Dataflow-Vorlagen

Dataflow-Vorlagen bieten eine einfache Möglichkeit, Dataflow-Jobs basierend auf vordefinierten Docker-Images für häufige Anwendungsfälle über die Google Cloud Console, die Google Cloud CLI oder Rest API-Aufrufe zu erstellen.

Für Spanner-Änderungsstreams bieten wir drei flexible Dataflow-Vorlagen:

Dataflow-Pipeline erstellen

In diesem Abschnitt wird die Erstkonfiguration des Connectors beschrieben. Außerdem finden Sie Beispiele für gängige Integrationen mit der Funktion für Spanner-Änderungsstreams.

Zum Ausführen dieser Schritte benötigen Sie eine Java-Entwicklungsumgebung für Dataflow. Weitere Informationen finden Sie unter Dataflow-Pipeline mit Java erstellen.

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Damit Sie mit den nächsten Schritten fortfahren können, benötigen Sie eine Spanner-Datenbank mit einem konfigurierten Änderungsstream.

Detaillierte Zugriffssteuerungsrechte gewähren

Wenn Sie davon ausgehen, dass Nutzer der detaillierten Zugriffssteuerung den Dataflow-Job ausführen, müssen Sie ihnen Zugriff auf eine Datenbankrolle mit der Berechtigung SELECT für den Änderungsstream und der Berechtigung EXECUTE für die Tabellenwertfunktion des Änderungsstreams gewähren. Achten Sie außerdem darauf, dass das Hauptkonto die Datenbankrolle in der SpannerIO-Konfiguration oder in der flexiblen Dataflow-Vorlage angibt.

Weitere Informationen finden Sie unter Informationen zur fein abgestimmten Zugriffssteuerung.

SpannerIO-Connector als Abhängigkeit hinzufügen

Der Apache Beam SpannerIO-Connector verkapselt die Komplexität der direkten Aufnahme von Änderungsstreams direkt über die Cloud Spanner API und gibt eine Sammlung von Änderungsstream-Datensätzen in späteren Phasen der Pipeline aus.

Diese Objekte können in anderen Phasen der Dataflow-Pipeline des Nutzers verarbeitet werden. Die Einbindung des Änderungsstreams ist Teil des SpannerIO-Connectors. Damit Sie den SpannerIO-Connector verwenden können, muss die Abhängigkeit der Datei pom.xml hinzugefügt werden:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Metadatendatenbank erstellen

Der Connector muss beim Ausführen der Apache Beam-Pipeline jede Partition verfolgen. Diese Metadaten werden in einer Spanner-Tabelle gespeichert, die während der Initialisierung vom Connector erstellt wurde. Beim Konfigurieren des Connectors geben Sie die Datenbank an, in der diese Tabelle erstellt wird.

Wie unter Best Practices für Änderungsstreams beschrieben, empfehlen wir, zu diesem Zweck eine neue Datenbank zu erstellen, anstatt dem Connector zu erlauben, die Datenbank Ihrer Anwendung zum Speichern seiner Metadatentabelle zu verwenden.

Für den Inhaber eines Dataflow-Jobs, der den SpannerIO-Connector verwendet, müssen die folgenden IAM-Berechtigungen für diese Metadatendatenbank festgelegt sein:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Connector konfigurieren

Der Spanner-Connector für Änderungsstreams kann so konfiguriert werden:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Im Folgenden finden Sie Beschreibungen der readChangeStream()-Optionen:

Spanner-Konfiguration (erforderlich)

Wird zum Konfigurieren des Projekts, der Instanz und der Datenbank verwendet, in denen der Änderungsstream erstellt wurde und abgefragt werden soll. Gibt optional auch die Datenbankrolle an, die verwendet werden soll, wenn das IAM-Hauptkonto, das den Dataflow-Job ausführt, ein Nutzer einer detaillierten Zugriffssteuerung ist. Der Job nimmt diese Datenbankrolle für den Zugriff auf den Änderungsstream an. Weitere Informationen finden Sie unter Detaillierte Zugriffssteuerung.

Name des Streams ändern (erforderlich)

Mit diesem Namen wird der Änderungsstream eindeutig identifiziert. Der hier angegebene Name muss mit dem Namen übereinstimmen, der bei der Erstellung verwendet wurde.

Metadateninstanz-ID (optional)

Dies ist die Instanz zum Speichern der Metadaten, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.

Metadatendatenbank-ID (erforderlich)

Das ist die Datenbank zum Speichern der Metadaten, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.

Name der Metadatentabelle (optional)

Dieser Parameter sollte nur verwendet werden, wenn eine vorhandene Pipeline aktualisiert wird.

Dies ist der bereits vorhandene Name der Metadatentabelle, der vom Connector verwendet werden soll. Er wird vom Connector zum Speichern der Metadaten verwendet, um die Nutzung der Änderungsstream-API-Daten zu steuern. Wenn diese Option nicht angegeben wird, erstellt Spanner bei der Initialisierung des Connectors eine neue Tabelle mit einem generierten Namen.

RPC-Priorität (optional)

Die Anfragepriorität, die für die Änderungsstreamabfragen verwendet werden soll. Wenn dieser Parameter weggelassen wird, wird high priority verwendet.

InclusiveStartAt (erforderlich)

Änderungen aus dem angegebenen Zeitstempel werden an den Aufrufer zurückgegeben.

InclusiveEndAt (optional)

Änderungen bis zum angegebenen Zeitstempel werden an den Aufrufer zurückgegeben. Wenn dieser Parameter weggelassen wird, werden Änderungen unbegrenzt ausgegeben.

Transformationen und Senken zur Verarbeitung von Änderungsdaten hinzufügen

Wenn die vorherigen Schritte abgeschlossen sind, kann der konfigurierte SpannerIO-Connector eine Sammlung von DataChangeRecord-Objekten ausgeben. Unter Beispieltransformationen und -senken finden Sie mehrere Beispiel-Pipeline-Konfigurationen, die diese gestreamten Daten auf verschiedene Arten verarbeiten.

Beachten Sie, dass Änderungsstreameinträge, die vom SpannerIO-Connector ausgegeben werden, nicht sortiert sind. Das liegt daran, dass Kollektionen keine Abfolgegarantien bieten. Wenn Sie einen geordneten Stream benötigen, müssen Sie die Datensätze in Ihren Pipelines als Transformationen gruppieren und sortieren, siehe Beispiel: Nach Schlüssel sortieren. Sie können dieses Beispiel erweitern, um die Datensätze anhand beliebiger Felder der Datensätze zu sortieren, z. B. nach Transaktions-IDs.

Beispieltransformationen und -senken

Sie können eigene Transformationen definieren und Senken angeben, in die die Daten geschrieben werden sollen. Die Apache Beam-Dokumentation enthält eine Vielzahl von transforms, die angewendet werden können, sowie gebrauchsfertige E/A-Connectors, um die Daten in externe Systeme zu schreiben.

Beispiel: Nach Schlüssel sortieren

In diesem Codebeispiel werden mit dem Dataflow-Connector Datenänderungseinträge ausgegeben, die nach dem Commit-Zeitstempel sortiert und nach Primärschlüsseln gruppiert sind.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

In diesem Codebeispiel werden Status und Timer verwendet, um Datensätze für jeden Schlüssel zu puffern, und die Ablaufzeit des Timers auf eine vom Nutzer konfigurierte Zeit T in der Zukunft gesetzt (definiert in der Funktion BufferKeyUntilOutputTimestamp). Wenn das Dataflow-Wasserzeichen die Zeit T überschreitet, leert dieser Code alle Datensätze mit einem Zeitstempel kleiner als T im Puffer, sortiert diese Datensätze nach dem Commit-Zeitstempel und gibt ein Schlüssel/Wert-Paar aus, wobei Folgendes gilt:

  • Der Schlüssel ist der Eingabeschlüssel, d. h. der Primärschlüssel, der in ein Bucket-Array der Größe 1.000 umgewandelt wurde.
  • Der Wert sind die sortierten Datensätze der Datenänderung, die für den Schlüssel gepuffert wurden.

Für jeden Schlüssel gelten die folgenden Garantien:

  • Timer werden garantiert in der Reihenfolge des Ablaufzeitstempels ausgelöst.
  • Nachgelagerte Phasen erhalten die Elemente garantiert in derselben Reihenfolge, in der sie erstellt wurden.

Nehmen wir zum Beispiel an, dass der Timer für einen Schlüssel mit dem Wert 100 bei T1 bzw. T10 ausgelöst wird, wodurch bei jedem Zeitstempel ein Bundle aus Datenänderungseinträgen erzeugt wird. Da die am T1 ausgegebenen Datenänderungseinträge erstellt wurden, bevor die Änderungseinträge am T10 ausgegeben wurden, werden die am T1 ausgegebenen Datensätze auch in der nächsten Phase vor den am T10 ausgegebenen Änderungseinträgen empfangen. Dieser Mechanismus hilft uns, eine strikte Commit-Zeitstempelreihenfolge pro Primärschlüssel für die nachgelagerte Verarbeitung zu gewährleisten.

Dieser Vorgang wird wiederholt, bis die Pipeline endet und alle Datenänderungseinträge verarbeitet wurden. Wenn kein Ende angegeben ist, wird er auf unbestimmte Zeit wiederholt.

Hinweis: In diesem Codebeispiel werden Status und Timer anstelle von Fenstern verwendet, um eine Sortierung nach Schlüssel durchzuführen. Der Grund ist, dass Fenster nicht in der richtigen Reihenfolge verarbeitet werden. Dies bedeutet, dass ältere Fenster später verarbeitet werden können als neuere, was dazu führen kann, dass die Auftragsverarbeitung unterbrochen wird.

BreakRecordByModFn

Jeder Datenänderungseintrag kann mehrere Mods enthalten. Jeder Mod stellt ein Einfügen, Aktualisieren oder Löschen in einem einzigen Primärschlüsselwert dar. Diese Funktion zerlegt jeden Datenänderungseintrag in separate Datensätze, einen pro Mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Diese Funktion nimmt ein DataChangeRecord und gibt ein DataChangeRecord aus, das vom Spanner-Primärschlüssel als Hash-Wert in eine Ganzzahl codiert wird.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer und Puffer sind pro Schlüssel. Diese Funktion puffert jeden Datenänderungseintrag, bis das Wasserzeichen den Zeitstempel passiert, zu dem wir die gepufferten Datenänderungseinträge ausgeben möchten.

Dieser Code verwendet einen Loop-Timer, um zu bestimmen, wann der Puffer geleert werden soll:

  1. Wenn zum ersten Mal ein Datenänderungseintrag für einen Schlüssel erkannt wird, wird der Timer so eingestellt, dass er beim Commit-Zeitstempel des Datenänderungseintrags und incrementIntervalSeconds (einer vom Nutzer konfigurierbaren Option) ausgelöst wird.
  2. Wenn der Timer ausgelöst wird, fügt er alle Datenänderungseinträge im Puffer mit einem Zeitstempel hinzu, der kleiner als die Ablaufzeit des Timers ist, zu recordsToOutput. Wenn der Puffer über Datenänderungseinträge verfügt, deren Zeitstempel größer oder gleich der Ablaufzeit des Timers ist, fügt er diese Datensätze wieder in den Puffer ein, anstatt sie auszugeben. Dann wird der nächste Timer auf die Ablaufzeit des aktuellen Timers plus incrementIntervalInSeconds gesetzt.
  3. Wenn recordsToOutput nicht leer ist, ordnet die Funktion die Datenänderungseinträge in recordsToOutput nach dem Commit-Zeitstempel und der Transaktions-ID und gibt sie dann aus.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Bestelltransaktionen

Diese Pipeline kann so geändert werden, dass sie nach Transaktions-ID und Commit-Zeitstempel sortiert wird. Zwischenspeichern Sie dazu Datensätze für jedes Paar aus Transaktions-ID und Commit-Zeitstempel, anstatt für jeden Spanner-Schlüssel. Dies erfordert eine Änderung des Codes in KeyByIdFn.

Beispiel: Transaktionen zusammenstellen

Dieses Codebeispiel liest Datenänderungseinträge, fasst alle Datenänderungseinträge, die zu derselben Transaktion gehören, zu einem einzigen Element zusammen und gibt dieses Element aus. Die von diesem Beispielcode ausgegebenen Transaktionen sind nicht nach dem Commit-Zeitstempel sortiert.

In diesem Codebeispiel werden Puffer verwendet, um Transaktionen aus Datenänderungseinträgen zusammenzustellen. Beim ersten Empfangen eines Datenänderungseintrags, der zu einer Transaktion gehört, wird das Feld numberOfRecordsInTransaction im Datenänderungseintrag gelesen, das die erwartete Anzahl der Datensätze für diese Transaktion beschreibt. Sie puffert die zu dieser Transaktion gehörenden Datenänderungseinträge, bis die Anzahl der gepufferten Datensätze mit numberOfRecordsInTransaction übereinstimmt. Daraufhin werden die gebündelten Datenänderungseinträge ausgegeben.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Diese Funktion nimmt ein DataChangeRecord an und gibt ein DataChangeRecord aus, das durch die Transaktions-ID angegeben wird.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn-Zwischenspeicher haben Schlüssel/Wert-Paare von {TransactionId, DataChangeRecord} von KeyByTransactionIdFn empfangen und sie basierend auf TransactionId in Gruppen zwischengespeichert. Wenn die Anzahl der gepufferten Datensätze der Anzahl der Datensätze entspricht, die in der gesamten Transaktion enthalten sind, sortiert diese Funktion die DataChangeRecord-Objekte in der Gruppe nach der Datensatzsequenz und gibt ein Schlüssel/Wert-Paar {CommitTimestamp, TransactionId} (Iterable<DataChangeRecord>) aus.

Hier wird davon ausgegangen, dass SortKey eine benutzerdefinierte Klasse ist, die ein {CommitTimestamp, TransactionId}-Paar darstellt. Beispielimplementierung für SortKey

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Beispiel: Nach Transaktions-Tag filtern

Wenn eine Transaktion getaggt wird, die Nutzerdaten ändern, werden das entsprechende Tag und sein Typ als Teil von DataChangeRecord gespeichert. Die folgenden Beispiele zeigen, wie Änderungsstream-Einträge basierend auf benutzerdefinierten Transaktions- und System-Tags gefiltert werden:

Benutzerdefinierte Tag-Filterung für my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtern von System-Tags/TTL-Prüfung:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Beispiel: Vollständige Zeile abrufen

In diesem Beispiel wird eine Spanner-Tabelle mit dem Namen Singer verwendet, die die folgende Definition hat:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Wenn im standardmäßigen OLD_AND_NEW_VALUES-Werterfassungsmodus von Änderungsstreams eine Spanner-Zeile aktualisiert wird, enthält der empfangene Datenänderungseintrag nur die Spalten, die geändert wurden. Beobachtete, aber unveränderte Spalten werden nicht in den Eintrag aufgenommen. Mit dem Primärschlüssel des Mods kann ein Spanner-Snapshot beim Commit-Zeitstempel des Datenänderungseintrags gelesen werden, um die unveränderten Spalten oder sogar die vollständige Zeile abzurufen.

Beachten Sie, dass die Datenbankaufbewahrungsrichtlinie möglicherweise in einen Wert geändert werden muss, der größer oder gleich der Aufbewahrungsrichtlinie für den Änderungsstream ist, damit der Snapshot gelesen wird.

Außerdem ist die Verwendung des Werterfassungstyps NEW_ROW die empfohlene und effizientere Methode, da standardmäßig alle erfassten Spalten der Zeile zurückgegeben werden und kein zusätzlicher Snapshot-Lesevorgang in Spanner erforderlich ist.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Diese Transformation führt am Commit-Zeitstempel jedes empfangenen Eintrags einen veralteten Lesevorgang aus und ordnet die vollständige Zeile JSON zu.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Mit diesem Code wird ein Spanner-Datenbankclient erstellt, um den vollständigen Zeilenabruf durchzuführen. Er konfiguriert den Sitzungspool so, dass er nur wenige Sitzungen hat, wobei Lesevorgänge in einer Instanz von ToFullRowJsonFn sequenziell ausgeführt werden. Dataflow sorgt dafür, dass viele Instanzen dieser Funktion erstellt werden, jede mit ihrem eigenen Clientpool.

Beispiel: Spanner zu Pub/Sub

In diesem Szenario streamt der Aufrufer Datensätze so schnell wie möglich ohne Gruppierung oder Aggregation an Pub/Sub. Diese Methode eignet sich gut, um eine nachgelagerte Verarbeitung auszulösen, z. B. wenn alle neuen Zeilen, die in eine Spanner-Tabelle eingefügt wurden, zur weiteren Verarbeitung in Pub/Sub gestreamt werden.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Beachten Sie, dass die Pub/Sub-Senke so konfiguriert werden kann, dass die Semantik genau einmal sichergestellt ist.

Beispiel: Spanner zu Cloud Storage

In diesem Szenario gruppiert der Aufrufer alle Einträge innerhalb eines bestimmten Fensters und speichert die Gruppe in separaten Cloud Storage-Dateien. Diese Option eignet sich gut für Analysen und die Archivierung zu einem bestimmten Zeitpunkt, die unabhängig von der Aufbewahrungsdauer von Spanner ist.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Beachten Sie, dass die Cloud Storage-Senke standardmäßig eine mindestens einmalige Semantik bereitstellt. Mit zusätzlicher Verarbeitung kann sie so geändert werden, dass sie genau einmalig ist.

Für diesen Anwendungsfall stellen wir auch eine Dataflow-Vorlage bereit: siehe Änderungsstreams mit Cloud Storage verbinden.

Beispiel: Spanner zu BigQuery (Haupttabelle)

Hier streamt der Aufrufer Änderungseinträge in BigQuery. Jeder Datenänderungseintrag wird in BigQuery als eine Zeile wiedergegeben. Diese Option eignet sich gut für Analytics. In diesem Code werden die zuvor im Abschnitt Vollständige Zeile abrufen definierten Funktionen verwendet, um die vollständige Zeile des Eintrags abzurufen und in BigQuery zu schreiben.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Beachten Sie, dass die BigQuery-Senke standardmäßig eine mindestens einmalige Semantik bereitstellt. Mit zusätzlicher Verarbeitung kann sie so geändert werden, dass sie genau einmalig ist.

Für diesen Anwendungsfall stellen wir auch eine Dataflow-Vorlage bereit (siehe Änderungsstreams mit BigQuery verbinden).

Pipeline überwachen

Zum Überwachen einer Dataflow-Pipeline für Änderungsstreams stehen zwei Klassen von Messwerten zur Verfügung.

Dataflow-Standardmesswerte

Dataflow bietet mehrere Messwerte, mit denen Sie sicherstellen können, dass Ihr Job fehlerfrei ist, z. B. Datenaktualität, Systemverzögerung, Jobdurchsatz, Worker-CPU-Auslastung und mehr. Weitere Informationen finden Sie unter Monitoring für Dataflow-Pipelines verwenden.

Bei Änderungsstream-Pipelines müssen zwei Hauptmesswerte berücksichtigt werden: die Systemlatenz und die Datenaktualität.

Die Systemlatenz gibt die aktuelle maximale Zeitdauer (in Sekunden) an, für die ein Datenelement verarbeitet wird oder auf die Verarbeitung wartet.

Die Datenaktualität zeigt Ihnen die Zeitspanne zwischen jetzt (Echtzeit) und dem Ausgabe-Wasserzeichen an. Das Ausgabe-Wasserzeichen der Zeit T gibt an, dass alle Elemente mit einer Ereigniszeit (streng) vor T für die Berechnung verarbeitet wurden. Mit anderen Worten, die Datenaktualität misst, wie aktuell die Pipeline in Bezug auf die Verarbeitung der empfangenen Ereignisse ist.

Wenn die Pipeline nicht genügend Ressourcen hat, können Sie dies an diesen beiden Messwerten erkennen. Die Systemlatenz nimmt zu, da Elemente länger warten müssen, bevor sie verarbeitet werden. Die Datenaktualität erhöht sich ebenfalls, da die Pipeline mit der empfangenen Datenmenge nicht Schritt halten kann.

Benutzerdefinierte Messwerte für Änderungsstreams

Diese Messwerte werden in Cloud Monitoring zur Verfügung gestellt und umfassen:

  • Gesammelte Latenz (Histogramm) zwischen einem Datensatz, für den ein Commit in Spanner durchgeführt wird, bis zu seiner Ausgabe durch den Connector in eine Kollektion. Mit diesem Messwert können Sie Leistungsprobleme (Latenz) der Pipeline anzeigen.
  • Gesamtzahl der gelesenen Datensätze. Dies ist ein allgemeiner Indikator für die Anzahl der Datensätze, die vom Connector ausgegeben wurden. Diese Zahl sollte immer zunehmen und den Trend bei Schreibvorgängen in der zugrunde liegenden Spanner-Datenbank widerspiegeln.
  • Anzahl der Partitionen, die gerade gelesen werden. Es sollten immer Partitionen gelesen werden. Wenn diese Zahl null ist, ist ein Fehler in der Pipeline aufgetreten.
  • Gesamtzahl der Abfragen, die während der Ausführung des Connectors ausgeführt wurden. Dies ist ein allgemeiner Hinweis auf Änderungsstreamabfragen, die während der Ausführung der Pipeline an die Spanner-Instanz gestellt wurden. Damit lässt sich eine Schätzung der Last vom Connector für die Spanner-Datenbank abrufen.

Vorhandene Pipeline aktualisieren

Es ist möglich, eine laufende Pipeline zu aktualisieren, die den SpannerIO-Connector zur Verarbeitung von Änderungsstreams verwendet, wenn die Jobkompatibilitätsprüfungen bestanden werden. Dazu müssen Sie beim Aktualisieren den Namen der Metadatentabelle des neuen Jobs explizit festlegen. Verwenden Sie den Wert der Pipelineoption metadataTable aus dem Job, den Sie aktualisieren.

Wenn Sie eine von Google bereitgestellte Dataflow-Vorlage verwenden, legen Sie den Tabellennamen mit dem Parameter spannerMetadataTableName fest. Sie können den vorhandenen Job auch so ändern, dass die Metadatentabelle mit der Methode withMetadataTable(your-metadata-table-name) in der Connector-Konfiguration explizit verwendet wird. Folgen Sie anschließend der Anleitung unter Ersatzjob starten in der Dataflow-Dokumentation, um einen laufenden Job zu aktualisieren.

Best Practices für Änderungsstreams und Dataflow

Im Folgenden finden Sie einige Best Practices zum Erstellen von Änderungsstream-Verbindungen mithilfe von Dataflow.

Separate Metadatendatenbank verwenden

Wir empfehlen, eine separate Datenbank für den SpannerIO-Connector für den Metadatenspeicher zu erstellen, anstatt ihn für die Verwendung Ihrer Anwendungsdatenbank zu konfigurieren.

Weitere Informationen finden Sie unter Separate Metadatendatenbank verwenden.

Größe des Clusters anpassen

Als Faustregel gilt für die anfängliche Anzahl von Workern in einem Spanner-Änderungsstreamjob ein Worker pro 1.000 Schreibvorgänge pro Sekunde. Diese Schätzung kann abhängig von mehreren Faktoren variieren, z. B. von der Größe der einzelnen Transaktionen, der Anzahl der Änderungsstreameinträge, die aus einer einzelnen Transaktion erzeugt werden, und anderen Transformationen, Aggregationen oder Senken, die in der Pipeline verwendet werden.

Nach der ersten Ressourcenbeschaffung ist es wichtig, die unter Pipeline überwachen aufgeführten Messwerte zu verfolgen, um sicherzustellen, dass die Pipeline fehlerfrei ist. Wir empfehlen, mit einer anfänglichen Worker-Poolgröße zu experimentieren und zu beobachten, wie Ihre Pipeline mit der Last umgeht. Erhöhen Sie bei Bedarf die Anzahl der Knoten. Die CPU-Auslastung ist ein wichtiger Messwert, um zu prüfen, ob die Auslastung korrekt ist und mehr Knoten benötigt werden.

Bekannte Einschränkungen

Autoscaling

Die Autoscaling-Unterstützung für alle Pipelines, die SpannerIO.readChangeStream enthalten, erfordert Apache Beam 2.39.0 oder höher.

Wenn Sie eine Apache Beam-Version vor 2.39.0 verwenden, müssen Pipelines, die SpannerIO.readChangeStream enthalten, den Autoscaling-Algorithmus explizit als NONE angeben, wie unter Horizontales Autoscaling beschrieben.

Informationen zum manuellen Skalieren einer Dataflow-Pipeline anstelle von Autoscaling finden Sie unter Streamingpipeline manuell skalieren.

Runner V2

Der Spanner-Connector für Änderungsstreams benötigt Dataflow Runner V2. Dies muss während der Ausführung manuell angegeben werden. Andernfalls wird ein Fehler ausgegeben. Sie können Runner V2 angeben, indem Sie den Job mit --experiments=use_unified_worker,use_runner_v2 konfigurieren.

Snapshot

Der Spanner-Connector für Änderungsstreams unterstützt keine Dataflow-Snapshots.

Ausgleichen

Der Spanner-Connector für Änderungsstreams unterstützt das Leeren eines Jobs nicht. Es ist nur möglich, einen vorhandenen Job abzubrechen.

Sie können auch eine vorhandene Pipeline aktualisieren, ohne sie beenden zu müssen.

OpenCensus

Wenn Sie OpenCensus zum Überwachen Ihrer Pipeline verwenden möchten, geben Sie Version 0.28.3 oder höher an.

NullPointerException beim Start der Pipeline

Ein Programmfehler in Apache Beam Version 2.38.0 kann beim Starten der Pipeline unter bestimmten Bedingungen zu einem NullPointerException führen. Dies würde verhindern, dass Ihr Job gestartet wird, und stattdessen diese Fehlermeldung anzeigen:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Verwenden Sie entweder Apache Beam Version 2.39.0 oder höher oder geben Sie die Version von beam-sdks-java-core manuell als 2.37.0 an, um dieses Problem zu beheben:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Weitere Informationen