Vorlage "Spanner-Änderungsstreams für BigQuery"

Die Vorlage „Spanner-Änderungsstreams für BigQuery“ ist eine Streamingpipeline, die Datenänderungsdatensätze von Spanner streamt und sie mithilfe von Dataflow Runner V2 in BigQuery-Tabellen schreibt.

Alle Spalten zum Beobachten von Änderungsstreams sind in jeder BigQuery-Tabellenzeile enthalten, unabhängig davon, ob sie durch eine Spanner-Transaktion geändert werden. Nicht beobachtete Spalten sind nicht in der BigQuery-Zeile enthalten. Alle Spanner-Änderungen, die kleiner als das Dataflow-Wasserzeichen sind, werden entweder erfolgreich auf die BigQuery-Tabellen angewendet oder in der Dead-Letter-Warteschlange zur Wiederholung gespeichert. BigQuery-Zeilen werden im Vergleich zur ursprünglichen Reihenfolge des Spanner Commit-Zeitstempels in der falschen Reihenfolge eingefügt.

Wenn die erforderlichen BigQuery-Tabellen nicht vorhanden sind, werden sie von der Pipeline erstellt. Andernfalls werden vorhandene BigQuery-Tabellen verwendet. Das Schema vorhandener BigQuery-Tabellen muss die entsprechenden nachverfolgten Spalten der Spanner-Tabellen und alle zusätzlichen Metadatenspalten enthalten, die nicht explizit von der Option ignoreFields ignoriert werden. Eine Beschreibung der Metadatenfelder finden Sie in der folgenden Liste. Jede neue BigQuery-Zeile enthält alle Spalten, die vom Änderungsstream aus der entsprechenden Zeile in Ihrer Spanner-Tabelle zum Zeitstempel des Änderungsdatensatzes beobachtet werden.

Die folgenden Metadatenfelder werden zu BigQuery-Tabellen hinzugefügt: Weitere Informationen zu diesen Feldern finden Sie unter Datenänderungsdatensätze in „Änderungsstream-Partitionen, -Datensätze und -Abfragen”.

Beachten Sie bei der Verwendung dieser Vorlage die folgenden Details:

  • Diese Vorlage überträgt keine Schemaänderungen von Spanner an BigQuery. Da die Schemaänderung in Spanner wahrscheinlich zu einer Unterbrechung der Pipeline führt, müssen Sie die Pipeline nach der Schemaänderung möglicherweise neu erstellen.
  • Bei den Werterfassungstypen OLD_AND_NEW_VALUES und NEW_VALUES muss die Vorlage, wenn der Datensatz eine UPDATE-Änderung enthält, einen veralteten Lesevorgang in Spanner zum Commit-Zeitstempel des Datensatzes durchführen, um die unveränderten, aber überwachten Spalten abzurufen. Prüfen Sie, ob Sie die „version_retention_period“ in Ihrer Datenbank richtig konfiguriert haben, um veraltete Daten lesen zu können. Für den Werterfassungstyp NEW_ROW ist die Vorlage effizienter, da der Datensatz zur Datenänderung die vollständige neue Zeile erfasst, einschließlich Spalten, die in UPDATE-Anfragen nicht aktualisiert werden, und die Vorlage keinen veralteten Lesevorgang ausführen muss.
  • Führen Sie den Dataflow-Job in derselben Region wie Ihre Spanner-Instanz oder BigQuery-Tabellen aus, um die Netzwerklatenz und die Netzwerktransportkosten zu minimieren. Wenn Sie Quellen und Senken sowie Speicherorte für Staging-Dateien und temporäre Dateien verwenden, die sich außerhalb der Region Ihres Jobs befinden, werden Ihre Daten möglicherweise regionenübergreifend gesendet. Weitere Informationen finden Sie unter Dataflow-Regionen.
  • Diese Vorlage unterstützt alle gültigen Spanner-Datentypen. Wenn der BigQuery-Typ jedoch genauer ist als der Spanner-Typ, kann während der Transformation ein Genauigkeitsverlust auftreten. Insbesondere:
    • Im Fall von JSON-Typen in Spanner wird die Reihenfolge der Mitglieder eines Objekts lexikografisch angeordnet. Es gibt jedoch keine Garantie dafür.
    • Spanner unterstützt den TIMESTAMP-Typ „Nanosekunden”, aber BigQuery unterstützt nur den TIMESTAMP-Typ „Mikrosekunden”.
  • Diese Vorlage unterstützt die Verwendung der BigQuery Storage Write API im „Genau einmal“-Modus ni ht.

Weitere Informationen zu Änderungsstreams, zum Erstellen von Dataflow-Pipelines für Änderungsstreams und Best Practices

