Verbindungen zu Änderungsstreams mit Dataflow erstellen

Auf dieser Seite erfahren Sie, wie Sie Dataflow-Pipelines erstellen, die Spanner-Änderungsdaten mithilfe von Änderungsstreams verarbeiten und weiterleiten. Sie können den Beispielcode auf dieser Seite verwenden, um benutzerdefinierte Pipelines zu erstellen.

Wichtige Konzepte

Im Folgenden finden Sie einige Kernkonzepte für Dataflow-Pipelines für Änderungsstreams.

Dataflow

Dataflow ist ein serverloser, schneller und kostengünstiger Dienst, der sowohl die Stream- als auch die Batchverarbeitung unterstützt. Es bietet Portabilität bei Verarbeitungsjobs, die mit den Open-Source-Bibliotheken von Apache Beam geschrieben wurden, und automatisiert die Infrastrukturbereitstellung und Clusterverwaltung. Dataflow bietet ein nahezu Echtzeit-Streaming beim Lesen aus Änderungsstreams.

Sie können Dataflow mit dem SpannerIO-Connector verwenden, um Spanner-Änderungsstreams zu verarbeiten. Dieser bietet eine Abstraktion über die Spanner API zum Abfragen von Änderungsstreams. Mit diesem Connector müssen Sie den Partitionslebenszyklus von Änderungsstreams nicht verwalten, was bei der direkten Verwendung der Spanner API erforderlich ist. Der Connector stellt einen Stream mit Datenänderungssätzen bereit, sodass Sie sich mehr auf die Anwendungslogik und weniger auf bestimmte API-Details und die dynamische Partitionierung von Änderungsstreams konzentrieren können. In den meisten Fällen, in denen Sie Änderungsstreamdaten lesen müssen, empfehlen wir, den SpannerIO-Connector anstelle der Spanner API zu verwenden.

Dataflow-Vorlagen sind vordefinierte Dataflow-Pipelines, die gängige Anwendungsfälle implementieren. Eine Übersicht finden Sie unter Dataflow-Vorlagen.

Dataflow-Pipeline

Eine Dataflow-Pipeline für Spanner-Änderungsstreams besteht aus vier Hauptteilen:

  1. Eine Spanner-Datenbank mit einem Änderungsstream
  2. Der SpannerIO-Connector
  3. Benutzerdefinierte Transformationen und Senken
  4. Einen Apache Beam-E/A-Writer für den Datensink

Image

Spanner-Änderungsstream

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen.

Apache Beam SpannerIO-Connector

Dies ist der SpannerIO-Connector, der im vorherigen Abschnitt zu Dataflow beschrieben wurde. Es ist ein Quell-E/A-Connector, der eine PCollection von Datenänderungssätzen an spätere Phasen der Pipeline ausgibt. Die Ereigniszeit für jeden gesendeten Datenänderungseintrag ist der Commit-Zeitstempel. Die gesendeten Datensätze sind unsortiert und der SpannerIO-Connector garantiert, dass es keine verzögerten Datensätze gibt.

Bei der Arbeit mit Änderungsstreams verwendet Dataflow Checkpoints. Daher wartet jeder Worker möglicherweise bis zum konfigurierten Prüfpunktintervall, um Änderungen zwischenzuspeichern, bevor die Änderungen zur weiteren Verarbeitung gesendet werden.

Benutzerdefinierte Transformationen

Mit einer benutzerdefinierten Transformation können Nutzer Verarbeitungsdaten in einer Dataflow-Pipeline zusammenfassen, transformieren oder ändern. Gängige Anwendungsfälle hierfür sind das Entfernen personenidentifizierbarer Informationen, die Erfüllung von Anforderungen an das Downstream-Datenformat und die Sortierung. In der offiziellen Apache Beam-Dokumentation finden Sie das Programmierhandbuch zu Transformationen.

Apache Beam-Sink-E/A-Writer

Apache Beam enthält integrierte E/A-Connectors, mit denen Sie von einer Dataflow-Pipeline in eine Datensenke wie BigQuery schreiben können. Die gängigsten Datensenken werden nativ unterstützt.