Pipelineanforderungen

  • Die Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das BigQuery-Dataset muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Erforderliche Parameter

  • spannerInstanceId : Die Spanner-Instanz, aus der Änderungsstreams gelesen werden sollen.
  • spannerDatabase : Die Spanner-Datenbank, aus der Änderungsstreams gelesen werden sollen.
  • spannerMetadataInstanceId : Die Spanner-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
  • spannerMetadataDatabase : Die Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll. Für Änderungsstreams, die alle Tabellen in einer Datenbank verfolgen, empfehlen wir, die Metadatentabelle in einer separaten Datenbank abzulegen.
  • spannerChangeStreamName : Der Name des Spanner-Änderungsstreams, aus dem gelesen werden soll.
  • bigQueryDataset: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams. Sowohl dataSetName als auch die vollständige dataSetId (d. h. bigQueryProjectId.dataSetName) sind zulässig.

Optionale Parameter

  • spannerProjectId : Das Projekt, aus dem Änderungsstreams gelesen werden. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
  • spannerDatabaseRole : Die Datenbankrolle, die der Nutzer beim Lesen aus dem Änderungsstream annimmt. Die Datenbankrolle muss die erforderlichen Berechtigungen zum Lesen aus dem Änderungsstream haben. Wenn keine Datenbankrolle angegeben ist, sollte der Nutzer die erforderlichen IAM-Berechtigungen zum Lesen aus der Datenbank haben.
  • spannerMetadataTableName : Der Name der zu verwendenden Connector-Metadatentabelle für Cloud Spanner-Änderungsstreams. Wenn nicht angegeben, wird während des Pipelineablaufs automatisch eine Metadatentabelle für Cloud Spanner-Änderungsstreams erstellt. Dieser Parameter muss beim Aktualisieren einer vorhandenen Pipeline angegeben werden und sollte nicht anderweitig angegeben werden.
  • rpcPriority : Die Anfragepriorität für Cloud Spanner-Aufrufe. Der Wert muss einer der folgenden sein:[HIGH,MEDIUM,LOW]. Die Standardeinstellung ist HIGH.
  • spannerHost : Der Cloud Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Wird nur zum Testen verwendet. (Beispiel: https://batch-spanner.googleapis.com).
  • startTimestamp : Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird (https://tools.ietf.org/html/rfc3339). Beispiel: 2022-05-05T07:59:59Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline.
  • endTimestamp : Die End-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird (https://tools.ietf.org/html/rfc3339). Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
  • bigQueryProjectId: Das BigQuery-Projekt. Der Standardwert ist das Projekt für den Dataflow-Job.
  • bigQueryChangelogTableNameTemplate: Die Vorlage für den BigQuery-Tabellennamen, der das Änderungslog enthält. Die Standardeinstellung ist {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: Der Dateipfad zum Speichern nicht verarbeiteter Einträge mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
  • dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Der Standardwert ist 10.
  • ignoreFields: Durch Kommas getrennte Liste von Feldern, die ignoriert werden sollen. Dies können Felder von verfolgten Tabellen oder Metadatenfelder sein, die _metadata_spanner_mod_type, _metadata_spanner_table_name, _metadata_spanner_commit_timestamp, _metadata_spanner_server_transaction_id, _metadata_spanner_record_sequence, _metadata_spanner_is_last_record_in_transaction_in_partition, _metadata_spanner_number_of_records_in_transaction, _metadata_spanner_number_of_partitions_in_transaction, _metadata_big_query_commit_timestamp sind. Die Standardeinstellung ist leer.
  • disableDlqRetries: Gibt an, ob Wiederholungsversuche für den DLQ deaktiviert werden sollen. Die Standardeinstellung ist "false".
  • useStorageWriteApi : Wenn „true“, verwendet die Pipeline beim Schreiben der Daten in BigQuery die Storage Write API (siehe https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Der Standardwert ist „false“. Wenn Sie die Storage Write API im genau einmaligen Modus verwenden, müssen Sie die folgenden Parameter festlegen: „Anzahl der Streams für die BigQuery Storage Write API“ und „Triggerhäufigkeit in Sekunden für die BigQuery Storage Write API“. Wenn Sie den Dataflow-Modus „Mindestens einmal“ aktivieren oder den Parameter „useStorageWriteApiAtLeastOnce“ auf „true“ setzen, müssen Sie die Anzahl der Streams oder die Triggerhäufigkeit nicht festlegen.
  • useStorageWriteApiAtLeastOnce : Dieser Parameter wird nur wirksam, wenn "BigQuery Storage Write API verwenden" aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false".
  • numStorageWriteApiStreams: Die Anzahl der Streams definiert die Parallelität der Write-Transformation von BigQueryIO und entspricht ungefähr der Anzahl der Streams der Storage Write API, die von der Pipeline verwendet werden. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Die Triggerhäufigkeit legt fest, wie schnell die Daten für Abfragen in BigQuery sichtbar sind. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Spanner-Instanz-ID
  • SPANNER_DATABASE: Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: Spanner-Metadateninstanz-ID
  • SPANNER_METADATA_DATABASE: Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Spanner-Änderungsstream
  • BIGQUERY_DATASET: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Spanner-Instanz-ID
  • SPANNER_DATABASE: Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: Spanner-Metadateninstanz-ID
  • SPANNER_METADATA_DATABASE: Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Spanner-Änderungsstream
  • BIGQUERY_DATASET: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.

Nächste Schritte