Dataflow-Vorlagen

Mit Dataflow-Vorlagen können Sie Dataflow-Jobs basierend auf vorgefertigten Docker-Images für gängige Anwendungsfälle mit der Google Cloud -Konsole, der Google Cloud -Befehlszeile oder REST API-Aufrufen erstellen.

Für Spanner-Änderungsstreams bieten wir drei flexible Dataflow-Vorlagen an:

IAM-Berechtigungen für Dataflow-Vorlagen festlegen

Bevor Sie einen Dataflow-Job mit den drei aufgeführten Flex-Vorlagen erstellen, müssen Sie die erforderlichen IAM-Berechtigungen für die folgenden Dienstkonten haben:

Wenn Sie nicht über die erforderlichen IAM-Berechtigungen verfügen, müssen Sie zum Erstellen des Dataflow-Jobs ein nutzerverwaltetes Worker-Dienstkonto angeben. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Wenn Sie versuchen, einen Job aus einer Dataflow-Flex-Vorlage auszuführen, ohne alle erforderlichen Berechtigungen zu haben, schlägt der Job möglicherweise mit der Fehlermeldung Fehler beim Lesen der Ergebnisdatei oder Berechtigung für Ressource verweigert fehl. Weitere Informationen finden Sie unter Fehlerbehebung bei Flex-Vorlagen.

Dataflow-Pipeline erstellen

In diesem Abschnitt wird die Erstkonfiguration des Connectors beschrieben. Außerdem finden Sie hier Beispiele für gängige Integrationen mit der Spanner-Änderungsstream-Funktion.

Für diese Schritte benötigen Sie eine Java-Entwicklungsumgebung für Dataflow. Weitere Informationen finden Sie unter Dataflow-Pipeline mit Java erstellen.

Änderungsstream erstellen

Weitere Informationen zum Erstellen eines Änderungsstreams finden Sie unter Änderungsstream erstellen. Damit Sie mit den nächsten Schritten fortfahren können, benötigen Sie eine Spanner-Datenbank mit einem konfigurierten Änderungsstream.

Berechtigungen für die detaillierte Zugriffssteuerung gewähren

Wenn Sie davon ausgehen, dass Nutzer mit einer detaillierten Zugriffssteuerung den Dataflow-Job ausführen, müssen Sie ihnen Zugriff auf eine Datenbankrolle mit der Berechtigung SELECT für den Änderungsstream und der Berechtigung EXECUTE für die Tabellenwertfunktion des Änderungsstreams gewähren. Achten Sie außerdem darauf, dass das Hauptkonto die Datenbankrolle in der SpannerIO-Konfiguration oder in der Dataflow-Flex-Vorlage angibt.

Weitere Informationen finden Sie unter Detaillierte Zugriffssteuerung.

SpannerIO-Connector als Abhängigkeit hinzufügen

Der Apache Beam SpannerIO-Connector kapselt die Komplexität der direkten Verwendung der Änderungsstreams über die Cloud Spanner API ein und gibt eine PCollection von Änderungsstream-Datensätzen an spätere Phasen der Pipeline aus.

Diese Objekte können in anderen Phasen der Dataflow-Pipeline des Nutzers verwendet werden. Die Integration von Änderungsstreams ist Teil des SpannerIO-Connectors. Damit Sie den SpannerIO-Connector verwenden können, muss die Abhängigkeit Ihrer pom.xml-Datei hinzugefügt werden:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Metadatendatenbank erstellen

Der Connector muss beim Ausführen der Apache Beam-Pipeline jede Partition im Blick behalten. Diese Metadaten werden in einer Spanner-Tabelle gespeichert, die vom Connector während der Initialisierung erstellt wurde. Sie geben die Datenbank an, in der diese Tabelle erstellt wird, wenn Sie den Connector konfigurieren.

Wie in den Best Practices für Änderungsstreams beschrieben, empfehlen wir, zu diesem Zweck eine neue Datenbank zu erstellen, anstatt dem Connector zu erlauben, die Datenbank Ihrer Anwendung zum Speichern der Metadatentabelle zu verwenden.

Der Inhaber eines Dataflow-Jobs, für den der SpannerIO-Connector verwendet wird, muss für diese Metadatendatenbank die folgenden IAM-Berechtigungen haben:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Connector konfigurieren

Der Spanner-Änderungsstreams-Connector kann so konfiguriert werden:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Im Folgenden werden die Optionen für readChangeStream() beschrieben:

Spanner-Konfiguration (erforderlich)

Wird zum Konfigurieren des Projekts, der Instanz und der Datenbank verwendet, in der der Änderungsstream erstellt wurde und abgefragt werden soll. Optional wird auch die Datenbankrolle angegeben, die verwendet werden soll, wenn das IAM-Hauptkonto, das den Dataflow-Job ausführt, ein Nutzer der detaillierten Zugriffssteuerung ist. Der Job übernimmt diese Datenbankrolle für den Zugriff auf den Änderungsstream. Weitere Informationen finden Sie unter Detaillierte Zugriffssteuerung.

Name des Änderungsstreams (erforderlich)

Dieser Name identifiziert den Änderungsstream eindeutig. Der Name muss mit dem Namen übereinstimmen, der beim Erstellen verwendet wurde.

Metadaten-Instanz-ID (optional)

In dieser Instanz werden die Metadaten gespeichert, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.

Datenbank-ID für Metadaten (erforderlich)

In dieser Datenbank werden die Metadaten gespeichert, die der Connector verwendet, um die Nutzung der Änderungsstream-API-Daten zu steuern.

Name der Metadatentabelle (optional)

Diese Option sollte nur beim Aktualisieren einer vorhandenen Pipeline verwendet werden.

Dies ist der Name der vorhandenen Metadatentabelle, die vom Connector verwendet werden soll. Dieser Wert wird vom Connector verwendet, um die Metadaten zu speichern und die Nutzung der Änderungsstream-API-Daten zu steuern. Wenn diese Option weggelassen wird, erstellt Spanner bei der Connector-Initialisierung eine neue Tabelle mit einem generierten Namen.

RPC-Priorität (optional)

Die Anfragepriorität, die für die Änderungsstreamabfragen verwendet werden soll. Wenn Sie diesen Parameter weglassen, wird high priority verwendet.

InclusiveStartAt (erforderlich)

Änderungen ab dem angegebenen Zeitstempel werden an den Aufrufer zurückgegeben.

InclusiveEndAt (optional)

Änderungen bis zum angegebenen Zeitstempel werden an den Aufrufer zurückgegeben. Wenn dieser Parameter weggelassen wird, werden Änderungen unbegrenzt gesendet.

Transformationen und Senken zum Verarbeiten von Änderungsdaten hinzufügen

Nachdem Sie die vorherigen Schritte ausgeführt haben, kann der konfigurierte SpannerIO-Connector eine PCollection von DataChangeRecord-Objekten ausgeben. Unter Beispieltransformationen und ‑senken finden Sie mehrere Beispielpipelinekonfigurationen, mit denen diese gestreamten Daten auf unterschiedliche Weise verarbeitet werden.

Beachten Sie, dass Änderungsstream-Eintragsdatensätze, die vom SpannerIO-Connector gesendet werden, nicht sortiert sind. Das liegt daran, dass PCollections keine Garantien für die Reihenfolge bieten. Wenn Sie einen sortierten Stream benötigen, müssen Sie die Datensätze in Ihren Pipelines als Transformationen gruppieren und sortieren. Weitere Informationen finden Sie unter Beispiel: Nach Schlüssel sortieren. Sie können dieses Beispiel erweitern, um die Einträge nach beliebigen Feldern der Einträge zu sortieren, z. B. nach Transaktions-IDs.

Beispieltransformationen und ‑senken

Sie können eigene Transformationen definieren und Senken angeben, in die die Daten geschrieben werden sollen. Die Apache Beam-Dokumentation enthält eine Vielzahl von Transformationen, die angewendet werden können, sowie vorkonfigurierte E/A-Connectors zum Schreiben der Daten in externe Systeme.

Beispiel: Nach Schlüssel sortieren

In diesem Codebeispiel werden Datenänderungsdatensätze ausgegeben, die nach Commit-Zeitstempel sortiert und mithilfe des Dataflow-Connectors nach Primärschlüsseln gruppiert werden.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

In diesem Codebeispiel werden Status und Timer verwendet, um Datensätze für jeden Schlüssel zu puffern. Das Ablaufdatum des Timers wird auf einen vom Nutzer konfigurierten Zeitpunkt in der Zukunft festgelegt (definiert in der Funktion BufferKeyUntilOutputTimestamp).T Wenn das Dataflow-Wasserzeichen die Zeit T überschreitet, werden mit diesem Code alle Datensätze im Puffer mit einem Zeitstempel kleiner als T gelöscht, diese Datensätze nach Commit-Zeitstempel sortiert und ein Schlüssel/Wert-Paar ausgegeben, wobei:

  • Der Schlüssel ist der Eingabeschlüssel, also der Primärschlüssel, der in ein Bucket-Array mit einer Größe von 1.000 gehasht wird.
  • Der Wert sind die sortierten Datenänderungssätze, die für den Schlüssel zwischengespeichert wurden.

Für jeden Schlüssel gelten die folgenden Garantien:

  • Timer werden garantiert in der Reihenfolge des Ablaufzeitstempels ausgelöst.
  • Nachgelagerte Phasen erhalten die Elemente garantiert in der Reihenfolge, in der sie erstellt wurden.

Bei einem Schlüssel mit dem Wert 100 wird der Timer beispielsweise bei T1 und T10 ausgelöst, wodurch bei jedem Zeitstempel ein Datenänderungssatz generiert wird. Da die Datenänderungsdatensätze, die bei T1 ausgegeben wurden, vor den Datenänderungsdatensätzen, die bei T10 ausgegeben wurden, erstellt wurden, werden die Datenänderungsdatensätze, die bei T1 ausgegeben wurden, garantiert vor den Datenänderungsdatensätzen, die bei T10 ausgegeben wurden, in der nächsten Phase empfangen. Mit diesem Mechanismus können wir eine strikte Commit-Zeitstempelreihenfolge pro Primärschlüssel für die nachgelagerte Verarbeitung gewährleisten.

Dieser Vorgang wird wiederholt, bis die Pipeline endet und alle Datenänderungseinträge verarbeitet wurden. Andernfalls wird er unbegrenzt wiederholt, wenn kein Endzeitpunkt angegeben ist.

In diesem Codebeispiel werden statt Fenstern Status und Timer verwendet, um die Sortierung nach Schlüssel auszuführen. Das liegt daran, dass die Zeitfenster nicht garantiert in der richtigen Reihenfolge verarbeitet werden. Das bedeutet, dass ältere Zeiträume später als neuere Zeiträume verarbeitet werden können, was zu einer Verarbeitung in der falschen Reihenfolge führen kann.

BreakRecordByModFn

Jeder Datenänderungseintrag kann mehrere Änderungen enthalten. Jede Modifikation steht für das Einfügen, Aktualisieren oder Löschen eines einzelnen Primärschlüsselwerts. Diese Funktion teilt jeden Datenänderungseintrag in separate Datenänderungseinträge auf, einen pro Mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Diese Funktion nimmt eine DataChangeRecord als Eingabe an und gibt eine DataChangeRecord zurück, die über den zu einer Ganzzahl gehashten Spanner-Primärschlüssel sortiert ist.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Timer und Puffer sind pro Schlüssel. Diese Funktion puffert jeden Datenänderungseintrag, bis das Wasserzeichen den Zeitstempel überschreitet, zu dem die gepufferten Datenänderungseinträge ausgegeben werden sollen.

In diesem Code wird ein Schleifen-Timer verwendet, um zu bestimmen, wann der Puffer geleert werden soll:

  1. Wenn zum ersten Mal ein Datenänderungseintrag für einen Schlüssel erkannt wird, wird der Timer so eingestellt, dass er nach dem Commit-Zeitstempel des Datenänderungseintrags plus incrementIntervalSeconds (eine vom Nutzer konfigurierbare Option) ausgelöst wird.
  2. Wenn der Timer ausgelöst wird, werden alle Datenänderungseinträge im Puffer mit einem Zeitstempel, der kleiner als die Ablaufzeit des Timers ist, zu recordsToOutput hinzugefügt. Wenn der Puffer Datenänderungseinträge enthält, deren Zeitstempel größer oder gleich dem Ablaufzeitpunkt des Timers ist, werden diese Datenänderungseinträge wieder in den Puffer aufgenommen, anstatt sie auszugeben. Der nächste Timer wird dann auf die Ablaufzeit des aktuellen Timers plus incrementIntervalInSeconds festgelegt.
  3. Wenn recordsToOutput nicht leer ist, sortiert die Funktion die Datenänderungseinträge in recordsToOutput nach Commit-Zeitstempel und Transaktions-ID und gibt sie dann aus.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Transaktionen sortieren

Diese Pipeline kann so geändert werden, dass sie nach Transaktions-ID und Commit-Zeitstempel sortiert wird. Dazu werden Einträge für jedes Transaktions-ID / Commit-Zeitstempel-Paar statt für jeden Spanner-Schlüssel zwischengespeichert. Dazu muss der Code in KeyByIdFn geändert werden.

Beispiel: Transaktionen zusammenführen

In diesem Codebeispiel werden Datenänderungssätze gelesen, alle Datenänderungssätze, die zu derselben Transaktion gehören, in einem einzelnen Element zusammengeführt und dieses Element ausgegeben. Die mit diesem Beispielcode ausgegebene Transaktionen werden nicht nach Commit-Zeitstempel sortiert.

In diesem Codebeispiel werden Puffer verwendet, um Transaktionen aus Datenänderungseinträgen zusammenzustellen. Wenn zum ersten Mal ein Datenänderungseintrag für eine Transaktion empfangen wird, wird das Feld numberOfRecordsInTransaction in diesem Datenänderungseintrag gelesen. Es gibt die erwartete Anzahl der Datenänderungseinträge an, die zu dieser Transaktion gehören. Die zu dieser Transaktion gehörenden Datenänderungssätze werden im Puffer gespeichert, bis die Anzahl der Puffersätze numberOfRecordsInTransaction erreicht. Anschließend werden die zusammengefassten Datenänderungssätze ausgegeben.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Diese Funktion nimmt eine DataChangeRecord als Eingabe an und gibt eine DataChangeRecord aus, die anhand der Transaktions-ID sortiert ist.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn puffert empfangene Schlüssel/Wert-Paare von {TransactionId, DataChangeRecord} von KeyByTransactionIdFn und puffert sie in Gruppen basierend auf TransactionId. Wenn die Anzahl der gepufferten Datensätze der Anzahl der Datensätze in der gesamten Transaktion entspricht, sortiert diese Funktion die DataChangeRecord-Objekte in der Gruppe nach Datensatzsequenz und gibt ein Schlüssel/Wert-Paar von {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord> aus.

Hier gehen wir davon aus, dass SortKey eine benutzerdefinierte Klasse ist, die ein {CommitTimestamp, TransactionId}-Paar darstellt. Weitere Informationen zu SortKey findest du in der Beispielimplementierung.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Beispiel: Nach Transaktions-Tag filtern

Wenn eine Transaktion, die Nutzerdaten ändert, getaggt wird, werden das entsprechende Tag und sein Typ als Teil von DataChangeRecord gespeichert. In diesen Beispielen wird veranschaulicht, wie Sie Änderungsstream-Eintragsdaten basierend auf benutzerdefinierten Transaktions- und System-Tags filtern:

Benutzerdefiniertes Tag-Filtern für my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

System-Tag-Filterung/TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Beispiel: Vollständige Zeile abrufen

Dieses Beispiel funktioniert mit einer Spanner-Tabelle namens Singer mit der folgenden Definition:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

Wenn bei Änderungsstreams im Standardwerterfassungsmodus OLD_AND_NEW_VALUES eine Spanner-Zeile aktualisiert wird, enthält der empfangene Datenänderungssatz nur die Spalten, die geändert wurden. Getrackte, aber unveränderte Spalten werden nicht in den Datensatz aufgenommen. Mit dem Primärschlüssel des Modus kann ein Spanner-Snapshot zum Commit-Zeitstempel des Datenänderungssatzes gelesen werden, um die unveränderten Spalten oder sogar die vollständige Zeile abzurufen.

Die Datenbankaufbewahrungsrichtlinie muss möglicherweise in einen Wert geändert werden, der mindestens der Aufbewahrungsrichtlinie des Änderungsstreams entspricht, damit die Snapshot-Lesevorgänge erfolgreich sind.

Außerdem ist die Verwendung des Werterfassungstyps NEW_ROW die empfohlene und effizientere Methode, da standardmäßig alle erfassten Spalten der Zeile zurückgegeben werden und kein zusätzlicher Snapshot-Lesevorgang in Spanner erforderlich ist.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Bei dieser Transformation wird eine veraltete Lesevorgang am Commit-Zeitstempel jedes empfangenen Datensatzes ausgeführt und die gesamte Zeile wird in JSON umgewandelt.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Mit diesem Code wird ein Spanner-Datenbankclient erstellt, um den Abruf der gesamten Zeile auszuführen. Der Sitzungspool wird so konfiguriert, dass nur wenige Sitzungen vorhanden sind, die Lesevorgänge in einer Instanz der ToFullReowJsonFn nacheinander ausführen. Dataflow sorgt dafür, dass viele Instanzen dieser Funktion mit jeweils einem eigenen Clientpool erstellt werden.

Beispiel: Spanner zu Pub/Sub

In diesem Szenario streamt der Aufrufer Datensätze so schnell wie möglich ohne Gruppierung oder Aggregation an Pub/Sub. Dies eignet sich gut zum Auslösen der nachgelagerten Verarbeitung, da alle neuen Zeilen, die in eine Spanner-Tabelle eingefügt werden, zur weiteren Verarbeitung an Pub/Sub gestreamt werden.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Die Pub/Sub-Senke kann so konfiguriert werden, dass die Exactly-Once-Semantik gewährleistet ist.

Beispiel: Spanner zu Cloud Storage

In diesem Szenario gruppiert der Aufrufer alle Datensätze innerhalb eines bestimmten Zeitraums und speichert die Gruppe in separaten Cloud Storage-Dateien. Das ist eine gute Lösung für Analysen und die Archivierung zu einem bestimmten Zeitpunkt, unabhängig von der Aufbewahrungsdauer von Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Der Cloud Storage-Sink bietet standardmäßig die Semantik „mindestens einmal“. Mit zusätzlicher Verarbeitung kann sie so geändert werden, dass sie eine „Exactly-Once“-Semantik hat.

Für diesen Anwendungsfall gibt es auch eine Dataflow-Vorlage: Änderungsstreams mit Cloud Storage verbinden

Beispiel: Spanner to BigQuery (Ledger-Tabelle)

Hier streamt der Aufrufer Änderungseinträge in BigQuery. Jeder Datenänderungseintrag wird in BigQuery als Zeile dargestellt. Das eignet sich gut für Analysen. In diesem Code werden die Funktionen verwendet, die zuvor im Abschnitt Ganze Zeile abrufen definiert wurden, um die vollständige Zeile des Datensatzes abzurufen und in BigQuery zu schreiben.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Der BigQuery-Sink bietet standardmäßig die Semantik „Mindestens einmal“. Mit zusätzlicher Verarbeitung kann sie so geändert werden, dass sie eine „Exactly-Once“-Semantik hat.

Für diesen Anwendungsfall stellen wir auch eine Dataflow-Vorlage bereit. Weitere Informationen finden Sie unter Änderungsstreams mit BigQuery verbinden.

Pipeline überwachen

Es gibt zwei Messklassen, mit denen sich eine Dataflow-Pipeline mit Änderungsstream überwachen lässt.

Standardmesswerte für Dataflow

Dataflow bietet mehrere Messwerte, mit denen Sie den Status Ihres Jobs prüfen können, z. B. Datenaktualität, Systemverzögerung, Jobdurchsatz und CPU-Auslastung von Workern. Weitere Informationen finden Sie unter Monitoring für Dataflow-Pipelines verwenden.

Bei Pipelines für Änderungsstreams sollten zwei Hauptmesswerte berücksichtigt werden: die Systemlatenz und die Datenaktualität.

Die Systemlatenz gibt die aktuelle maximale Dauer in Sekunden an, für die ein Datenelement verarbeitet wird oder auf die Verarbeitung wartet.

Die Datenaktualität gibt an, wie viel Zeit zwischen dem aktuellen Zeitpunkt (Echtzeit) und dem Ausgabewasserzeichen vergangen ist. Das Ausgabe-Wasserzeichen der Zeit T gibt an, dass alle Elemente mit einer Ereigniszeit (streng) vor T für die Berechnung verarbeitet wurden. Mit anderen Worten: Die Datenaktualität gibt an, wie aktuell die Pipeline in Bezug auf die Verarbeitung der empfangenen Ereignisse ist.

Wenn der Pipeline zu wenig Ressourcen zur Verfügung stehen, wirkt sich das auf diese beiden Messwerte aus. Die Systemlatenz erhöht sich, da Elemente länger warten müssen, bevor sie verarbeitet werden. Außerdem steigt die Datenaktualität, da die Pipeline mit der Menge der empfangenen Daten nicht Schritt halten kann.

Benutzerdefinierte Messwerte für Änderungsstreams

Diese Messwerte sind in Cloud Monitoring verfügbar und umfassen:

  • Bucketierte (Histogramm-)Latenz zwischen dem Commit eines Datensatzes in Spanner und der Ausgabe in eine PCollection durch den Connector. Mit diesem Messwert können Sie Leistungsprobleme (Latenz) mit der Pipeline erkennen.
  • Gesamtzahl der gelesenen Datensätze. Dies ist eine allgemeine Angabe zur Anzahl der vom Connector gesendeten Datensätze. Diese Zahl sollte immer weiter steigen und den Trend der Schreibvorgänge in der zugrunde liegenden Spanner-Datenbank widerspiegeln.
  • Anzahl der Partitionen, die gelesen werden. Es sollten immer Partitionen gelesen werden. Wenn diese Zahl null ist, ist in der Pipeline ein Fehler aufgetreten.
  • Gesamtzahl der Abfragen, die während der Ausführung des Connectors gestellt wurden. Dies ist eine allgemeine Angabe zu Änderungsstreamabfragen, die während der Ausführung der Pipeline an die Spanner-Instanz gesendet wurden. So lässt sich die Auslastung vom Connector zur Spanner-Datenbank schätzen.

Vorhandene Pipeline aktualisieren

Eine laufende Pipeline, in der der SpannerIO-Connector zum Verarbeiten von Änderungsstreams verwendet wird, kann aktualisiert werden, wenn die Jobkompatibilitätsprüfungen bestanden werden. Dazu müssen Sie beim Aktualisieren des neuen Jobs den Parameter „Name der Metadatentabelle“ explizit festlegen. Verwenden Sie den Wert der Pipelineoption metadataTable aus dem Job, den Sie aktualisieren.

Wenn Sie eine von Google bereitgestellte Dataflow-Vorlage verwenden, legen Sie den Tabellennamen mit dem Parameter spannerMetadataTableName fest. Sie können auch Ihren vorhandenen Job so ändern, dass die Metadatentabelle mit der Methode withMetadataTable(your-metadata-table-name) in der Connectorkonfiguration explizit verwendet wird. Anschließend können Sie der Anleitung unter Ersetzenden Job starten in der Dataflow-Dokumentation folgen, um einen laufenden Job zu aktualisieren.

Best Practices für Änderungsstreams und Dataflow

Im Folgenden finden Sie einige Best Practices für die Erstellung von Verbindungen zu Änderungsstreams mit Dataflow.

Separate Metadatendatenbank verwenden

Wir empfehlen, eine separate Datenbank für den SpannerIO-Connector zum Speichern von Metadaten zu erstellen, anstatt ihn für die Verwendung Ihrer Anwendungsdatenbank zu konfigurieren.

Weitere Informationen finden Sie unter Eine separate Metadatendatenbank verwenden.

Clustergröße festlegen

Als Faustregel für die anfängliche Anzahl der Worker in einem Spanner-Job für Änderungsstreams gilt ein Worker pro 1.000 Schreibvorgänge pro Sekunde. Diese Schätzung kann je nach mehreren Faktoren variieren, z. B. nach der Größe der einzelnen Transaktionen, der Anzahl der Änderungsstream-Eintragssätze, die aus einer einzelnen Transaktion generiert werden, und anderen Transformationen, Aggregationen oder Senken, die in der Pipeline verwendet werden.

Nach der anfänglichen Ressourcenzuweisung ist es wichtig, die unter Pipeline überwachen genannten Messwerte im Blick zu behalten, um sicherzustellen, dass die Pipeline ordnungsgemäß funktioniert. Wir empfehlen, mit einer anfänglichen Größe des Worker-Pools zu experimentieren und zu beobachten, wie Ihre Pipeline mit der Last umgeht. Erhöhen Sie gegebenenfalls die Anzahl der Knoten. Die CPU-Auslastung ist ein wichtiger Messwert, um zu prüfen, ob die Auslastung angemessen ist und ob weitere Knoten erforderlich sind.

Bekannte Einschränkungen

Bei der Verwendung von Spanner-Änderungsstreams mit Dataflow gibt es einige bekannte Einschränkungen:

Autoscaling

Für das Autoscaling von Pipelines, die SpannerIO.readChangeStream enthalten, ist Apache Beam 2.39.0 oder höher erforderlich.

Wenn Sie eine Apache Beam-Version vor 2.39.0 verwenden, muss der Autoscaling-Algorithmus in Pipelines, die SpannerIO.readChangeStream enthalten, explizit als NONE angegeben werden, wie unter Horizontales Autoscaling beschrieben.

Informationen zum manuellen Skalieren einer Dataflow-Pipeline anstelle des Autoscalings finden Sie unter Streamingpipeline manuell skalieren.

Runner V2

Der Spanner-Änderungsstreams-Connector erfordert Dataflow Runner v2. Dies muss während der Ausführung manuell angegeben werden, da sonst ein Fehler auftritt. Sie können Runner V2 angeben, indem Sie Ihren Job mit --experiments=use_unified_worker,use_runner_v2 konfigurieren.

Snapshot

Der Spanner-Connector für Änderungsstreams unterstützt keine Dataflow-Snapshots.

Ausgleichen

Der Spanner-Connector für Änderungsstreams unterstützt nicht das Drainen eines Jobs. Es ist nur möglich, einen vorhandenen Job abbrechen.

Sie können auch eine vorhandene Pipeline aktualisieren, ohne sie anhalten zu müssen.

OpenCensus

Wenn Sie OpenCensus zum Überwachen Ihrer Pipeline verwenden möchten, geben Sie Version 0.28.3 oder höher an.

NullPointerException beim Start der Pipeline

Ein Fehler in Apache Beam-Version 2.38.0 kann unter bestimmten Bedingungen beim Starten der Pipeline zu einer NullPointerException führen. Dadurch wird der Job nicht gestartet und stattdessen wird diese Fehlermeldung angezeigt:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Verwenden Sie zur Behebung dieses Problems entweder Apache Beam Version 2.39.0 oder höher oder geben Sie die Version von beam-sdks-java-core manuell als 2.37.0 an:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Weitere Informationen